Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Map a manifest field type onto the PostgreSQL column type used to store it.
///
/// Only the numeric, boolean and datetime types get a dedicated column type.
/// `string`, `richtext`, `id(<Entity>)` references and any type name not
/// listed here all fall back to `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        // "string", "richtext", "id(...)" references and unknown types.
        _ => "TEXT",
    }
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it can be used as a PostgreSQL identifier.
///
/// Embedded double quotes are escaped by doubling them, following the SQL
/// standard: the identifier `foo"bar` is rendered as `"foo""bar"`. This keeps
/// caller-supplied names from breaking out of the quoted identifier.
pub(crate) fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // A doubled quote is the escape for a literal quote.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
/// Public entry point to this module's identifier quoting for sibling
/// modules (e.g. `pg_tx_store`) that build SQL under the same dialect rules.
///
/// It forwards to the crate-private `quote_ident`, which stays as-is so the
/// existing call sites in this module do not change.
#[cfg(feature = "postgres-live")]
pub fn quote_ident_pub(name: &str) -> String {
    self::quote_ident(name)
}
48
/// Public entry point to `live::row_to_json` for sibling modules.
///
/// `pg_tx_store` uses it so rows read inside a transaction come back in the
/// same JSON shape as rows read through the non-transactional path.
#[cfg(feature = "postgres-live")]
pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
    self::live::row_to_json(row)
}
56
/// Public entry point to the filtered-query builder, so the in-transaction
/// `PgTxStore` accepts the same operator surface as the non-transactional
/// path (`$eq`, `$like`, `$in`, `$order`, `$limit`, `$offset`, `$not`,
/// `$gt`, `$gte`, `$lt`, `$lte`).
///
/// Returns `(sql, params)`, which can be passed to either `client.query` or
/// `transaction.query`.
#[cfg(feature = "postgres-live")]
pub fn build_query_filtered_sql_pub(
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
    self::live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
}
70
/// Public entry point to the aggregate-query builder.
///
/// Returns `(sql, params, column_names)`. `column_names` is what
/// `aggregate_rows_to_json_pub` uses to project each result row.
#[cfg(feature = "postgres-live")]
pub fn build_aggregate_sql_pub(
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
    self::live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
}
82
/// Public entry point to the aggregate post-processor.
///
/// Converts the raw aggregate `Row`s into the `{ rows: [{...}] }` JSON shape,
/// using `column_names` (as produced by `build_aggregate_sql_pub`) to label
/// each value.
#[cfg(feature = "postgres-live")]
pub fn aggregate_rows_to_json_pub(
    rows: &[postgres::Row],
    column_names: &[String],
) -> serde_json::Value {
    self::live::aggregate_rows_to_json(rows, column_names)
}
92
93// ---------------------------------------------------------------------------
94// SQL generation
95// ---------------------------------------------------------------------------
96
97/// Generate a Postgres CREATE TABLE statement.
98pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
99    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
100
101    for field in fields {
102        let col_type = pg_column_type(&field.field_type);
103        let not_null = if field.optional { "" } else { " NOT NULL" };
104        let unique = if field.unique { " UNIQUE" } else { "" };
105        columns.push(format!(
106            "{} {}{}{}",
107            quote_ident(&field.name),
108            col_type,
109            not_null,
110            unique
111        ));
112    }
113
114    format!(
115        "CREATE TABLE IF NOT EXISTS {} ({})",
116        quote_ident(entity_name),
117        columns.join(", ")
118    )
119}
120
121/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
122/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
123/// Required-ness is tracked in the manifest; enforcement deferred.
124pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
125    let col_type = pg_column_type(&field.field_type);
126    let unique = if field.unique { " UNIQUE" } else { "" };
127    format!(
128        "ALTER TABLE {} ADD COLUMN {} {}{}",
129        quote_ident(entity_name),
130        quote_ident(&field.name),
131        col_type,
132        unique
133    )
134}
135
136/// Generate a Postgres CREATE INDEX statement.
137pub fn create_index_sql(
138    entity_name: &str,
139    index_name: &str,
140    fields: &[String],
141    unique: bool,
142) -> String {
143    let unique_str = if unique { "UNIQUE " } else { "" };
144    let full_index_name = format!("{}_{}", entity_name, index_name);
145    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
146    format!(
147        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
148        unique_str,
149        quote_ident(&full_index_name),
150        quote_ident(entity_name),
151        quoted_fields.join(", ")
152    )
153}
154
155// ---------------------------------------------------------------------------
156// PostgresAdapter — planning-only adapter
157// ---------------------------------------------------------------------------
158
159/// A Postgres storage adapter. Currently supports planning only.
160/// No live connection — SQL generation and planning from manifest.
161pub struct PostgresAdapter;
162
163impl StorageAdapter for PostgresAdapter {
164    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
165        // Plan from empty baseline.
166        let mut operations = Vec::new();
167
168        for entity in &target.entities {
169            let fields: Vec<FieldSpec> = entity
170                .fields
171                .iter()
172                .map(|f| FieldSpec {
173                    name: f.name.clone(),
174                    field_type: f.field_type.clone(),
175                    optional: f.optional,
176                    unique: f.unique,
177                })
178                .collect();
179
180            operations.push(SchemaOperation::CreateEntity {
181                name: entity.name.clone(),
182                fields,
183            });
184
185            for index in &entity.indexes {
186                operations.push(SchemaOperation::AddIndex {
187                    entity: entity.name.clone(),
188                    name: index.name.clone(),
189                    fields: index.fields.clone(),
190                    unique: index.unique,
191                });
192            }
193        }
194
195        if operations.is_empty() {
196            operations.push(SchemaOperation::Noop);
197        }
198
199        Ok(SchemaPlan { operations })
200    }
201
202    // apply_schema intentionally not implemented — uses default trait error.
203}
204
205/// Generate all SQL statements for a plan, in order.
206/// Useful for dry-run preview of what Postgres DDL would be executed.
207pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
208    let mut statements = Vec::new();
209
210    for op in &plan.operations {
211        match op {
212            SchemaOperation::CreateEntity { name, fields } => {
213                statements.push(create_table_sql(name, fields));
214            }
215            SchemaOperation::AddField { entity, field } => {
216                statements.push(add_column_sql(entity, field));
217            }
218            SchemaOperation::AlterField {
219                entity,
220                previous,
221                target,
222            } => {
223                // Only nullable transitions today. SET / DROP NOT NULL is
224                // safe on a populated table when going from required →
225                // optional (existing rows already satisfy NOT NULL); the
226                // reverse direction (optional → required) succeeds only
227                // if every row has a non-null value, which the planner
228                // has no way to know — Postgres will fail the migration
229                // if it can't and the operator gets a clear error from
230                // the apply step.
231                if previous.optional && !target.optional {
232                    statements.push(format!(
233                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
234                        quote_ident(entity),
235                        quote_ident(&target.name)
236                    ));
237                } else if !previous.optional && target.optional {
238                    statements.push(format!(
239                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
240                        quote_ident(entity),
241                        quote_ident(&target.name)
242                    ));
243                }
244                // Nothing emitted when neither nullable nor type changed
245                // — falls through silently. AlterField with no actual
246                // shape change shouldn't happen in practice (the planner
247                // only emits it on real drift), but guard against
248                // emitting empty SQL just in case.
249            }
250            SchemaOperation::AddIndex {
251                entity,
252                name,
253                fields,
254                unique,
255            } => {
256                statements.push(create_index_sql(entity, name, fields, *unique));
257            }
258            SchemaOperation::CreateSearchIndex { entity, config } => {
259                #[cfg(feature = "postgres-live")]
260                {
261                    statements.extend(crate::pg_search::create_search_index_sql(entity, config));
262                }
263                #[cfg(not(feature = "postgres-live"))]
264                {
265                    let _ = (entity, config);
266                    return Err(StorageError {
267                        code: "PG_SEARCH_FEATURE_OFF".into(),
268                        message: "CreateSearchIndex requires the `postgres-live` feature".into(),
269                    });
270                }
271            }
272            SchemaOperation::RemoveSearchIndex { entity } => {
273                // Without the original config we don't know which
274                // facet/sort indexes were created. Drop the FTS table
275                // and the GIN index by their fixed names; per-field
276                // facet/sort indexes are dropped automatically by the
277                // entity DROP path. Operators removing search via
278                // schema diff will see leftover indexes only if the
279                // entity table itself still exists — at which point
280                // the next CREATE INDEX IF NOT EXISTS catches up.
281                //
282                // quote_ident on the synthetic table/index names so a
283                // malicious entity name with embedded `"` can't break
284                // out of the identifier.
285                statements.push(format!(
286                    "DROP TABLE IF EXISTS {} CASCADE",
287                    quote_ident(&format!("_fts_{entity}"))
288                ));
289                statements.push(format!(
290                    "DROP INDEX IF EXISTS {}",
291                    quote_ident(&format!("{entity}_fts_gin"))
292                ));
293            }
294            SchemaOperation::Noop => {}
295            other => {
296                return Err(StorageError {
297                    code: "PG_OP_UNSUPPORTED".into(),
298                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
299                });
300            }
301        }
302    }
303
304    Ok(statements)
305}
306
307// ---------------------------------------------------------------------------
308// Introspection SQL helpers
309//
310// These generate the SQL queries that a live Postgres connection would run
311// to read the current schema. No connection required — just SQL strings.
312// ---------------------------------------------------------------------------
313
/// SQL that lists the base tables in the `public` schema, sorted by name,
/// excluding tables whose names start with the literal prefix `_pylon_`.
///
/// In `LIKE`, `_` is a single-character wildcard. An unescaped
/// `'_pylon_%'` would also hide unrelated tables such as `xpylonyz`. The
/// underscores are therefore escaped with an explicit `ESCAPE '!'`, so only
/// the real `_pylon_` prefix is excluded. `!` is used instead of the default
/// backslash so the pattern means the same regardless of
/// `standard_conforming_strings`.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
322
/// SQL that lists the columns of one `public` table in declaration order.
///
/// Bind `$1` to the table name. Each row yields `column_name`, `data_type`,
/// `is_nullable` and `is_pk`. `is_pk` is a `COUNT(*)` of the PRIMARY KEY
/// constraints on that table that cover the column, so a non-zero value
/// marks a key column.
///
/// Constraint names are only unique within a schema, and the same table name
/// can exist in several schemas. The primary-key subquery therefore joins
/// `table_constraints` to `key_column_usage` on schema, constraint name and
/// table, and restricts it to the column's own schema. Matching on
/// `constraint_name` and `table_name` alone could count another schema's
/// key as this column's.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT c.column_name, c.data_type, c.is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON kcu.constraint_schema = tc.constraint_schema \
             AND kcu.constraint_name = tc.constraint_name \
             AND kcu.table_schema = tc.table_schema \
             AND kcu.table_name = tc.table_name \
            WHERE tc.constraint_type = 'PRIMARY KEY' \
              AND tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name) as is_pk \
    FROM information_schema.columns c \
    WHERE c.table_schema = 'public' AND c.table_name = $1 \
    ORDER BY c.ordinal_position";
336
/// SQL that lists the non-primary-key indexes of one `public` table.
///
/// Bind `$1` to the table name. Each row yields `index_name`, `is_unique`,
/// and `columns`, an array of the indexed column names ordered by their
/// position in the index key (`pg_index.indkey`). Rows are sorted by index
/// name.
pub const INTROSPECT_INDEXES_SQL: &str = concat!(
    "SELECT i.relname as index_name, ",
    "ix.indisunique as is_unique, ",
    "array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns ",
    "FROM pg_index ix ",
    "JOIN pg_class t ON t.oid = ix.indrelid ",
    "JOIN pg_class i ON i.oid = ix.indexrelid ",
    "JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) ",
    "JOIN pg_namespace n ON n.oid = t.relnamespace ",
    "WHERE n.nspname = 'public' ",
    "AND t.relname = $1 ",
    "AND NOT ix.indisprimary ",
    "GROUP BY i.relname, ix.indisunique ",
    "ORDER BY i.relname",
);
353
354/// Plan from a snapshot (reuses the shared plan_from_snapshot).
355/// This allows Postgres to plan incrementally once introspection data is available.
356pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
357    crate::plan_from_snapshot(snapshot, target)
358}
359
360// ---------------------------------------------------------------------------
361// CRUD SQL generation helpers (used by live adapter, testable without a DB)
362// ---------------------------------------------------------------------------
363
/// Mint a 40-character, lowercase-hex, lexicographically sortable row id.
///
/// Layout: 32 hex digits of wall-clock nanoseconds since the Unix epoch
/// (zero-padded), then 8 hex digits of a process-wide atomic counter. The
/// counter keeps ids minted in the same nanosecond distinct and increasing.
/// The fixed width makes string order follow creation order, which cursor
/// pagination depends on.
///
/// Caveats: the timestamp comes from `SystemTime`, which can step backwards.
/// A clock before the epoch yields a zero timestamp. The counter wraps after
/// 2^32 ids. Ordering is therefore "monotonic-ish", not strictly guaranteed.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU32 = AtomicU32::new(0);

    let nanos: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{:032x}{:08x}", nanos, sequence)
}
383
384/// Convert a JSON value to its string representation for use as a SQL parameter.
385///
386/// Kept for back-compat with callers that need a textual fallback (e.g.
387/// human-readable logs). New code should bind through [`JsonParam`] so
388/// integers/booleans/nulls reach Postgres in their typed form instead of
389/// collapsing to TEXT, which the driver can't coerce into INTEGER /
390/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
391/// into an empty string for FK columns.
392pub fn json_value_to_string(val: &serde_json::Value) -> String {
393    match val {
394        serde_json::Value::String(s) => s.clone(),
395        serde_json::Value::Number(n) => n.to_string(),
396        serde_json::Value::Bool(b) => b.to_string(),
397        serde_json::Value::Null => String::new(),
398        other => other.to_string(),
399    }
400}
401
402/// A typed wrapper around a JSON scalar that implements
403/// [`postgres::types::ToSql`] for INSERT/UPDATE parameters.
404///
405/// The previous implementation passed every value as `String`, which broke
406/// non-text columns (the postgres driver can't bind a string literal into
407/// an INTEGER / BOOLEAN / TIMESTAMPTZ slot) and silently turned JSON
408/// `null` into the empty string for nullable FKs (so `unlink` left
409/// dangling `""` references instead of NULL). `JsonParam` carries the
410/// JSON variant tag through to `to_sql` so the driver can pick the
411/// correct binary representation per column type.
412///
413/// JSON arrays/objects collapse to their JSON-string form (TEXT) — the
414/// runtime layer doesn't currently model array/object columns at the
415/// manifest level on Postgres, so anything that lands here came from
416/// caller-supplied prose that's expected to fit into a TEXT column.
417#[derive(Debug, Clone, PartialEq)]
418pub enum JsonParam {
419    Null,
420    Text(String),
421    Int(i64),
422    Float(f64),
423    Bool(bool),
424}
425
426impl JsonParam {
427    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
428    /// that fit `i64` go through as Int; everything else goes through as
429    /// Float to preserve fractional / large-magnitude values.
430    pub fn from_json(val: &serde_json::Value) -> Self {
431        match val {
432            serde_json::Value::Null => JsonParam::Null,
433            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
434            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
435            serde_json::Value::Number(n) => {
436                if let Some(i) = n.as_i64() {
437                    JsonParam::Int(i)
438                } else if let Some(f) = n.as_f64() {
439                    JsonParam::Float(f)
440                } else {
441                    JsonParam::Text(n.to_string())
442                }
443            }
444            other => JsonParam::Text(other.to_string()),
445        }
446    }
447}
448
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encode this parameter in the binary wire format of `ty`, the type
    /// Postgres inferred for the placeholder.
    ///
    /// Each variant is converted to the Rust type whose `ToSql` encoding
    /// matches `ty`. Conversions that would lose or corrupt data return an
    /// error instead of coercing:
    ///
    /// * integers outside the range of an `INT2` / `INT4` target (an `as`
    ///   cast would silently wrap them);
    /// * floats bound to an integer column, unless they are finite, have no
    ///   fractional part and fit the column width (an `as` cast would
    ///   truncate or saturate);
    /// * the textual fallback aimed at a type whose binary format is not raw
    ///   text (booleans, integers, floats, numeric, date/time, UUID). Writing
    ///   UTF-8 bytes into such a slot either fails server-side with an
    ///   opaque wire-format error or, when the byte length happens to match,
    ///   is decoded as garbage. For example, `"true"` sent to an `INT4` slot
    ///   is exactly four bytes and reads back as 1953658213.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        /// Narrow an `i64` to a smaller integer width, failing instead of wrapping.
        fn narrow<T: std::convert::TryFrom<i64>>(
            n: i64,
            ty: &postgres::types::Type,
        ) -> Result<T, Box<dyn std::error::Error + Sync + Send>> {
            T::try_from(n).map_err(|_| {
                format!("integer {n} does not fit in a parameter of type {ty}").into()
            })
        }

        /// Convert an `f64` to `i64` only when the conversion is exact.
        fn exact_integer(
            f: f64,
            ty: &postgres::types::Type,
        ) -> Result<i64, Box<dyn std::error::Error + Sync + Send>> {
            // -2^63 is exactly representable as f64. i64::MAX rounds up to
            // 2^63, so the upper bound must be exclusive.
            let lower = i64::MIN as f64;
            let upper = -lower;
            if f.is_finite() && f.fract() == 0.0 && (lower..upper).contains(&f) {
                Ok(f as i64)
            } else {
                Err(format!("number {f} cannot be stored exactly in a parameter of type {ty}").into())
            }
        }

        /// Types whose binary wire format is never raw text, so the textual
        /// fallback must not be used for them.
        fn rejects_text_bytes(ty: &postgres::types::Type) -> bool {
            use postgres::types::Type;
            matches!(
                *ty,
                Type::BOOL
                    | Type::INT2
                    | Type::INT4
                    | Type::INT8
                    | Type::OID
                    | Type::FLOAT4
                    | Type::FLOAT8
                    | Type::NUMERIC
                    | Type::DATE
                    | Type::TIME
                    | Type::TIMETZ
                    | Type::TIMESTAMP
                    | Type::TIMESTAMPTZ
                    | Type::INTERVAL
                    | Type::UUID
            )
        }

        // Null binds as SQL NULL whatever the target type is. The driver
        // treats `IsNull::Yes` as a null of the requested type.
        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        // Dispatch on the placeholder's declared type so the encoded width
        // matches the slot. Postgres rejects, for example, an 8-byte i64
        // bound to a 4-byte INTEGER.
        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            (JsonParam::Int(n), &Type::INT2) => narrow::<i16>(*n, ty)?.to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => narrow::<i32>(*n, ty)?.to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT2) => {
                narrow::<i16>(exact_integer(*f, ty)?, ty)?.to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT4) => {
                narrow::<i32>(exact_integer(*f, ty)?, ty)?.to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT8) => exact_integer(*f, ty)?.to_sql(ty, out),

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                // Datetimes travel as ISO 8601 / RFC 3339 strings, but the
                // TIMESTAMPTZ binary format is i64 microseconds since
                // 2000-01-01 UTC, not the bytes of the string. Binding the
                // raw string produced "incorrect binary data format in bind
                // parameter N" (the pylon-cloud OAuth callback failure on
                // User.createdAt). Parse with chrono and let its `ToSql` impl
                // emit the proper encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // TIMESTAMP (no zone): same parse, bound as a UTC
                // NaiveDateTime so chrono picks the matching encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // Cross-type fallback: send the value's text form, but only into
            // types whose binary format can be raw text. For fixed-width or
            // structured binary types (e.g. "manifest says INT but the caller
            // sent a stringified number"), fail loudly here instead of
            // shipping bytes the server would reject or misread.
            (other, _) => {
                if rejects_text_bytes(ty) {
                    return Err(format!(
                        "cannot bind {other:?} to a parameter of type {ty}; \
                         send a JSON value whose type matches the column"
                    )
                    .into());
                }
                let rendered = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    JsonParam::Null => unreachable!("JsonParam::Null returned IsNull::Yes above"),
                };
                rendered.to_sql(ty, out)
            }
        }
    }

    /// Accept every target type here. `to_sql` decides per variant and
    /// returns an error for combinations it cannot encode. The
    /// `to_sql_checked!` expansion consults this before calling `to_sql`.
    fn accepts(_ty: &postgres::types::Type) -> bool {
        true
    }

    postgres::types::to_sql_checked!();
}
552
553/// Build an INSERT SQL statement and collect typed parameter values.
554/// Returns `(sql, params)` where `params[0]` is the generated ID
555/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
556/// value so the postgres driver can bind them to typed columns
557/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
558/// the database as SQL NULL — the previous string-collapsing path stored
559/// `""` for nullable FKs and broke any non-text column.
560pub fn build_insert_sql(
561    entity: &str,
562    data: &serde_json::Value,
563) -> Result<(String, Vec<JsonParam>), StorageError> {
564    let obj = data.as_object().ok_or_else(|| StorageError {
565        code: "PG_INVALID_DATA".into(),
566        message: "Insert data must be a JSON object".into(),
567    })?;
568
569    // Honor caller-supplied `id` (used by the CRDT path that needs to
570    // share the row id with the LoroDoc snapshot key it just wrote).
571    // Fall back to a fresh ULID-shaped id when absent. A non-string
572    // `id` value is rejected explicitly — silently regenerating would
573    // mask schema bugs (e.g. a caller passing an int id) and let the
574    // CRDT snapshot key drift from the materialized row id.
575    let id = match obj.get("id") {
576        None | Some(serde_json::Value::Null) => generate_id(),
577        Some(serde_json::Value::String(s)) => s.clone(),
578        Some(other) => {
579            return Err(StorageError {
580                code: "PG_INVALID_ID".into(),
581                message: format!(
582                    "Insert data carried a non-string `id` value: {other}. Pylon row ids \
583                     are always strings (40-char hex). Drop the `id` field to let the \
584                     server generate one, or supply a string."
585                ),
586            });
587        }
588    };
589
590    let mut col_names = vec!["id".to_string()];
591    let mut placeholders = vec!["$1".to_string()];
592    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
593
594    let mut i = 0usize;
595    for (key, val) in obj {
596        if key == "id" {
597            // Already emitted via the synthetic first column; skipping
598            // here avoids `INSERT ... (id, id, ...)` which Postgres
599            // rejects with `duplicate key value`.
600            continue;
601        }
602        col_names.push(quote_ident(key));
603        placeholders.push(format!("${}", i + 2));
604        values.push(JsonParam::from_json(val));
605        i += 1;
606    }
607
608    let sql = format!(
609        "INSERT INTO {} ({}) VALUES ({})",
610        quote_ident(entity),
611        col_names.join(", "),
612        placeholders.join(", ")
613    );
614
615    Ok((sql, values))
616}
617
618/// Build an UPDATE SQL statement and collect typed parameter values.
619/// Returns `(sql, params)` where `params[0]` is the row ID.
620pub fn build_update_sql(
621    entity: &str,
622    id: &str,
623    data: &serde_json::Value,
624) -> Result<(String, Vec<JsonParam>), StorageError> {
625    let obj = data.as_object().ok_or_else(|| StorageError {
626        code: "PG_INVALID_DATA".into(),
627        message: "Update data must be a JSON object".into(),
628    })?;
629
630    if obj.is_empty() {
631        return Err(StorageError {
632            code: "PG_INVALID_DATA".into(),
633            message: "Update data must contain at least one field".into(),
634        });
635    }
636
637    let mut set_clauses = Vec::new();
638    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
639
640    let mut i = 0usize;
641    for (key, val) in obj {
642        if key == "id" {
643            // Reject primary-key mutation. Letting `id` into the SET
644            // clause lets a client move a row out from under its CRDT
645            // sidecar (which is keyed by the original row_id) and its
646            // FTS shadow row (FK-bound to the original id). The SQLite
647            // path silently drops `id` here too — keep the same
648            // shape, but errors so the caller sees the bug.
649            return Err(StorageError {
650                code: "PG_INVALID_UPDATE".into(),
651                message:
652                    "Updating the `id` column is not allowed — Pylon row ids are immutable; \
653                     drop the field from the patch."
654                        .into(),
655            });
656        }
657        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
658        values.push(JsonParam::from_json(val));
659        i += 1;
660    }
661
662    if set_clauses.is_empty() {
663        return Err(StorageError {
664            code: "PG_INVALID_DATA".into(),
665            message: "Update data must contain at least one non-id field".into(),
666        });
667    }
668
669    let sql = format!(
670        "UPDATE {} SET {} WHERE id = $1",
671        quote_ident(entity),
672        set_clauses.join(", ")
673    );
674
675    Ok((sql, values))
676}
677
/// View a slice of [`JsonParam`]s as the `&[&(dyn ToSql + Sync)]` shape the
/// postgres driver's `query` / `execute` methods expect.
///
/// Shared by the insert, update and transaction call sites so the
/// conversion is written once.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
688
689// ---------------------------------------------------------------------------
690// Live Postgres adapter (requires "postgres-live" feature)
691// ---------------------------------------------------------------------------
692
693#[cfg(feature = "postgres-live")]
694pub mod live {
695    use super::*;
696    use crate::{
697        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
698    };
699
700    /// Connect to Postgres with the same TLS handling the entity
701    /// store uses. Strips libpq-only URL params (`sslmode=verify-full`,
702    /// `sslrootcert=system`) and uses rustls + native CA store when
703    /// the URL asks for TLS. Used by the auxiliary auth backends
704    /// (sessions, OAuth state, magic codes, accounts, API keys, orgs)
705    /// so they all behave identically to the entity store on managed
706    /// providers (Fly Postgres, PlanetScale, Neon, Supabase).
707    pub fn connect_pg(url: &str) -> Result<postgres::Client, String> {
708        let (cleaned, ssl) = parse_pg_url_ssl(url);
709        if ssl.use_tls {
710            let mut roots = rustls::RootCertStore::empty();
711            let native_certs = rustls_native_certs::load_native_certs();
712            for cert in native_certs.certs {
713                let _ = roots.add(cert);
714            }
715            if !native_certs.errors.is_empty() {
716                tracing::warn!(
717                    "[pg] rustls native cert load reported {} non-fatal errors",
718                    native_certs.errors.len()
719                );
720            }
721            let config = rustls::ClientConfig::builder()
722                .with_root_certificates(roots)
723                .with_no_client_auth();
724            let tls = tokio_postgres_rustls::MakeRustlsConnect::new(config);
725            postgres::Client::connect(&cleaned, tls).map_err(|e| format!("PG connect: {e}"))
726        } else {
727            postgres::Client::connect(&cleaned, postgres::NoTls)
728                .map_err(|e| format!("PG connect: {e}"))
729        }
730    }
731
732    /// SSL parsing result for `parse_pg_url_ssl`. We need both the
733    /// cleaned-up URL (libpq-only params stripped) and the boolean
734    /// "should we use TLS for this connection" so the connect path
735    /// can pick between `NoTls` and `MakeTlsConnector`.
736    pub struct PgUrlSsl {
737        pub use_tls: bool,
738    }
739
740    /// Pre-process a Postgres URL: strip libpq-specific query params
741    /// the Rust `postgres` crate's URL parser doesn't accept, and
742    /// figure out whether TLS should be enabled for the connection.
743    ///
744    /// Recognizes:
745    ///   - sslmode=disable / prefer / allow → no TLS
746    ///   - sslmode=require / verify-ca / verify-full → TLS
747    ///   - sslrootcert=system → TLS, use OS CA store (handled by
748    ///     `native-tls` automatically; we just record the intent)
749    ///   - sslrootcert=<path> → TLS, but the path is currently
750    ///     ignored — `native-tls` reads system roots only. Logged
751    ///     as a one-time warning so operators see the gap.
752    ///
753    /// Anything we don't understand is dropped from the URL silently
754    /// (defense against future libpq additions confusing the parser).
755    pub fn parse_pg_url_ssl(url: &str) -> (String, PgUrlSsl) {
756        let (base, query) = match url.find('?') {
757            Some(idx) => (&url[..idx], &url[idx + 1..]),
758            None => return (url.to_string(), PgUrlSsl { use_tls: false }),
759        };
760
761        let mut use_tls = false;
762        let mut kept: Vec<String> = Vec::new();
763        for pair in query.split('&') {
764            if pair.is_empty() {
765                continue;
766            }
767            let (k, v) = match pair.find('=') {
768                Some(i) => (&pair[..i], &pair[i + 1..]),
769                None => (pair, ""),
770            };
771            match k {
772                "sslmode" => match v.to_ascii_lowercase().as_str() {
773                    "disable" | "allow" => {
774                        // Caller said "no TLS" — pass through to the
775                        // postgres crate, which knows `disable`.
776                        kept.push("sslmode=disable".to_string());
777                    }
778                    "prefer" => {
779                        // Postgres crate's default; let it through.
780                        kept.push("sslmode=prefer".to_string());
781                    }
782                    "require" | "verify-ca" | "verify-full" | "" => {
783                        // All the "use TLS" modes. Rust crate only
784                        // accepts `require`; verify-* would be
785                        // rejected. Normalize to `require` and let
786                        // our TLS connector handle cert verification.
787                        use_tls = true;
788                        kept.push("sslmode=require".to_string());
789                    }
790                    other => {
791                        tracing::warn!(
792                            "[pg] unknown sslmode='{other}' — defaulting to require + TLS"
793                        );
794                        use_tls = true;
795                        kept.push("sslmode=require".to_string());
796                    }
797                },
798                "sslrootcert" => {
799                    // Either "system" (use OS roots — that's what
800                    // native-tls does anyway) or a path (which we
801                    // can't honor without bringing in openssl). Log
802                    // and drop in both cases; TLS still happens.
803                    if v != "system" && !v.is_empty() {
804                        tracing::warn!(
805                            "[pg] sslrootcert={v} ignored — native-tls uses system roots"
806                        );
807                    }
808                    use_tls = true;
809                }
810                _ => {
811                    // Forward everything else (search_path, application_name,
812                    // connect_timeout, etc.) so libpq-style URLs that work
813                    // for the rest of the world keep working here.
814                    kept.push(pair.to_string());
815                }
816            }
817        }
818
819        let cleaned = if kept.is_empty() {
820            base.to_string()
821        } else {
822            format!("{}?{}", base, kept.join("&"))
823        };
824        (cleaned, PgUrlSsl { use_tls })
825    }
826
827    #[cfg(test)]
828    mod url_tests {
829        use super::parse_pg_url_ssl;
830
831        #[test]
832        fn strips_libpq_only_sslmode_verify_full() {
833            let (cleaned, ssl) = parse_pg_url_ssl(
834                "postgres://u:p@h:5432/db?sslmode=verify-full&sslrootcert=system",
835            );
836            assert!(ssl.use_tls);
837            // verify-full normalized to require; sslrootcert dropped.
838            assert_eq!(cleaned, "postgres://u:p@h:5432/db?sslmode=require");
839        }
840
841        #[test]
842        fn passes_through_disable() {
843            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db?sslmode=disable");
844            assert!(!ssl.use_tls);
845            assert_eq!(cleaned, "postgres://h/db?sslmode=disable");
846        }
847
848        #[test]
849        fn no_query_string_no_tls() {
850            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db");
851            assert!(!ssl.use_tls);
852            assert_eq!(cleaned, "postgres://h/db");
853        }
854
855        #[test]
856        fn unknown_params_pass_through() {
857            let (cleaned, _) = parse_pg_url_ssl(
858                "postgres://h/db?application_name=pylon&connect_timeout=5",
859            );
860            assert!(cleaned.contains("application_name=pylon"));
861            assert!(cleaned.contains("connect_timeout=5"));
862        }
863
864        #[test]
865        fn sslrootcert_alone_enables_tls() {
866            // Rare but valid — sslrootcert=system implies TLS even
867            // without an explicit sslmode.
868            let (cleaned, ssl) = parse_pg_url_ssl(
869                "postgres://h/db?sslrootcert=system",
870            );
871            assert!(ssl.use_tls);
872            // No sslmode synthesized — the postgres crate defaults
873            // to `prefer` which happily upgrades to our TLS connector.
874            assert_eq!(cleaned, "postgres://h/db");
875        }
876    }
877
878    /// A live Postgres adapter with a real database connection.
879    pub struct LivePostgresAdapter {
880        client: postgres::Client,
881    }
882
883    impl LivePostgresAdapter {
884        /// Borrow the underlying postgres client mutably. Used by
885        /// `PostgresDataStore::with_transaction` to start an
886        /// interactive transaction across multiple TS-function
887        /// `ctx.db` calls. `pub(crate)` because exposing raw
888        /// `&mut Client` outside pylon-storage would let callers
889        /// issue arbitrary SQL, bypassing the typed `DataStore`
890        /// surface that the rest of the framework relies on.
891        pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
892            &mut self.client
893        }
894
895        /// Connect to a Postgres database.
896        ///
897        /// Honors libpq-style URL params the Rust postgres crate doesn't
898        /// natively understand:
899        ///   - `sslmode=verify-full`, `verify-ca`, `require` → use TLS
900        ///     via rustls + the OS trust store. The Rust postgres crate
901        ///     only knows `disable`/`prefer`/`require` — it rejects the
902        ///     libpq extras with "invalid connection string".
903        ///   - `sslrootcert=system` → trust the OS CA store (rustls-
904        ///     native-certs reads it via SecurityFramework / SChannel /
905        ///     /etc/ssl). `sslrootcert=<path>` is not yet supported and
906        ///     falls back to system roots with a warning.
907        ///
908        /// Strips the libpq-only params from the URL before passing to
909        /// the postgres crate's parser, so it doesn't choke. Common
910        /// real-world example that was failing pre-fix: Fly Postgres
911        /// emits `?sslmode=verify-full&sslrootcert=system` URLs.
912        ///
913        /// Uses rustls (not native-tls/openssl) so binary builds
914        /// cross-compile cleanly on musl + arm64 without needing
915        /// OPENSSL_DIR. Pure Rust all the way down.
916        pub fn connect(url: &str) -> Result<Self, StorageError> {
917            let client = connect_pg(url).map_err(|e| StorageError {
918                code: "PG_CONNECT_FAILED".into(),
919                message: format!("Failed to connect to Postgres: {e}"),
920            })?;
921            Ok(Self { client })
922        }
923
924        /// Read the current schema from the live database.
925        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
926            let table_rows = self
927                .client
928                .query(INTROSPECT_TABLES_SQL, &[])
929                .map_err(pg_err)?;
930
931            let mut tables = Vec::new();
932            for row in &table_rows {
933                let table_name: String = row.get(0);
934                let columns = self.read_columns(&table_name)?;
935                let indexes = self.read_indexes(&table_name)?;
936                tables.push(TableSnapshot {
937                    name: table_name,
938                    columns,
939                    indexes,
940                });
941            }
942
943            Ok(SchemaSnapshot { tables })
944        }
945
946        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
947            let rows = self
948                .client
949                .query(INTROSPECT_COLUMNS_SQL, &[&table])
950                .map_err(pg_err)?;
951
952            let mut columns = Vec::new();
953            for row in &rows {
954                let name: String = row.get(0);
955                let data_type: String = row.get(1);
956                let is_nullable: String = row.get(2);
957                let is_pk: i64 = row.get(3);
958                columns.push(ColumnSnapshot {
959                    name,
960                    column_type: data_type,
961                    notnull: is_nullable == "NO",
962                    primary_key: is_pk > 0,
963                });
964            }
965            Ok(columns)
966        }
967
968        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
969            let rows = self
970                .client
971                .query(INTROSPECT_INDEXES_SQL, &[&table])
972                .map_err(pg_err)?;
973
974            let mut indexes = Vec::new();
975            for row in &rows {
976                let name: String = row.get(0);
977                let unique: bool = row.get(1);
978                let columns: Vec<String> = row.get(2);
979                indexes.push(IndexSnapshot {
980                    name,
981                    columns,
982                    unique,
983                });
984            }
985            Ok(indexes)
986        }
987
988        /// Plan from live database state.
989        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
990            let snapshot = self.read_schema()?;
991            Ok(crate::plan_from_snapshot(&snapshot, target))
992        }
993    }
994
995    impl StorageAdapter for LivePostgresAdapter {
996        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
997            Err(StorageError {
998                code: "PG_PLAN_NEEDS_MUTABLE".into(),
999                message: "Use plan_from_live() instead for live Postgres planning".into(),
1000            })
1001        }
1002
1003        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
1004            Err(StorageError {
1005                code: "PG_APPLY_USE_METHOD".into(),
1006                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
1007            })
1008        }
1009    }
1010
1011    impl LivePostgresAdapter {
1012        /// Apply a schema plan to the live database.
1013        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
1014            let statements = plan_to_sql(plan)?;
1015            for sql in &statements {
1016                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
1017            }
1018            Ok(())
1019        }
1020
1021        /// Execute a raw SQL statement against the live database. Used by
1022        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
1023        /// production code should go through `apply_plan` so changes are
1024        /// represented in the migration history. Returns the number of
1025        /// rows affected.
1026        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
1027            self.client.execute(sql, &[]).map_err(pg_err)
1028        }
1029
1030        /// Insert a row. Returns the generated ID.
1031        pub fn insert(
1032            &mut self,
1033            entity: &str,
1034            data: &serde_json::Value,
1035        ) -> Result<String, StorageError> {
1036            let (sql, values) = build_insert_sql(entity, data)?;
1037            // The first param is always the generated ID — extract it before
1038            // we hand `values` off to the postgres driver as borrowed slices.
1039            let id = match &values[0] {
1040                JsonParam::Text(s) => s.clone(),
1041                _ => {
1042                    return Err(StorageError {
1043                        code: "PG_INTERNAL".into(),
1044                        message: "build_insert_sql produced non-text id param".into(),
1045                    });
1046                }
1047            };
1048            let params = as_pg_params(&values);
1049            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
1050            Ok(id)
1051        }
1052
1053        /// Get a row by ID.
1054        pub fn get_by_id(
1055            &mut self,
1056            entity: &str,
1057            id: &str,
1058        ) -> Result<Option<serde_json::Value>, StorageError> {
1059            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
1060            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
1061
1062            match rows.first() {
1063                Some(row) => Ok(Some(row_to_json(row))),
1064                None => Ok(None),
1065            }
1066        }
1067
1068        /// List all rows from an entity.
1069        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
1070            let sql = format!("SELECT * FROM {}", quote_ident(entity));
1071            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
1072
1073            Ok(rows.iter().map(row_to_json).collect())
1074        }
1075
1076        /// Cursor-paginated list. `after` is the last `id` from the previous
1077        /// page; the result contains rows with `id > after` (lex order),
1078        /// limited to `limit`. Used for sync push/pull.
1079        pub fn list_after(
1080            &mut self,
1081            entity: &str,
1082            after: Option<&str>,
1083            limit: usize,
1084        ) -> Result<Vec<serde_json::Value>, StorageError> {
1085            // Cap limit at a sensible upper bound so a malicious client can't
1086            // stream the whole table by passing limit=u64::MAX.
1087            let capped: i64 = limit.min(10_000) as i64;
1088            let sql = match after {
1089                Some(_) => format!(
1090                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
1091                    quote_ident(entity)
1092                ),
1093                None => format!(
1094                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
1095                    quote_ident(entity)
1096                ),
1097            };
1098            let rows = match after {
1099                Some(cursor) => self
1100                    .client
1101                    .query(sql.as_str(), &[&cursor, &capped])
1102                    .map_err(pg_err)?,
1103                None => self
1104                    .client
1105                    .query(sql.as_str(), &[&capped])
1106                    .map_err(pg_err)?,
1107            };
1108            Ok(rows.iter().map(row_to_json).collect())
1109        }
1110
1111        /// Update a row by ID. Returns true if the row was found and updated.
1112        pub fn update(
1113            &mut self,
1114            entity: &str,
1115            id: &str,
1116            data: &serde_json::Value,
1117        ) -> Result<bool, StorageError> {
1118            let (sql, values) = build_update_sql(entity, id, data)?;
1119            let params = as_pg_params(&values);
1120            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
1121            Ok(rows_affected > 0)
1122        }
1123
1124        /// Delete a row by ID. Returns true if the row was found and deleted.
1125        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
1126            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1127            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
1128            Ok(rows_affected > 0)
1129        }
1130
1131        /// Look up a row by `field = value`. Caller must validate `field`
1132        /// against the manifest before calling — we still `quote_ident` it
1133        /// but won't catch a typo against the entity definition.
1134        pub fn lookup_field(
1135            &mut self,
1136            entity: &str,
1137            field: &str,
1138            value: &str,
1139        ) -> Result<Option<serde_json::Value>, StorageError> {
1140            let sql = format!(
1141                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
1142                quote_ident(entity),
1143                quote_ident(field),
1144            );
1145            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
1146            Ok(rows.first().map(row_to_json))
1147        }
1148
1149        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
1150        ///
1151        /// Supported operators (parity with the SQLite path):
1152        /// - Equality (`field: value`)
1153        /// - `$not`: emits `field != value`
1154        /// - `$gt` / `$gte` / `$lt` / `$lte`
1155        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
1156        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
1157        ///   if/when the SQLite side adds it)
1158        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
1159        ///
1160        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
1161        ///
1162        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
1163        /// would need a tsvector column or a generic ILIKE OR-fold across
1164        /// every text field, neither of which is wired up yet. Returns
1165        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
1166        /// receiving silently-broad results.
1167        ///
1168        /// Anything else is silently ignored (matches the in-memory fallback's
1169        /// permissive behavior). Field names are validated against `valid_columns`
1170        /// to prevent SQL injection — pass the entity's column set.
1171        pub fn query_filtered(
1172            &mut self,
1173            entity: &str,
1174            filter: &serde_json::Value,
1175            valid_columns: &[String],
1176        ) -> Result<Vec<serde_json::Value>, StorageError> {
1177            let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
1178            let pg_params = as_pg_params(&params);
1179            let rows = self
1180                .client
1181                .query(sql.as_str(), &pg_params)
1182                .map_err(pg_err)?;
1183            Ok(rows.iter().map(row_to_json).collect())
1184        }
1185
1186        /// Build the `SELECT ... FROM entity ...` SQL + bound params for
1187        /// a `query_filtered` request. Pure: takes a manifest's column
1188        /// list, returns text. Both the live adapter and the in-tx
1189        /// `PgTxStore` call this so the operator surface ($eq, $like,
1190        /// $in, $order, $limit, $offset) stays identical regardless of
1191        /// where the query runs.
1192        pub(crate) fn build_query_filtered_sql(
1193            entity: &str,
1194            filter: &serde_json::Value,
1195            valid_columns: &[String],
1196        ) -> Result<(String, Vec<JsonParam>), StorageError> {
1197            let empty = serde_json::Map::new();
1198            let obj = filter.as_object().unwrap_or(&empty);
1199
1200            let validate = |col: &str| -> Result<(), StorageError> {
1201                if col == "id" || valid_columns.iter().any(|c| c == col) {
1202                    Ok(())
1203                } else {
1204                    Err(StorageError {
1205                        code: "UNKNOWN_COLUMN".into(),
1206                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1207                    })
1208                }
1209            };
1210
1211            let mut where_clauses: Vec<String> = Vec::new();
1212            let mut order_clause = String::new();
1213            let mut limit_clause = String::new();
1214            let mut offset_clause = String::new();
1215            // Collect (col, op, value) so placeholder numbers can be assigned
1216            // in a single materialization pass after the parse loop. Values
1217            // are now JsonParam (typed) instead of String — see `value_to_pg`.
1218            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
1219
1220            for (key, val) in obj {
1221                match key.as_str() {
1222                    "$search" => {
1223                        // PG full-text via the entity's `_fts_<entity>`
1224                        // shadow table: `id IN (SELECT entity_id FROM
1225                        // _fts_<E> WHERE tsv @@ plainto_tsquery(...))`.
1226                        // Mirrors the SQLite path that joins the FTS5
1227                        // virtual table; the join here is a subquery so
1228                        // it composes with arbitrary other predicates
1229                        // (`$gt`, `$in`, etc.) in the same WHERE.
1230                        let raw = match val {
1231                            serde_json::Value::String(s) => s.clone(),
1232                            other => other.to_string(),
1233                        };
1234                        // Bind as a normal text param so all the
1235                        // existing placeholder-numbering and binding
1236                        // logic applies. The SQL uses the placeholder
1237                        // inside `plainto_tsquery('english', $N)`.
1238                        // Positioning logic mirrors the `$in` arm: the
1239                        // value will land at `planned.len()+1` because
1240                        // the materialization pass below pushes one
1241                        // param per planned item in order.
1242                        let placeholder_n = planned.len() + 1;
1243                        where_clauses.push(format!(
1244                            "{}.id IN (SELECT entity_id FROM \"_fts_{entity}\" \
1245                                       WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
1246                            quote_ident(entity),
1247                        ));
1248                        // Reuse the IN-style sentinel so the
1249                        // materialization pass below pushes the param
1250                        // without re-emitting a where_clause for it.
1251                        planned.push((
1252                            format!("__search_{}", planned.len()),
1253                            "__INLINE__".into(),
1254                            JsonParam::Text(raw),
1255                        ));
1256                    }
1257                    "$order" => {
1258                        if let Some(ord) = val.as_object() {
1259                            let mut parts = Vec::new();
1260                            for (col, dir) in ord {
1261                                validate(col)?;
1262                                let d = match dir.as_str().unwrap_or("asc") {
1263                                    "desc" | "DESC" => "DESC",
1264                                    _ => "ASC",
1265                                };
1266                                parts.push(format!("{} {d}", quote_ident(col)));
1267                            }
1268                            if !parts.is_empty() {
1269                                order_clause = format!(" ORDER BY {}", parts.join(", "));
1270                            }
1271                        }
1272                    }
1273                    "$limit" => {
1274                        if let Some(n) = val.as_u64() {
1275                            limit_clause = format!(" LIMIT {}", n);
1276                        }
1277                    }
1278                    "$offset" => {
1279                        if let Some(n) = val.as_u64() {
1280                            offset_clause = format!(" OFFSET {}", n);
1281                        }
1282                    }
1283                    field => {
1284                        validate(field)?;
1285                        match val {
1286                            serde_json::Value::Object(ops) => {
1287                                for (op, v) in ops {
1288                                    match op.as_str() {
1289                                        "$not" => planned.push((
1290                                            field.into(),
1291                                            "!=".into(),
1292                                            value_to_pg(v),
1293                                        )),
1294                                        "$gt" => {
1295                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
1296                                        }
1297                                        "$gte" => planned.push((
1298                                            field.into(),
1299                                            ">=".into(),
1300                                            value_to_pg(v),
1301                                        )),
1302                                        "$lt" => {
1303                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
1304                                        }
1305                                        "$lte" => planned.push((
1306                                            field.into(),
1307                                            "<=".into(),
1308                                            value_to_pg(v),
1309                                        )),
1310                                        "$like" => {
1311                                            // Wrap in `%...%` to match the
1312                                            // SQLite path's substring
1313                                            // semantics. Pre-fix divergence:
1314                                            // SQLite wrapped, PG forwarded
1315                                            // literally — `{name: {$like: "ann"}}`
1316                                            // matched "Joanne" on SQLite but
1317                                            // nothing on PG. Caller-supplied
1318                                            // wildcards inside the value still
1319                                            // work (`%j_n%` etc.) because we
1320                                            // only add wraps, never strip.
1321                                            let raw = match v {
1322                                                serde_json::Value::String(s) => s.clone(),
1323                                                other => other.to_string(),
1324                                            };
1325                                            planned.push((
1326                                                field.into(),
1327                                                "LIKE".into(),
1328                                                JsonParam::Text(format!("%{raw}%")),
1329                                            ));
1330                                        }
1331                                        "$in" => {
1332                                            if let Some(arr) = v.as_array() {
1333                                                if arr.is_empty() {
1334                                                    // `field IN ()` is invalid
1335                                                    // SQL on PG (and on SQLite
1336                                                    // too, technically — its
1337                                                    // path also short-circuits).
1338                                                    // An empty $in matches
1339                                                    // nothing; emit a guaranteed-
1340                                                    // false predicate so the
1341                                                    // parser doesn't choke and
1342                                                    // the result set comes back
1343                                                    // empty.
1344                                                    where_clauses.push("FALSE".into());
1345                                                } else {
1346                                                    let placeholders: Vec<String> = (0..arr.len())
1347                                                        .map(|i| {
1348                                                            format!("${}", planned.len() + 1 + i)
1349                                                        })
1350                                                        .collect();
1351                                                    where_clauses.push(format!(
1352                                                        "{} IN ({})",
1353                                                        quote_ident(field),
1354                                                        placeholders.join(", "),
1355                                                    ));
1356                                                    for x in arr {
1357                                                        planned.push((
1358                                                            format!("__inline_{}", planned.len()),
1359                                                            "__INLINE__".into(),
1360                                                            value_to_pg(x),
1361                                                        ));
1362                                                    }
1363                                                }
1364                                            }
1365                                        }
1366                                        _ => {}
1367                                    }
1368                                }
1369                            }
1370                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
1371                        }
1372                    }
1373                }
1374            }
1375
1376            // Materialize planned -> SQL + params.
1377            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
1378            for (field, op, v) in &planned {
1379                if op == "__INLINE__" {
1380                    // Already emitted via the IN-clause path; just push the value.
1381                    params.push(v.clone());
1382                } else {
1383                    let placeholder = format!("${}", params.len() + 1);
1384                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
1385                    params.push(v.clone());
1386                }
1387            }
1388
1389            let where_sql = if where_clauses.is_empty() {
1390                String::new()
1391            } else {
1392                format!(" WHERE {}", where_clauses.join(" AND "))
1393            };
1394            // Default deterministic order when the caller didn't pass
1395            // `$order` — matches the SQLite path. Without this,
1396            // identical queries return rows in different orders across
1397            // backends, which makes paginated APIs flaky.
1398            let final_order = if order_clause.is_empty() {
1399                format!(" ORDER BY {}", quote_ident("id"))
1400            } else {
1401                order_clause
1402            };
1403            let sql = format!(
1404                "SELECT * FROM {}{}{}{}{}",
1405                quote_ident(entity),
1406                where_sql,
1407                final_order,
1408                limit_clause,
1409                offset_clause,
1410            );
1411
1412            Ok((sql, params))
1413        }
1414
1415        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
1416        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
1417        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
1418        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
1419        /// via `date_trunc`), and a flat-equality `where` filter.
1420        ///
1421        /// Spec format (same JSON shape used by the SQLite path):
1422        /// ```json
1423        /// { "count": "*",
1424        ///   "sum": ["amount"],
1425        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
1426        ///   "where": {"status": "paid"} }
1427        /// ```
1428        ///
1429        /// `valid_columns` is used to validate every field name before it's
1430        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
1431        /// `DataStore` impl in this crate) supplies the entity's column set
1432        /// from the manifest.
1433        pub fn aggregate(
1434            &mut self,
1435            entity: &str,
1436            spec: &serde_json::Value,
1437            valid_columns: &[String],
1438        ) -> Result<serde_json::Value, StorageError> {
1439            let (sql, params, column_names) =
1440                Self::build_aggregate_sql(entity, spec, valid_columns)?;
1441            let pg_params = as_pg_params(&params);
1442            let rows = self
1443                .client
1444                .query(sql.as_str(), &pg_params)
1445                .map_err(pg_err)?;
1446            Ok(aggregate_rows_to_json(&rows, &column_names))
1447        }
1448
1449        /// Build the aggregate `SELECT` SQL + bound params + the
1450        /// expected output column names. Pure: takes the entity's
1451        /// validated column list, returns text. Both the live adapter
1452        /// and the in-tx `PgTxStore` call this so spec parsing
1453        /// (validation, bucket vocabulary, where-clause translation)
1454        /// stays identical regardless of where the query runs.
1455        pub(crate) fn build_aggregate_sql(
1456            entity: &str,
1457            spec: &serde_json::Value,
1458            valid_columns: &[String],
1459        ) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
1460            let obj = spec.as_object().ok_or_else(|| StorageError {
1461                code: "INVALID_QUERY".into(),
1462                message: "aggregate spec must be a JSON object".into(),
1463            })?;
1464
1465            let validate = |col: &str| -> Result<(), StorageError> {
1466                if col == "id" || valid_columns.iter().any(|c| c == col) {
1467                    Ok(())
1468                } else {
1469                    Err(StorageError {
1470                        code: "UNKNOWN_COLUMN".into(),
1471                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1472                    })
1473                }
1474            };
1475
1476            let mut select_parts: Vec<String> = Vec::new();
1477            let mut result_fields: Vec<String> = Vec::new();
1478
1479            if let Some(count) = obj.get("count") {
1480                match count {
1481                    serde_json::Value::String(s) if s == "*" => {
1482                        select_parts.push("COUNT(*) AS count".into());
1483                        result_fields.push("count".into());
1484                    }
1485                    serde_json::Value::String(field) => {
1486                        validate(field)?;
1487                        let alias = format!("count_{field}");
1488                        select_parts.push(format!(
1489                            "COUNT({}) AS {}",
1490                            quote_ident(field),
1491                            quote_ident(&alias),
1492                        ));
1493                        result_fields.push(alias);
1494                    }
1495                    _ => {}
1496                }
1497            }
1498
1499            for (fn_name, prefix) in [
1500                ("sum", "sum_"),
1501                ("avg", "avg_"),
1502                ("min", "min_"),
1503                ("max", "max_"),
1504            ] {
1505                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1506                    for field in fields {
1507                        if let Some(f) = field.as_str() {
1508                            validate(f)?;
1509                            let alias = format!("{prefix}{f}");
1510                            let sql_fn = fn_name.to_uppercase();
1511                            select_parts.push(format!(
1512                                "{}({}) AS {}",
1513                                sql_fn,
1514                                quote_ident(f),
1515                                quote_ident(&alias),
1516                            ));
1517                            result_fields.push(alias);
1518                        }
1519                    }
1520                }
1521            }
1522
1523            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1524                for field in fields {
1525                    if let Some(f) = field.as_str() {
1526                        validate(f)?;
1527                        let alias = format!("count_distinct_{f}");
1528                        select_parts.push(format!(
1529                            "COUNT(DISTINCT {}) AS {}",
1530                            quote_ident(f),
1531                            quote_ident(&alias),
1532                        ));
1533                        result_fields.push(alias);
1534                    }
1535                }
1536            }
1537
1538            // groupBy: column name or { field, bucket } — same vocabulary as
1539            // the SQLite path. Buckets translate to Postgres `date_trunc`
1540            // (SQLite uses `strftime`); both collapse rows to the bucket
1541            // boundary identically.
1542            let mut group_by: Vec<String> = Vec::new();
1543            let mut group_select: Vec<String> = Vec::new();
1544            let mut group_field_names: Vec<String> = Vec::new();
1545            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1546                for g in groups {
1547                    if let Some(f) = g.as_str() {
1548                        validate(f)?;
1549                        let q = quote_ident(f);
1550                        group_by.push(q.clone());
1551                        group_select.push(q);
1552                        group_field_names.push(f.to_string());
1553                    } else if let Some(spec) = g.as_object() {
1554                        let field =
1555                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1556                                StorageError {
1557                                    code: "INVALID_QUERY".into(),
1558                                    message: "groupBy object spec requires `field`".into(),
1559                                }
1560                            })?;
1561                        validate(field)?;
1562                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1563                        let trunc_unit = match bucket {
1564                            "hour" | "day" | "week" | "month" | "year" => bucket,
1565                            _ => {
1566                                return Err(StorageError {
1567                                    code: "INVALID_QUERY".into(),
1568                                    message: format!(
1569                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1570                                    ),
1571                                });
1572                            }
1573                        };
1574                        let alias = format!("{field}_{bucket}");
1575                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1576                        group_by.push(expr.clone());
1577                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1578                        group_field_names.push(alias);
1579                    }
1580                }
1581            }
1582
1583            let mut full_select = group_select.clone();
1584            full_select.extend(select_parts.iter().cloned());
1585            if full_select.is_empty() {
1586                return Err(StorageError {
1587                    code: "INVALID_QUERY".into(),
1588                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1589                });
1590            }
1591
1592            let mut where_clauses: Vec<String> = Vec::new();
1593            let mut params: Vec<JsonParam> = Vec::new();
1594            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1595                for (k, v) in w {
1596                    validate(k)?;
1597                    let placeholder = format!("${}", params.len() + 1);
1598                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1599                    params.push(value_to_pg(v));
1600                }
1601            }
1602            let where_sql = if where_clauses.is_empty() {
1603                String::new()
1604            } else {
1605                format!(" WHERE {}", where_clauses.join(" AND "))
1606            };
1607            let group_sql = if group_by.is_empty() {
1608                String::new()
1609            } else {
1610                format!(" GROUP BY {}", group_by.join(", "))
1611            };
1612
1613            let sql = format!(
1614                "SELECT {} FROM {}{}{}",
1615                full_select.join(", "),
1616                quote_ident(entity),
1617                where_sql,
1618                group_sql,
1619            );
1620
1621            let column_names: Vec<String> = group_field_names
1622                .iter()
1623                .chain(result_fields.iter())
1624                .cloned()
1625                .collect();
1626
1627            Ok((sql, params, column_names))
1628        }
1629    }
1630
1631    /// Project rows from an aggregate `SELECT` into the
1632    /// `{ rows: [{...}] }` JSON shape both the SQLite path and the
1633    /// PG path return. Pure post-processing — works on rows produced
1634    /// from either `Client::query` or `Transaction::query`.
1635    pub fn aggregate_rows_to_json(
1636        rows: &[postgres::Row],
1637        column_names: &[String],
1638    ) -> serde_json::Value {
1639        let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1640        for row in rows {
1641            let row_json = row_to_json(row);
1642            if let serde_json::Value::Object(map) = &row_json {
1643                let mut filtered = serde_json::Map::new();
1644                for name in column_names {
1645                    if let Some(v) = map.get(name) {
1646                        filtered.insert(name.clone(), v.clone());
1647                    }
1648                }
1649                out.push(serde_json::Value::Object(filtered));
1650            } else {
1651                out.push(row_json);
1652            }
1653        }
1654        serde_json::json!({ "rows": out })
1655    }
1656
1657    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
1658    pub enum TxOp<'a> {
1659        Insert {
1660            entity: &'a str,
1661            data: &'a serde_json::Value,
1662        },
1663        Update {
1664            entity: &'a str,
1665            id: &'a str,
1666            data: &'a serde_json::Value,
1667        },
1668        Delete {
1669            entity: &'a str,
1670            id: &'a str,
1671        },
1672    }
1673
1674    /// Result of a single op inside a transaction.
1675    #[derive(Debug, Clone)]
1676    pub enum TxResult {
1677        Inserted(String),
1678        Updated(bool),
1679        Deleted(bool),
1680    }
1681
1682    impl LivePostgresAdapter {
1683        /// Run `ops` inside a single Postgres transaction. Either all of them
1684        /// commit together or none of them do — there is no partial state on
1685        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1686        /// guard is dropped without `commit()` being called.
1687        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1688            let mut tx = self.client.transaction().map_err(pg_err)?;
1689            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1690
1691            for op in ops {
1692                match op {
1693                    TxOp::Insert { entity, data } => {
1694                        let (sql, values) = build_insert_sql(entity, data)?;
1695                        let id = match &values[0] {
1696                            JsonParam::Text(s) => s.clone(),
1697                            _ => {
1698                                return Err(StorageError {
1699                                    code: "PG_INTERNAL".into(),
1700                                    message: "build_insert_sql produced non-text id param".into(),
1701                                });
1702                            }
1703                        };
1704                        let params = as_pg_params(&values);
1705                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1706                        results.push(TxResult::Inserted(id));
1707                    }
1708                    TxOp::Update { entity, id, data } => {
1709                        let (sql, values) = build_update_sql(entity, id, data)?;
1710                        let params = as_pg_params(&values);
1711                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1712                        results.push(TxResult::Updated(n > 0));
1713                    }
1714                    TxOp::Delete { entity, id } => {
1715                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1716                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1717                        results.push(TxResult::Deleted(n > 0));
1718                    }
1719                }
1720            }
1721
1722            tx.commit().map_err(pg_err)?;
1723            Ok(results)
1724        }
1725    }
1726
    /// Convert a JSON value into the typed [`JsonParam`] used for binding.
    ///
    /// This is a thin alias for `JsonParam::from_json`, kept so the SQL
    /// builders in this module share one conversion point. Binding through
    /// `JsonParam` avoids the older approach of stringifying every value.
    /// That approach turned ints and bools into text, and JSON `null` into
    /// `""`. A typed parameter keeps the column type honest and lets a
    /// caller `unlink`, setting a nullable foreign key to SQL NULL.
    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
        JsonParam::from_json(v)
    }
1735
1736    pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1737        use postgres::types::Type;
1738        let mut obj = serde_json::Map::new();
1739        for (i, col) in row.columns().iter().enumerate() {
1740            let name = col.name().to_string();
1741
1742            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
1743            // and a panic in a query handler poisons the connection mutex,
1744            // taking down all subsequent reads on this datastore. Anything
1745            // that fails to decode becomes Null with a one-shot warning.
1746            //
1747            // Timestamps and the catch-all path explicitly DON'T request
1748            // `String` — the postgres crate uses binary protocol by default
1749            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
1750            // ask for `Vec<u8>` and lossy-stringify, which works for all
1751            // text-shaped columns in either protocol.
1752            let value: serde_json::Value = match *col.type_() {
1753                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1754                    .flatten()
1755                    .map(serde_json::Value::Bool)
1756                    .unwrap_or(serde_json::Value::Null),
1757                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1758                    .flatten()
1759                    .map(|v| serde_json::Value::Number(v.into()))
1760                    .unwrap_or(serde_json::Value::Null),
1761                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1762                    .flatten()
1763                    .map(|v| serde_json::Value::Number(v.into()))
1764                    .unwrap_or(serde_json::Value::Null),
1765                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1766                    .flatten()
1767                    .map(|v| serde_json::Value::Number(v.into()))
1768                    .unwrap_or(serde_json::Value::Null),
1769                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1770                    .flatten()
1771                    .and_then(|v| serde_json::Number::from_f64(v as f64))
1772                    .map(serde_json::Value::Number)
1773                    .unwrap_or(serde_json::Value::Null),
1774                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1775                    .flatten()
1776                    .and_then(serde_json::Number::from_f64)
1777                    .map(serde_json::Value::Number)
1778                    .unwrap_or(serde_json::Value::Null),
1779                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1780                    .flatten()
1781                    .unwrap_or(serde_json::Value::Null),
1782                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1783                    .flatten()
1784                    .map(|b| serde_json::Value::String(b64(&b)))
1785                    .unwrap_or(serde_json::Value::Null),
1786                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1787                    try_get_or_null::<Option<String>>(row, i)
1788                        .flatten()
1789                        .map(serde_json::Value::String)
1790                        .unwrap_or(serde_json::Value::Null)
1791                }
1792                Type::TIMESTAMPTZ => {
1793                    // Decode via chrono::DateTime<Utc> (postgres's
1794                    // `with-chrono-0_4` feature provides FromSql) and
1795                    // re-format as ISO 8601 — the shape pylon's clients
1796                    // expect (matches `pylon_kernel::util::now_iso`,
1797                    // so timestamps round-trip with the same surface
1798                    // across SQLite + PG).
1799                    try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1800                        .flatten()
1801                        .map(|dt| {
1802                            serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1803                        })
1804                        .unwrap_or(serde_json::Value::Null)
1805                }
1806                Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1807                    .flatten()
1808                    .map(|dt| {
1809                        serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1810                    })
1811                    .unwrap_or(serde_json::Value::Null),
1812                Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1813                    .flatten()
1814                    .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1815                    .unwrap_or(serde_json::Value::Null),
1816                _ => {
1817                    // Last resort: ask Postgres to render anything else as
1818                    // text via a stringifying decode through Vec<u8>. If even
1819                    // that fails (rare — Postgres types not implementing the
1820                    // text format), fall through to Null with a warning.
1821                    match row.try_get::<_, Option<String>>(i) {
1822                        Ok(Some(s)) => serde_json::Value::String(s),
1823                        Ok(None) => serde_json::Value::Null,
1824                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1825                            Ok(Some(bytes)) => serde_json::Value::String(
1826                                String::from_utf8_lossy(&bytes).into_owned(),
1827                            ),
1828                            _ => serde_json::Value::Null,
1829                        },
1830                    }
1831                }
1832            };
1833            obj.insert(name, value);
1834        }
1835        serde_json::Value::Object(obj)
1836    }
1837
1838    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1839    where
1840        T: postgres::types::FromSql<'a>,
1841    {
1842        match row.try_get::<_, T>(i) {
1843            Ok(v) => Some(v),
1844            Err(e) => {
1845                tracing::warn!(
1846                    "[postgres] decode failed for column {} ({}): {e}",
1847                    i,
1848                    row.columns()[i].name()
1849                );
1850                None
1851            }
1852        }
1853    }
1854
1855    /// Minimal base64 encoder so we don't need another dependency just for
1856    /// the BYTEA column edge case.
1857    fn b64(bytes: &[u8]) -> String {
1858        const TABLE: &[u8; 64] =
1859            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1860        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1861        let chunks = bytes.chunks(3);
1862        for chunk in chunks {
1863            let b = [
1864                chunk.first().copied().unwrap_or(0),
1865                chunk.get(1).copied().unwrap_or(0),
1866                chunk.get(2).copied().unwrap_or(0),
1867            ];
1868            out.push(TABLE[(b[0] >> 2) as usize] as char);
1869            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1870            if chunk.len() > 1 {
1871                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1872            } else {
1873                out.push('=');
1874            }
1875            if chunk.len() > 2 {
1876                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1877            } else {
1878                out.push('=');
1879            }
1880        }
1881        out
1882    }
1883
1884    fn pg_err(e: postgres::Error) -> StorageError {
1885        // postgres::Error's Display is intentionally short ("db error",
1886        // "connection error" etc.) — the actual SQLSTATE / detail lives
1887        // on the source chain. Walk the chain so the final message has
1888        // enough signal to debug a failed insert/update without
1889        // attaching a debugger.
1890        use std::error::Error;
1891        let mut detail = format!("{e}");
1892        let mut src: Option<&dyn Error> = e.source();
1893        while let Some(s) = src {
1894            detail.push_str(": ");
1895            detail.push_str(&format!("{s}"));
1896            src = s.source();
1897        }
1898        StorageError {
1899            code: "PG_QUERY_FAILED".into(),
1900            message: format!("Postgres query failed: {detail}"),
1901        }
1902    }
1903}
1904
1905// ---------------------------------------------------------------------------
1906// Tests
1907// ---------------------------------------------------------------------------
1908
1909#[cfg(test)]
1910mod tests {
1911    use super::*;
1912
1913    /// Hand-rolled fixture that matches the snapshots in the tests
1914    /// below. Decoupled from any example's `pylon.manifest.json` so
1915    /// changing an example schema doesn't bleed into adapter tests.
1916    fn test_manifest() -> AppManifest {
1917        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1918        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1919            name: name.into(),
1920            field_type: ty.into(),
1921            optional: opt,
1922            unique: uniq,
1923            crdt: None,
1924        };
1925        AppManifest {
1926            manifest_version: 1,
1927            name: "test".into(),
1928            version: "0.0.0".into(),
1929            entities: vec![
1930                ManifestEntity {
1931                    name: "User".into(),
1932                    fields: vec![
1933                        f("email", "string", false, true),
1934                        f("displayName", "string", false, false),
1935                        f("createdAt", "datetime", false, false),
1936                    ],
1937                    indexes: vec![],
1938                    relations: vec![],
1939                    search: None,
1940                    crdt: true,
1941                },
1942                ManifestEntity {
1943                    name: "Todo".into(),
1944                    fields: vec![
1945                        f("title", "string", false, false),
1946                        f("done", "bool", false, false),
1947                        f("userId", "id(User)", false, false),
1948                        f("createdAt", "datetime", false, false),
1949                    ],
1950                    indexes: vec![ManifestIndex {
1951                        name: "by_user".into(),
1952                        fields: vec!["userId".into()],
1953                        unique: false,
1954                    }],
1955                    relations: vec![],
1956                    search: None,
1957                    crdt: true,
1958                },
1959            ],
1960            queries: vec![],
1961            actions: vec![],
1962            policies: vec![],
1963            routes: vec![],
1964            auth: Default::default(),
1965        }
1966    }
1967
1968    #[test]
1969    fn pg_type_mapping() {
1970        assert_eq!(pg_column_type("string"), "TEXT");
1971        assert_eq!(pg_column_type("int"), "INTEGER");
1972        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1973        assert_eq!(pg_column_type("bool"), "BOOLEAN");
1974        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1975        assert_eq!(pg_column_type("richtext"), "TEXT");
1976        assert_eq!(pg_column_type("id(User)"), "TEXT");
1977    }
1978
1979    #[test]
1980    fn quote_ident_simple() {
1981        assert_eq!(quote_ident("User"), "\"User\"");
1982        assert_eq!(quote_ident("email"), "\"email\"");
1983    }
1984
1985    #[test]
1986    fn quote_ident_escapes_embedded_double_quotes() {
1987        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1988        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1989    }
1990
1991    #[test]
1992    fn create_table_sql_basic() {
1993        let fields = vec![
1994            FieldSpec {
1995                name: "email".into(),
1996                field_type: "string".into(),
1997                optional: false,
1998                unique: true,
1999            },
2000            FieldSpec {
2001                name: "age".into(),
2002                field_type: "int".into(),
2003                optional: true,
2004                unique: false,
2005            },
2006        ];
2007        let sql = create_table_sql("User", &fields);
2008        assert_eq!(
2009            sql,
2010            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
2011        );
2012    }
2013
2014    #[test]
2015    fn create_table_sql_escapes_identifiers() {
2016        let fields = vec![FieldSpec {
2017            name: "col\"x".into(),
2018            field_type: "string".into(),
2019            optional: false,
2020            unique: false,
2021        }];
2022        let sql = create_table_sql("my\"table", &fields);
2023        assert!(sql.contains("\"my\"\"table\""));
2024        assert!(sql.contains("\"col\"\"x\""));
2025    }
2026
2027    #[test]
2028    fn create_index_sql_unique() {
2029        let sql = create_index_sql("User", "by_email", &["email".into()], true);
2030        assert_eq!(
2031            sql,
2032            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
2033        );
2034    }
2035
2036    #[test]
2037    fn create_index_sql_non_unique() {
2038        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
2039        assert_eq!(
2040            sql,
2041            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
2042        );
2043    }
2044
2045    #[test]
2046    fn add_column_sql_basic() {
2047        let field = FieldSpec {
2048            name: "bio".into(),
2049            field_type: "string".into(),
2050            optional: true,
2051            unique: false,
2052        };
2053        let sql = add_column_sql("User", &field);
2054        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
2055    }
2056
2057    #[test]
2058    fn plan_from_manifest() {
2059        let adapter = PostgresAdapter;
2060        let manifest = test_manifest();
2061        let plan = adapter.plan_schema(&manifest).unwrap();
2062
2063        // Should have CreateEntity for User and Todo, plus AddIndex for by_user.
2064        assert!(plan.operations.iter().any(|op| matches!(
2065            op,
2066            SchemaOperation::CreateEntity { name, .. } if name == "User"
2067        )));
2068        assert!(plan.operations.iter().any(|op| matches!(
2069            op,
2070            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
2071        )));
2072        assert!(plan.operations.iter().any(|op| matches!(
2073            op,
2074            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
2075        )));
2076    }
2077
2078    #[test]
2079    fn plan_to_sql_produces_statements() {
2080        let adapter = PostgresAdapter;
2081        let manifest = test_manifest();
2082        let plan = adapter.plan_schema(&manifest).unwrap();
2083        let stmts = plan_to_sql(&plan).unwrap();
2084
2085        // 2 CREATE TABLE (User, Todo) + 1 CREATE INDEX for Todo.by_user
2086        // + 1 CREATE INDEX for Todo.by_user_done. The Todo manifest also
2087        // declares a unique by_email index on User which lands as part of
2088        // the table. Final count: 2 tables + 2 indexes.
2089        let create_tables = stmts
2090            .iter()
2091            .filter(|s| s.starts_with("CREATE TABLE"))
2092            .count();
2093        let create_indexes = stmts
2094            .iter()
2095            .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
2096            .count();
2097        assert_eq!(create_tables, 2);
2098        assert!(create_indexes >= 1);
2099        assert!(stmts[0].starts_with("CREATE TABLE"));
2100        assert!(stmts[1].starts_with("CREATE TABLE"));
2101    }
2102
2103    #[test]
2104    fn plan_to_sql_rejects_unsupported() {
2105        let plan = SchemaPlan {
2106            operations: vec![SchemaOperation::RemoveEntity {
2107                name: "User".into(),
2108            }],
2109        };
2110        let result = plan_to_sql(&plan);
2111        assert!(result.is_err());
2112        assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
2113    }
2114
2115    #[test]
2116    fn apply_not_implemented() {
2117        let adapter = PostgresAdapter;
2118        let plan = SchemaPlan {
2119            operations: vec![SchemaOperation::Noop],
2120        };
2121        let result = adapter.apply_schema(&plan);
2122        assert!(result.is_err());
2123        assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
2124    }
2125
2126    #[test]
2127    fn sql_uses_quoted_identifiers() {
2128        let fields = vec![FieldSpec {
2129            name: "createdAt".into(),
2130            field_type: "datetime".into(),
2131            optional: false,
2132            unique: false,
2133        }];
2134        let sql = create_table_sql("User", &fields);
2135        // Postgres identifiers should be quoted for case-sensitivity.
2136        assert!(sql.contains("\"User\""));
2137        assert!(sql.contains("\"createdAt\""));
2138        assert!(sql.contains("TIMESTAMPTZ"));
2139    }
2140
2141    // -- Introspection SQL tests --
2142
2143    #[test]
2144    fn introspect_sql_constants_are_valid() {
2145        // Sanity checks that the SQL strings exist and look reasonable.
2146        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
2147        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
2148        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
2149        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
2150    }
2151
2152    // -- Plan from snapshot tests --
2153
2154    #[test]
2155    fn plan_from_empty_snapshot_creates_all() {
2156        let snapshot = crate::SchemaSnapshot { tables: vec![] };
2157        let manifest = test_manifest();
2158        let plan = plan_from_snapshot(&snapshot, &manifest);
2159
2160        assert!(plan.operations.iter().any(|op| matches!(
2161            op,
2162            SchemaOperation::CreateEntity { name, .. } if name == "User"
2163        )));
2164        assert!(plan.operations.iter().any(|op| matches!(
2165            op,
2166            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
2167        )));
2168        assert!(plan.operations.iter().any(|op| matches!(
2169            op,
2170            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
2171        )));
2172    }
2173
2174    #[test]
2175    fn plan_from_full_snapshot_is_noop() {
2176        let snapshot = crate::SchemaSnapshot {
2177            tables: vec![
2178                crate::TableSnapshot {
2179                    name: "User".into(),
2180                    columns: vec![
2181                        crate::ColumnSnapshot {
2182                            name: "id".into(),
2183                            column_type: "TEXT".into(),
2184                            notnull: true,
2185                            primary_key: true,
2186                        },
2187                        crate::ColumnSnapshot {
2188                            name: "email".into(),
2189                            column_type: "TEXT".into(),
2190                            notnull: true,
2191                            primary_key: false,
2192                        },
2193                        crate::ColumnSnapshot {
2194                            name: "displayName".into(),
2195                            column_type: "TEXT".into(),
2196                            notnull: true,
2197                            primary_key: false,
2198                        },
2199                        crate::ColumnSnapshot {
2200                            name: "createdAt".into(),
2201                            column_type: "TIMESTAMPTZ".into(),
2202                            notnull: true,
2203                            primary_key: false,
2204                        },
2205                    ],
2206                    indexes: vec![],
2207                },
2208                crate::TableSnapshot {
2209                    name: "Todo".into(),
2210                    columns: vec![
2211                        crate::ColumnSnapshot {
2212                            name: "id".into(),
2213                            column_type: "TEXT".into(),
2214                            notnull: true,
2215                            primary_key: true,
2216                        },
2217                        crate::ColumnSnapshot {
2218                            name: "title".into(),
2219                            column_type: "TEXT".into(),
2220                            notnull: true,
2221                            primary_key: false,
2222                        },
2223                        crate::ColumnSnapshot {
2224                            name: "done".into(),
2225                            column_type: "BOOLEAN".into(),
2226                            notnull: true,
2227                            primary_key: false,
2228                        },
2229                        crate::ColumnSnapshot {
2230                            name: "userId".into(),
2231                            column_type: "TEXT".into(),
2232                            notnull: true,
2233                            primary_key: false,
2234                        },
2235                        crate::ColumnSnapshot {
2236                            name: "createdAt".into(),
2237                            column_type: "TIMESTAMPTZ".into(),
2238                            notnull: true,
2239                            primary_key: false,
2240                        },
2241                    ],
2242                    indexes: vec![crate::IndexSnapshot {
2243                        name: "Todo_by_user".into(),
2244                        columns: vec!["userId".into()],
2245                        unique: false,
2246                    }],
2247                },
2248            ],
2249        };
2250        let manifest = test_manifest();
2251        let plan = plan_from_snapshot(&snapshot, &manifest);
2252        assert!(plan.is_empty());
2253    }
2254
2255    #[test]
2256    fn plan_detects_missing_column_in_snapshot() {
2257        let snapshot = crate::SchemaSnapshot {
2258            tables: vec![
2259                crate::TableSnapshot {
2260                    name: "User".into(),
2261                    columns: vec![
2262                        crate::ColumnSnapshot {
2263                            name: "id".into(),
2264                            column_type: "TEXT".into(),
2265                            notnull: true,
2266                            primary_key: true,
2267                        },
2268                        crate::ColumnSnapshot {
2269                            name: "email".into(),
2270                            column_type: "TEXT".into(),
2271                            notnull: true,
2272                            primary_key: false,
2273                        },
2274                        // missing displayName and createdAt
2275                    ],
2276                    indexes: vec![],
2277                },
2278                crate::TableSnapshot {
2279                    name: "Todo".into(),
2280                    columns: vec![
2281                        crate::ColumnSnapshot {
2282                            name: "id".into(),
2283                            column_type: "TEXT".into(),
2284                            notnull: true,
2285                            primary_key: true,
2286                        },
2287                        crate::ColumnSnapshot {
2288                            name: "title".into(),
2289                            column_type: "TEXT".into(),
2290                            notnull: true,
2291                            primary_key: false,
2292                        },
2293                        crate::ColumnSnapshot {
2294                            name: "done".into(),
2295                            column_type: "BOOLEAN".into(),
2296                            notnull: true,
2297                            primary_key: false,
2298                        },
2299                        crate::ColumnSnapshot {
2300                            name: "userId".into(),
2301                            column_type: "TEXT".into(),
2302                            notnull: true,
2303                            primary_key: false,
2304                        },
2305                        crate::ColumnSnapshot {
2306                            name: "createdAt".into(),
2307                            column_type: "TIMESTAMPTZ".into(),
2308                            notnull: true,
2309                            primary_key: false,
2310                        },
2311                    ],
2312                    indexes: vec![crate::IndexSnapshot {
2313                        name: "Todo_by_user".into(),
2314                        columns: vec!["userId".into()],
2315                        unique: false,
2316                    }],
2317                },
2318            ],
2319        };
2320        let manifest = test_manifest();
2321        let plan = plan_from_snapshot(&snapshot, &manifest);
2322
2323        let add_fields: Vec<_> = plan
2324            .operations
2325            .iter()
2326            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
2327            .collect();
2328        assert_eq!(add_fields.len(), 2); // displayName + createdAt
2329    }
2330
2331    // -- CRUD helper tests (no live database required) --
2332
2333    #[test]
2334    fn json_value_to_string_handles_all_types() {
2335        assert_eq!(
2336            json_value_to_string(&serde_json::Value::String("hello".into())),
2337            "hello"
2338        );
2339        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
2340        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
2341        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
2342        assert_eq!(
2343            json_value_to_string(&serde_json::Value::Bool(false)),
2344            "false"
2345        );
2346        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
2347        // Arrays and objects get their JSON representation.
2348        assert_eq!(
2349            json_value_to_string(&serde_json::json!([1, 2, 3])),
2350            "[1,2,3]"
2351        );
2352        assert_eq!(
2353            json_value_to_string(&serde_json::json!({"a": 1})),
2354            "{\"a\":1}"
2355        );
2356    }
2357
2358    #[test]
2359    fn generate_id_returns_hex_string() {
2360        let id = generate_id();
2361        assert!(!id.is_empty());
2362        // Must be valid hex characters.
2363        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
2364    }
2365
2366    #[test]
2367    fn generate_id_is_unique_across_calls() {
2368        let id1 = generate_id();
2369        let id2 = generate_id();
2370        assert_ne!(id1, id2);
2371    }
2372
2373    #[test]
2374    fn generate_id_is_lex_sortable() {
2375        // 1000 IDs back-to-back must come out in monotonically increasing
2376        // lexicographic order. This is what makes cursor pagination correct.
2377        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
2378        let sorted = {
2379            let mut s = ids.clone();
2380            s.sort();
2381            s
2382        };
2383        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
2384        // And every id must be the same width (otherwise lex comparison is
2385        // wrong at width boundaries).
2386        let len0 = ids[0].len();
2387        assert!(ids.iter().all(|id| id.len() == len0));
2388        ids.dedup();
2389        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
2390    }
2391
2392    #[test]
2393    fn build_insert_sql_simple() {
2394        let data = serde_json::json!({
2395            "email": "alice@example.com",
2396            "displayName": "Alice"
2397        });
2398        let (sql, values) = build_insert_sql("User", &data).unwrap();
2399
2400        assert!(sql.starts_with("INSERT INTO \"User\""));
2401        assert!(sql.contains("id"));
2402        assert!(sql.contains("$1"));
2403        assert!(sql.contains("$2"));
2404        assert!(sql.contains("$3"));
2405        // First value is the generated ID — JsonParam::Text variant.
2406        match &values[0] {
2407            JsonParam::Text(s) => assert!(!s.is_empty()),
2408            other => panic!("expected Text id param, got {other:?}"),
2409        }
2410        assert_eq!(values.len(), 3); // id + 2 fields
2411    }
2412
2413    #[test]
2414    fn build_insert_sql_preserves_json_types() {
2415        let data = serde_json::json!({
2416            "n": 42,
2417            "f": 1.5,
2418            "b": true,
2419            "s": "hi",
2420            "z": null,
2421        });
2422        let (_sql, values) = build_insert_sql("T", &data).unwrap();
2423        // values[0] is the id; remaining are in BTreeMap order ("b","f","n","s","z").
2424        let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
2425        assert!(matches!(kinds[0], JsonParam::Bool(true)));
2426        assert!(matches!(kinds[1], JsonParam::Float(_)));
2427        assert!(matches!(kinds[2], JsonParam::Int(42)));
2428        assert!(matches!(kinds[3], JsonParam::Text(_)));
2429        assert!(matches!(kinds[4], JsonParam::Null));
2430    }
2431
2432    #[test]
2433    fn build_insert_sql_quotes_column_names() {
2434        let data = serde_json::json!({"createdAt": "2026-01-01"});
2435        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
2436        assert!(sql.contains("\"createdAt\""));
2437        assert!(sql.contains("\"Todo\""));
2438    }
2439
2440    #[test]
2441    fn build_insert_sql_rejects_non_object() {
2442        let data = serde_json::json!("not an object");
2443        let result = build_insert_sql("User", &data);
2444        assert!(result.is_err());
2445        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2446    }
2447
2448    #[test]
2449    fn build_update_sql_simple() {
2450        let data = serde_json::json!({
2451            "displayName": "Bob",
2452            "email": "bob@example.com"
2453        });
2454        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
2455
2456        assert!(sql.starts_with("UPDATE \"User\" SET"));
2457        assert!(sql.contains("WHERE id = $1"));
2458        assert!(sql.contains("$2"));
2459        assert!(sql.contains("$3"));
2460        match &values[0] {
2461            JsonParam::Text(s) => assert_eq!(s, "abc123"),
2462            other => panic!("expected Text id param, got {other:?}"),
2463        }
2464        assert_eq!(values.len(), 3); // id + 2 fields
2465    }
2466
2467    #[test]
2468    fn build_update_sql_quotes_column_names() {
2469        let data = serde_json::json!({"displayName": "Carol"});
2470        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
2471        assert!(sql.contains("\"displayName\" = $2"));
2472    }
2473
2474    #[test]
2475    fn build_update_sql_rejects_non_object() {
2476        let data = serde_json::json!(42);
2477        let result = build_update_sql("User", "id1", &data);
2478        assert!(result.is_err());
2479        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2480    }
2481
2482    #[test]
2483    fn build_update_sql_rejects_empty_object() {
2484        let data = serde_json::json!({});
2485        let err = build_update_sql("User", "id1", &data).unwrap_err();
2486        assert_eq!(err.code, "PG_INVALID_DATA");
2487        assert!(err.message.contains("at least one field"));
2488    }
2489}