Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
//   id(...)   -> TEXT
//   (other)   -> TEXT  (fallback for any unrecognized type)
14// ---------------------------------------------------------------------------
15
/// Map a manifest field type to the Postgres column type used to store it.
///
/// Only `int`, `float`, `bool` and `datetime` get a dedicated column type;
/// everything else — `string`, `richtext`, `id(<Entity>)` references and
/// any unrecognized type name — is stored as TEXT.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        _ => "TEXT",
    }
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes as a Postgres delimited identifier.
///
/// Any `"` inside the name is doubled, following the PostgreSQL rule that
/// `"foo""bar"` denotes the identifier `foo"bar`, so the name can never
/// terminate the quoted identifier early.
pub(crate) fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Emit the escape quote; the original quote follows below.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
/// Public re-export of [`quote_ident`] for sibling modules (e.g. `pg_tx_store`)
/// that build SQL strings under the same identifier-quoting rules.
///
/// Only compiled with the `postgres-live` feature. It exists as a separate
/// function so the crate-private `quote_ident` and its call sites inside
/// this module stay untouched.
#[cfg(feature = "postgres-live")]
pub fn quote_ident_pub(name: &str) -> String {
    quote_ident(name)
}
48
/// Public re-export of [`live::row_to_json`] for sibling modules.
///
/// Used by `pg_tx_store` so transactional reads return rows in the same
/// JSON shape as the non-transactional path. Only compiled with the
/// `postgres-live` feature; it simply forwards `row` to `live::row_to_json`.
#[cfg(feature = "postgres-live")]
pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
    live::row_to_json(row)
}
56
/// Public re-export of the filter builder so the in-transaction `PgTxStore`
/// can reuse the same operator surface ($eq, $like, $in, $order, $limit,
/// $offset, $not, $gt, $gte, $lt, $lte) as the non-transactional path.
///
/// Returns `(sql, params)` ready to feed to either `client.query` or
/// `transaction.query`. `valid_columns` is passed through unchanged to
/// `LivePostgresAdapter::build_query_filtered_sql` (presumably the
/// allow-list of filterable column names — confirm there).
///
/// # Errors
///
/// Returns whatever `StorageError` the underlying builder produces.
#[cfg(feature = "postgres-live")]
pub fn build_query_filtered_sql_pub(
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
    live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
}
70
/// Public re-export of the aggregate SQL builder for sibling modules.
///
/// Returns `(sql, params, column_names)`. The `column_names` list drives
/// the per-row projection in [`aggregate_rows_to_json_pub`], so pass it
/// along with the rows the query returns.
///
/// # Errors
///
/// Returns whatever `StorageError` `LivePostgresAdapter::build_aggregate_sql`
/// produces for `spec`.
#[cfg(feature = "postgres-live")]
pub fn build_aggregate_sql_pub(
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
    live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
}
82
/// Public re-export of the post-processing helper that converts raw
/// aggregate `Row`s into the `{ rows: [{...}] }` JSON shape.
///
/// `column_names` should be the third element returned by
/// [`build_aggregate_sql_pub`] for the query that produced `rows`.
#[cfg(feature = "postgres-live")]
pub fn aggregate_rows_to_json_pub(
    rows: &[postgres::Row],
    column_names: &[String],
) -> serde_json::Value {
    live::aggregate_rows_to_json(rows, column_names)
}
92
93// ---------------------------------------------------------------------------
94// SQL generation
95// ---------------------------------------------------------------------------
96
97/// Generate a Postgres CREATE TABLE statement.
98pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
99    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
100
101    for field in fields {
102        let col_type = pg_column_type(&field.field_type);
103        let not_null = if field.optional { "" } else { " NOT NULL" };
104        let unique = if field.unique { " UNIQUE" } else { "" };
105        columns.push(format!(
106            "{} {}{}{}",
107            quote_ident(&field.name),
108            col_type,
109            not_null,
110            unique
111        ));
112    }
113
114    format!(
115        "CREATE TABLE IF NOT EXISTS {} ({})",
116        quote_ident(entity_name),
117        columns.join(", ")
118    )
119}
120
121/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
122/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
123/// Required-ness is tracked in the manifest; enforcement deferred.
124pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
125    let col_type = pg_column_type(&field.field_type);
126    let unique = if field.unique { " UNIQUE" } else { "" };
127    format!(
128        "ALTER TABLE {} ADD COLUMN {} {}{}",
129        quote_ident(entity_name),
130        quote_ident(&field.name),
131        col_type,
132        unique
133    )
134}
135
136/// Generate a Postgres CREATE INDEX statement.
137pub fn create_index_sql(
138    entity_name: &str,
139    index_name: &str,
140    fields: &[String],
141    unique: bool,
142) -> String {
143    let unique_str = if unique { "UNIQUE " } else { "" };
144    let full_index_name = format!("{}_{}", entity_name, index_name);
145    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
146    format!(
147        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
148        unique_str,
149        quote_ident(&full_index_name),
150        quote_ident(entity_name),
151        quoted_fields.join(", ")
152    )
153}
154
155// ---------------------------------------------------------------------------
156// PostgresAdapter — planning-only adapter
157// ---------------------------------------------------------------------------
158
159/// A Postgres storage adapter. Currently supports planning only.
160/// No live connection — SQL generation and planning from manifest.
161pub struct PostgresAdapter;
162
163impl StorageAdapter for PostgresAdapter {
164    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
165        // Plan from empty baseline.
166        let mut operations = Vec::new();
167
168        for entity in &target.entities {
169            let fields: Vec<FieldSpec> = entity
170                .fields
171                .iter()
172                .map(|f| FieldSpec {
173                    name: f.name.clone(),
174                    field_type: f.field_type.clone(),
175                    optional: f.optional,
176                    unique: f.unique,
177                })
178                .collect();
179
180            operations.push(SchemaOperation::CreateEntity {
181                name: entity.name.clone(),
182                fields,
183            });
184
185            for index in &entity.indexes {
186                operations.push(SchemaOperation::AddIndex {
187                    entity: entity.name.clone(),
188                    name: index.name.clone(),
189                    fields: index.fields.clone(),
190                    unique: index.unique,
191                });
192            }
193        }
194
195        if operations.is_empty() {
196            operations.push(SchemaOperation::Noop);
197        }
198
199        Ok(SchemaPlan { operations })
200    }
201
202    // apply_schema intentionally not implemented — uses default trait error.
203}
204
205/// Generate all SQL statements for a plan, in order.
206/// Useful for dry-run preview of what Postgres DDL would be executed.
207pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
208    let mut statements = Vec::new();
209
210    for op in &plan.operations {
211        match op {
212            SchemaOperation::CreateEntity { name, fields } => {
213                statements.push(create_table_sql(name, fields));
214            }
215            SchemaOperation::AddField { entity, field } => {
216                statements.push(add_column_sql(entity, field));
217            }
218            SchemaOperation::AlterField {
219                entity,
220                previous,
221                target,
222            } => {
223                // Only nullable transitions today. SET / DROP NOT NULL is
224                // safe on a populated table when going from required →
225                // optional (existing rows already satisfy NOT NULL); the
226                // reverse direction (optional → required) succeeds only
227                // if every row has a non-null value, which the planner
228                // has no way to know — Postgres will fail the migration
229                // if it can't and the operator gets a clear error from
230                // the apply step.
231                if previous.optional && !target.optional {
232                    statements.push(format!(
233                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
234                        quote_ident(entity),
235                        quote_ident(&target.name)
236                    ));
237                } else if !previous.optional && target.optional {
238                    statements.push(format!(
239                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
240                        quote_ident(entity),
241                        quote_ident(&target.name)
242                    ));
243                }
244                // Nothing emitted when neither nullable nor type changed
245                // — falls through silently. AlterField with no actual
246                // shape change shouldn't happen in practice (the planner
247                // only emits it on real drift), but guard against
248                // emitting empty SQL just in case.
249            }
250            SchemaOperation::AddIndex {
251                entity,
252                name,
253                fields,
254                unique,
255            } => {
256                statements.push(create_index_sql(entity, name, fields, *unique));
257            }
258            SchemaOperation::CreateSearchIndex { entity, config } => {
259                #[cfg(feature = "postgres-live")]
260                {
261                    statements.extend(crate::pg_search::create_search_index_sql(entity, config));
262                }
263                #[cfg(not(feature = "postgres-live"))]
264                {
265                    let _ = (entity, config);
266                    return Err(StorageError {
267                        code: "PG_SEARCH_FEATURE_OFF".into(),
268                        message: "CreateSearchIndex requires the `postgres-live` feature".into(),
269                    });
270                }
271            }
272            SchemaOperation::RemoveSearchIndex { entity } => {
273                // Without the original config we don't know which
274                // facet/sort indexes were created. Drop the FTS table
275                // and the GIN index by their fixed names; per-field
276                // facet/sort indexes are dropped automatically by the
277                // entity DROP path. Operators removing search via
278                // schema diff will see leftover indexes only if the
279                // entity table itself still exists — at which point
280                // the next CREATE INDEX IF NOT EXISTS catches up.
281                //
282                // quote_ident on the synthetic table/index names so a
283                // malicious entity name with embedded `"` can't break
284                // out of the identifier.
285                statements.push(format!(
286                    "DROP TABLE IF EXISTS {} CASCADE",
287                    quote_ident(&format!("_fts_{entity}"))
288                ));
289                statements.push(format!(
290                    "DROP INDEX IF EXISTS {}",
291                    quote_ident(&format!("{entity}_fts_gin"))
292                ));
293            }
294            SchemaOperation::Noop => {}
295            other => {
296                return Err(StorageError {
297                    code: "PG_OP_UNSUPPORTED".into(),
298                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
299                });
300            }
301        }
302    }
303
304    Ok(statements)
305}
306
307// ---------------------------------------------------------------------------
308// Introspection SQL helpers
309//
// These constants are the SQL queries a live Postgres connection runs to read
// the current schema. Building them needs no connection — they are plain SQL strings.
312// ---------------------------------------------------------------------------
313
/// SQL listing user tables in the `public` schema, sorted by name.
///
/// Pylon-internal tables (literal `_pylon_` prefix) are excluded. In `LIKE`,
/// `_` is a single-character wildcard, so the prefix's underscores are
/// escaped (`!_` with `ESCAPE '!'`). An unescaped `'_pylon_%'` would also
/// hide user tables such as `apylon_x`. `!` is used as the escape
/// character rather than `\` so the pattern doesn't depend on the
/// server's `standard_conforming_strings` setting.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
322
/// SQL listing the columns of one `public` table, in ordinal order.
///
/// Bind `$1` to the table name. Each row carries `column_name`,
/// `data_type`, `is_nullable` and `is_pk`. `is_pk` is a count of the
/// PRIMARY KEY constraints on that same table (same schema) that cover the
/// column; a non-zero value means the column is part of the primary key.
///
/// The constraint subquery is scoped by schema as well as by table and
/// constraint name. Table and constraint names are only unique within a
/// schema, so matching on names alone can count a primary key that
/// belongs to a same-named table in another schema.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_schema = kcu.constraint_schema \
             AND tc.constraint_name = kcu.constraint_name \
             AND tc.table_schema = kcu.table_schema \
             AND tc.table_name = kcu.table_name \
            WHERE tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE c.table_schema = 'public' AND c.table_name = $1 \
    ORDER BY c.ordinal_position";
336
/// SQL listing the non-primary-key indexes on one `public` table.
///
/// Bind `$1` to the table name. Each row carries `index_name`,
/// `is_unique`, and `columns`: the indexed column names, aggregated in
/// index-key order via `array_position` over `pg_index.indkey`. Rows are
/// sorted by index name.
///
/// NOTE(review): expression-index keys presumably have no matching
/// `pg_attribute` row and so drop out of `columns` — confirm whether that
/// matters. Only `indisprimary` is filtered out, so indexes backing
/// column-level `UNIQUE` constraints (as emitted by `create_table_sql` /
/// `add_column_sql`) will also show up here.
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
           ix.indisunique as is_unique, \
           array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
      AND t.relname = $1 \
      AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
353
/// Plan the changes needed to move `snapshot` (the introspected current
/// schema) to `target`.
///
/// This forwards directly to the crate-wide `crate::plan_from_snapshot`,
/// so Postgres plans incrementally with the same logic as other backends
/// once introspection data is available.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
359
360// ---------------------------------------------------------------------------
361// CRUD SQL generation helpers (used by live adapter, testable without a DB)
362// ---------------------------------------------------------------------------
363
/// Generate a lex-sortable unique row id.
///
/// Format: 32 lowercase hex chars of the Unix-epoch timestamp in
/// nanoseconds (zero-padded), followed by 8 hex chars of a per-process
/// sequence number. The width is fixed at 40 chars so lexicographic
/// comparison matches numeric order, which is what cursor pagination
/// relies on. A clock before the Unix epoch reads as timestamp 0.
///
/// Within one process every id is strictly greater than the previous one.
/// When the wall clock hasn't moved past the last id's timestamp (same
/// clock reading, or a backwards clock step), the last timestamp is reused
/// with a larger sequence number. If the sequence wraps around, the
/// timestamp is advanced by one nanosecond instead.
///
/// The sequence is never reset, so two processes minting ids at the same
/// timestamp only collide if their sequence numbers also match. Ordering
/// across processes is only as good as their clocks.
pub fn generate_id() -> String {
    use std::sync::Mutex;
    use std::time::{SystemTime, UNIX_EPOCH};

    // (timestamp, sequence) of the last id handed out. The sequence starts
    // at u32::MAX so the first id uses sequence 0.
    static LAST: Mutex<(u128, u32)> = Mutex::new((0, u32::MAX));

    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    let (ts, seq) = {
        // The guarded tuple is only ever replaced whole, so a poisoned lock
        // (another thread panicked while holding it) still contains a valid
        // state. Keep minting instead of propagating the panic.
        let mut last = LAST.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        let (last_ts, last_seq) = *last;
        let seq = last_seq.wrapping_add(1);
        let ts = if now > last_ts {
            now
        } else if seq > last_seq {
            last_ts
        } else {
            // The sequence wrapped while the clock stood still: step past it
            // so the new id still sorts after the previous one.
            last_ts + 1
        };
        *last = (ts, seq);
        (ts, seq)
    };

    format!("{ts:032x}{seq:08x}")
}
383
384/// Convert a JSON value to its string representation for use as a SQL parameter.
385///
386/// Kept for back-compat with callers that need a textual fallback (e.g.
387/// human-readable logs). New code should bind through [`JsonParam`] so
388/// integers/booleans/nulls reach Postgres in their typed form instead of
389/// collapsing to TEXT, which the driver can't coerce into INTEGER /
390/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
391/// into an empty string for FK columns.
392pub fn json_value_to_string(val: &serde_json::Value) -> String {
393    match val {
394        serde_json::Value::String(s) => s.clone(),
395        serde_json::Value::Number(n) => n.to_string(),
396        serde_json::Value::Bool(b) => b.to_string(),
397        serde_json::Value::Null => String::new(),
398        other => other.to_string(),
399    }
400}
401
/// A typed wrapper around a JSON scalar that implements
/// [`postgres::types::ToSql`] (with the `postgres-live` feature) for
/// INSERT/UPDATE parameters.
///
/// Passing every value as `String` breaks non-text columns: the postgres
/// driver can't bind a string literal into an INTEGER / BOOLEAN /
/// TIMESTAMPTZ slot. It also turns JSON `null` into the empty string for
/// nullable FKs, so `unlink` would leave dangling `""` references instead
/// of NULL. `JsonParam` keeps the JSON variant tag through to `to_sql`,
/// so the driver can pick the binary representation that matches each
/// column's type.
///
/// JSON arrays/objects collapse to their JSON-string form (`Text`). The
/// runtime layer doesn't currently model array/object columns at the
/// manifest level on Postgres, so anything that lands here is expected to
/// fit into a TEXT column.
#[derive(Debug, Clone, PartialEq)]
pub enum JsonParam {
    /// JSON `null`; binds as SQL NULL for any column type.
    Null,
    /// JSON strings, arrays/objects serialized as JSON text, and (rarely)
    /// numbers that fit neither `i64` nor `f64`.
    Text(String),
    /// JSON numbers that fit in `i64`.
    Int(i64),
    /// Other JSON numbers (fractional or beyond `i64`'s range).
    Float(f64),
    /// JSON booleans.
    Bool(bool),
}
425
426impl JsonParam {
427    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
428    /// that fit `i64` go through as Int; everything else goes through as
429    /// Float to preserve fractional / large-magnitude values.
430    pub fn from_json(val: &serde_json::Value) -> Self {
431        match val {
432            serde_json::Value::Null => JsonParam::Null,
433            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
434            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
435            serde_json::Value::Number(n) => {
436                if let Some(i) = n.as_i64() {
437                    JsonParam::Int(i)
438                } else if let Some(f) = n.as_f64() {
439                    JsonParam::Float(f)
440                } else {
441                    JsonParam::Text(n.to_string())
442                }
443            }
444            other => JsonParam::Text(other.to_string()),
445        }
446    }
447}
448
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encode this parameter for a bind slot whose type Postgres resolved
    /// to `ty`.
    ///
    /// Each variant is encoded to fit the column's declared type. Binding an
    /// `i64` (INT8, 8 bytes) into an INTEGER (INT4, 4 bytes) slot, for
    /// example, is rejected by Postgres as binary data of the wrong size.
    /// Integer narrowing is checked rather than done with `as`, so an
    /// out-of-range or fractional number errors instead of silently
    /// wrapping or truncating (`as` turns 3_000_000_000 into a negative
    /// INT4, and 2.9 into 2).
    ///
    /// # Errors
    ///
    /// * an `Int`/`Float` that can't be stored losslessly in an INT2/INT4/INT8
    ///   column (out of range, fractional, NaN or infinite);
    /// * a `Text` bound to TIMESTAMPTZ / TIMESTAMP / DATE that isn't RFC 3339;
    /// * any error from the inner value's encoder.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::{IsNull, Type};

        type BoxError = Box<dyn std::error::Error + Sync + Send>;

        /// Narrow an `i64` to `T`, erroring instead of silently wrapping.
        fn narrow<T: std::convert::TryFrom<i64>>(n: i64, ty: &Type) -> Result<T, BoxError> {
            T::try_from(n).map_err(|_| {
                format!("integer {n} is out of range for a {} column", ty.name()).into()
            })
        }

        /// Convert an `f64` to `i64` only when that is lossless: finite, no
        /// fractional part, and within `i64`'s range.
        fn integral(f: f64, ty: &Type) -> Result<i64, BoxError> {
            // 2^63 is exactly representable as f64, and every integral f64
            // in [-2^63, 2^63) fits in an i64.
            const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;
            if f.is_finite() && f.fract() == 0.0 && (-TWO_POW_63..TWO_POW_63).contains(&f) {
                Ok(f as i64)
            } else {
                Err(format!(
                    "number {f} cannot be stored losslessly in a {} column",
                    ty.name()
                )
                .into())
            }
        }

        match (self, ty) {
            // Null binds as SQL NULL regardless of the column's declared type.
            (JsonParam::Null, _) => Ok(IsNull::Yes),

            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            (JsonParam::Int(n), &Type::INT2) => narrow::<i16>(*n, ty)?.to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => narrow::<i32>(*n, ty)?.to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            // Integer → float may round for very large magnitudes; that is
            // the usual numeric widening, not a wrap.
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT2) => {
                narrow::<i16>(integral(*f, ty)?, ty)?.to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT4) => {
                narrow::<i32>(integral(*f, ty)?, ty)?.to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT8) => integral(*f, ty)?.to_sql(ty, out),

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                // The runtime models datetimes as ISO 8601 strings
                // (`pylon_kernel::util::now_iso` shape, plus user-supplied
                // RFC 3339). The TIMESTAMPTZ binary wire format is an `i64`
                // of microseconds since 2000-01-01 UTC, not the bytes of the
                // string. Binding the raw ASCII made Postgres reject the
                // statement with "incorrect binary data format in bind
                // parameter N" (the OAuth-callback failure on pylon-cloud,
                // User.createdAt). Parse via chrono and let the postgres
                // crate's `with-chrono-0_4` ToSql impl emit the binary form.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // TIMESTAMP (no timezone): same parse, bound as a UTC
                // NaiveDateTime so chrono emits the matching encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // Cross-type fallback: send the value's text rendering. This is
            // the intended path for text-like columns (e.g. a number or bool
            // bound into a TEXT column). For other column types the text
            // bytes don't match the binary wire format, so Postgres rejects
            // the bind rather than coercing. A stringified number sent to an
            // INTEGER column therefore fails loudly, server-side.
            // NOTE(review): checking `<String as ToSql>::accepts(ty)` here
            // would fail earlier with a clearer error, but would also reject
            // column types whose binary format is plain text (e.g. JSON).
            // Confirm which types are in use before tightening.
            (JsonParam::Bool(b), _) => b.to_string().to_sql(ty, out),
            (JsonParam::Int(n), _) => n.to_string().to_sql(ty, out),
            (JsonParam::Float(f), _) => f.to_string().to_sql(ty, out),
            (JsonParam::Text(s), _) => s.to_sql(ty, out),
        }
    }

    fn accepts(_ty: &postgres::types::Type) -> bool {
        // Accept every type up front. `to_sql` matches the variant against
        // the concrete column type and returns an error for lossy or
        // unparseable conversions. `to_sql_checked!` consults this before
        // calling `to_sql`, so returning `false` would reject the bind
        // outright.
        true
    }

    postgres::types::to_sql_checked!();
}
552
553/// Build an INSERT SQL statement and collect typed parameter values.
554/// Returns `(sql, params)` where `params[0]` is the generated ID
555/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
556/// value so the postgres driver can bind them to typed columns
557/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
558/// the database as SQL NULL — the previous string-collapsing path stored
559/// `""` for nullable FKs and broke any non-text column.
560pub fn build_insert_sql(
561    entity: &str,
562    data: &serde_json::Value,
563) -> Result<(String, Vec<JsonParam>), StorageError> {
564    let obj = data.as_object().ok_or_else(|| StorageError {
565        code: "PG_INVALID_DATA".into(),
566        message: "Insert data must be a JSON object".into(),
567    })?;
568
569    // Honor caller-supplied `id` (used by the CRDT path that needs to
570    // share the row id with the LoroDoc snapshot key it just wrote).
571    // Fall back to a fresh ULID-shaped id when absent. A non-string
572    // `id` value is rejected explicitly — silently regenerating would
573    // mask schema bugs (e.g. a caller passing an int id) and let the
574    // CRDT snapshot key drift from the materialized row id.
575    let id = match obj.get("id") {
576        None | Some(serde_json::Value::Null) => generate_id(),
577        Some(serde_json::Value::String(s)) => s.clone(),
578        Some(other) => {
579            return Err(StorageError {
580                code: "PG_INVALID_ID".into(),
581                message: format!(
582                    "Insert data carried a non-string `id` value: {other}. Pylon row ids \
583                     are always strings (40-char hex). Drop the `id` field to let the \
584                     server generate one, or supply a string."
585                ),
586            });
587        }
588    };
589
590    let mut col_names = vec!["id".to_string()];
591    let mut placeholders = vec!["$1".to_string()];
592    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
593
594    let mut i = 0usize;
595    for (key, val) in obj {
596        if key == "id" {
597            // Already emitted via the synthetic first column; skipping
598            // here avoids `INSERT ... (id, id, ...)` which Postgres
599            // rejects with `duplicate key value`.
600            continue;
601        }
602        col_names.push(quote_ident(key));
603        placeholders.push(format!("${}", i + 2));
604        values.push(JsonParam::from_json(val));
605        i += 1;
606    }
607
608    let sql = format!(
609        "INSERT INTO {} ({}) VALUES ({})",
610        quote_ident(entity),
611        col_names.join(", "),
612        placeholders.join(", ")
613    );
614
615    Ok((sql, values))
616}
617
618/// Build an UPDATE SQL statement and collect typed parameter values.
619/// Returns `(sql, params)` where `params[0]` is the row ID.
620pub fn build_update_sql(
621    entity: &str,
622    id: &str,
623    data: &serde_json::Value,
624) -> Result<(String, Vec<JsonParam>), StorageError> {
625    let obj = data.as_object().ok_or_else(|| StorageError {
626        code: "PG_INVALID_DATA".into(),
627        message: "Update data must be a JSON object".into(),
628    })?;
629
630    if obj.is_empty() {
631        return Err(StorageError {
632            code: "PG_INVALID_DATA".into(),
633            message: "Update data must contain at least one field".into(),
634        });
635    }
636
637    let mut set_clauses = Vec::new();
638    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
639
640    let mut i = 0usize;
641    for (key, val) in obj {
642        if key == "id" {
643            // Reject primary-key mutation. Letting `id` into the SET
644            // clause lets a client move a row out from under its CRDT
645            // sidecar (which is keyed by the original row_id) and its
646            // FTS shadow row (FK-bound to the original id). The SQLite
647            // path silently drops `id` here too — keep the same
648            // shape, but errors so the caller sees the bug.
649            return Err(StorageError {
650                code: "PG_INVALID_UPDATE".into(),
651                message:
652                    "Updating the `id` column is not allowed — Pylon row ids are immutable; \
653                     drop the field from the patch."
654                        .into(),
655            });
656        }
657        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
658        values.push(JsonParam::from_json(val));
659        i += 1;
660    }
661
662    if set_clauses.is_empty() {
663        return Err(StorageError {
664            code: "PG_INVALID_DATA".into(),
665            message: "Update data must contain at least one non-id field".into(),
666        });
667    }
668
669    let sql = format!(
670        "UPDATE {} SET {} WHERE id = $1",
671        quote_ident(entity),
672        set_clauses.join(", ")
673    );
674
675    Ok((sql, values))
676}
677
/// Lift typed parameters into the `&(dyn ToSql + Sync)` trait-object list
/// the postgres driver's query methods take. Insert/update/transact call
/// sites use this instead of each repeating the conversion.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
688
689// ---------------------------------------------------------------------------
690// Live Postgres adapter (requires "postgres-live" feature)
691// ---------------------------------------------------------------------------
692
693#[cfg(feature = "postgres-live")]
694pub mod live {
695    use super::*;
696    use crate::{
697        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
698    };
699
    /// TLS decision produced by `parse_pg_url_ssl`.
    ///
    /// `parse_pg_url_ssl` returns this alongside the cleaned-up URL
    /// (libpq-only params stripped) as a `(String, PgUrlSsl)` tuple. This
    /// struct itself carries only the "should we use TLS for this
    /// connection" flag, so the connect path can pick between `NoTls` and
    /// `MakeTlsConnector`.
    pub(super) struct PgUrlSsl {
        /// `true` when the connection should use TLS. See `parse_pg_url_ssl`
        /// for which URL parameters turn it on; a URL without a query
        /// string yields `false`.
        pub use_tls: bool,
    }
707
708    /// Pre-process a Postgres URL: strip libpq-specific query params
709    /// the Rust `postgres` crate's URL parser doesn't accept, and
710    /// figure out whether TLS should be enabled for the connection.
711    ///
712    /// Recognizes:
713    ///   - sslmode=disable / prefer / allow → no TLS
714    ///   - sslmode=require / verify-ca / verify-full → TLS
715    ///   - sslrootcert=system → TLS, use OS CA store (handled by
716    ///     `native-tls` automatically; we just record the intent)
717    ///   - sslrootcert=<path> → TLS, but the path is currently
718    ///     ignored — `native-tls` reads system roots only. Logged
719    ///     as a one-time warning so operators see the gap.
720    ///
721    /// Anything we don't understand is dropped from the URL silently
722    /// (defense against future libpq additions confusing the parser).
723    pub(super) fn parse_pg_url_ssl(url: &str) -> (String, PgUrlSsl) {
724        let (base, query) = match url.find('?') {
725            Some(idx) => (&url[..idx], &url[idx + 1..]),
726            None => return (url.to_string(), PgUrlSsl { use_tls: false }),
727        };
728
729        let mut use_tls = false;
730        let mut kept: Vec<String> = Vec::new();
731        for pair in query.split('&') {
732            if pair.is_empty() {
733                continue;
734            }
735            let (k, v) = match pair.find('=') {
736                Some(i) => (&pair[..i], &pair[i + 1..]),
737                None => (pair, ""),
738            };
739            match k {
740                "sslmode" => match v.to_ascii_lowercase().as_str() {
741                    "disable" | "allow" => {
742                        // Caller said "no TLS" — pass through to the
743                        // postgres crate, which knows `disable`.
744                        kept.push("sslmode=disable".to_string());
745                    }
746                    "prefer" => {
747                        // Postgres crate's default; let it through.
748                        kept.push("sslmode=prefer".to_string());
749                    }
750                    "require" | "verify-ca" | "verify-full" | "" => {
751                        // All the "use TLS" modes. Rust crate only
752                        // accepts `require`; verify-* would be
753                        // rejected. Normalize to `require` and let
754                        // our TLS connector handle cert verification.
755                        use_tls = true;
756                        kept.push("sslmode=require".to_string());
757                    }
758                    other => {
759                        tracing::warn!(
760                            "[pg] unknown sslmode='{other}' — defaulting to require + TLS"
761                        );
762                        use_tls = true;
763                        kept.push("sslmode=require".to_string());
764                    }
765                },
766                "sslrootcert" => {
767                    // Either "system" (use OS roots — that's what
768                    // native-tls does anyway) or a path (which we
769                    // can't honor without bringing in openssl). Log
770                    // and drop in both cases; TLS still happens.
771                    if v != "system" && !v.is_empty() {
772                        tracing::warn!(
773                            "[pg] sslrootcert={v} ignored — native-tls uses system roots"
774                        );
775                    }
776                    use_tls = true;
777                }
778                _ => {
779                    // Forward everything else (search_path, application_name,
780                    // connect_timeout, etc.) so libpq-style URLs that work
781                    // for the rest of the world keep working here.
782                    kept.push(pair.to_string());
783                }
784            }
785        }
786
787        let cleaned = if kept.is_empty() {
788            base.to_string()
789        } else {
790            format!("{}?{}", base, kept.join("&"))
791        };
792        (cleaned, PgUrlSsl { use_tls })
793    }
794
795    #[cfg(test)]
796    mod url_tests {
797        use super::parse_pg_url_ssl;
798
799        #[test]
800        fn strips_libpq_only_sslmode_verify_full() {
801            let (cleaned, ssl) = parse_pg_url_ssl(
802                "postgres://u:p@h:5432/db?sslmode=verify-full&sslrootcert=system",
803            );
804            assert!(ssl.use_tls);
805            // verify-full normalized to require; sslrootcert dropped.
806            assert_eq!(cleaned, "postgres://u:p@h:5432/db?sslmode=require");
807        }
808
809        #[test]
810        fn passes_through_disable() {
811            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db?sslmode=disable");
812            assert!(!ssl.use_tls);
813            assert_eq!(cleaned, "postgres://h/db?sslmode=disable");
814        }
815
816        #[test]
817        fn no_query_string_no_tls() {
818            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db");
819            assert!(!ssl.use_tls);
820            assert_eq!(cleaned, "postgres://h/db");
821        }
822
823        #[test]
824        fn unknown_params_pass_through() {
825            let (cleaned, _) = parse_pg_url_ssl(
826                "postgres://h/db?application_name=pylon&connect_timeout=5",
827            );
828            assert!(cleaned.contains("application_name=pylon"));
829            assert!(cleaned.contains("connect_timeout=5"));
830        }
831
832        #[test]
833        fn sslrootcert_alone_enables_tls() {
834            // Rare but valid — sslrootcert=system implies TLS even
835            // without an explicit sslmode.
836            let (cleaned, ssl) = parse_pg_url_ssl(
837                "postgres://h/db?sslrootcert=system",
838            );
839            assert!(ssl.use_tls);
840            // No sslmode synthesized — the postgres crate defaults
841            // to `prefer` which happily upgrades to our TLS connector.
842            assert_eq!(cleaned, "postgres://h/db");
843        }
844    }
845
    /// A live Postgres adapter with a real database connection: a thin
    /// wrapper around one `postgres::Client`.
    ///
    /// Constructed via `LivePostgresAdapter::connect`. Its query methods
    /// take `&mut self` because they drive the client mutably.
    pub struct LivePostgresAdapter {
        /// The open connection. Outside this module it is reachable only
        /// through the crate-private `client_mut`.
        client: postgres::Client,
    }
850
851    impl LivePostgresAdapter {
852        /// Borrow the underlying postgres client mutably. Used by
853        /// `PostgresDataStore::with_transaction` to start an
854        /// interactive transaction across multiple TS-function
855        /// `ctx.db` calls. `pub(crate)` because exposing raw
856        /// `&mut Client` outside pylon-storage would let callers
857        /// issue arbitrary SQL, bypassing the typed `DataStore`
858        /// surface that the rest of the framework relies on.
859        pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
860            &mut self.client
861        }
862
863        /// Connect to a Postgres database.
864        ///
865        /// Honors libpq-style URL params the Rust postgres crate doesn't
866        /// natively understand:
867        ///   - `sslmode=verify-full`, `verify-ca`, `require` → use TLS
868        ///     via rustls + the OS trust store. The Rust postgres crate
869        ///     only knows `disable`/`prefer`/`require` — it rejects the
870        ///     libpq extras with "invalid connection string".
871        ///   - `sslrootcert=system` → trust the OS CA store (rustls-
872        ///     native-certs reads it via SecurityFramework / SChannel /
873        ///     /etc/ssl). `sslrootcert=<path>` is not yet supported and
874        ///     falls back to system roots with a warning.
875        ///
876        /// Strips the libpq-only params from the URL before passing to
877        /// the postgres crate's parser, so it doesn't choke. Common
878        /// real-world example that was failing pre-fix: Fly Postgres
879        /// emits `?sslmode=verify-full&sslrootcert=system` URLs.
880        ///
881        /// Uses rustls (not native-tls/openssl) so binary builds
882        /// cross-compile cleanly on musl + arm64 without needing
883        /// OPENSSL_DIR. Pure Rust all the way down.
884        pub fn connect(url: &str) -> Result<Self, StorageError> {
885            let (cleaned, ssl) = parse_pg_url_ssl(url);
886            let result = if ssl.use_tls {
887                let mut roots = rustls::RootCertStore::empty();
888                let native_certs = rustls_native_certs::load_native_certs();
889                for cert in native_certs.certs {
890                    let _ = roots.add(cert);
891                }
892                if !native_certs.errors.is_empty() {
893                    tracing::warn!(
894                        "[pg] rustls native cert load reported {} non-fatal errors",
895                        native_certs.errors.len()
896                    );
897                }
898                let config = rustls::ClientConfig::builder()
899                    .with_root_certificates(roots)
900                    .with_no_client_auth();
901                let tls = tokio_postgres_rustls::MakeRustlsConnect::new(config);
902                postgres::Client::connect(&cleaned, tls)
903            } else {
904                postgres::Client::connect(&cleaned, postgres::NoTls)
905            };
906            let client = result.map_err(|e| StorageError {
907                code: "PG_CONNECT_FAILED".into(),
908                message: format!("Failed to connect to Postgres: {e}"),
909            })?;
910            Ok(Self { client })
911        }
912
913        /// Read the current schema from the live database.
914        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
915            let table_rows = self
916                .client
917                .query(INTROSPECT_TABLES_SQL, &[])
918                .map_err(pg_err)?;
919
920            let mut tables = Vec::new();
921            for row in &table_rows {
922                let table_name: String = row.get(0);
923                let columns = self.read_columns(&table_name)?;
924                let indexes = self.read_indexes(&table_name)?;
925                tables.push(TableSnapshot {
926                    name: table_name,
927                    columns,
928                    indexes,
929                });
930            }
931
932            Ok(SchemaSnapshot { tables })
933        }
934
935        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
936            let rows = self
937                .client
938                .query(INTROSPECT_COLUMNS_SQL, &[&table])
939                .map_err(pg_err)?;
940
941            let mut columns = Vec::new();
942            for row in &rows {
943                let name: String = row.get(0);
944                let data_type: String = row.get(1);
945                let is_nullable: String = row.get(2);
946                let is_pk: i64 = row.get(3);
947                columns.push(ColumnSnapshot {
948                    name,
949                    column_type: data_type,
950                    notnull: is_nullable == "NO",
951                    primary_key: is_pk > 0,
952                });
953            }
954            Ok(columns)
955        }
956
957        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
958            let rows = self
959                .client
960                .query(INTROSPECT_INDEXES_SQL, &[&table])
961                .map_err(pg_err)?;
962
963            let mut indexes = Vec::new();
964            for row in &rows {
965                let name: String = row.get(0);
966                let unique: bool = row.get(1);
967                let columns: Vec<String> = row.get(2);
968                indexes.push(IndexSnapshot {
969                    name,
970                    columns,
971                    unique,
972                });
973            }
974            Ok(indexes)
975        }
976
977        /// Plan from live database state.
978        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
979            let snapshot = self.read_schema()?;
980            Ok(crate::plan_from_snapshot(&snapshot, target))
981        }
982    }
983
984    impl StorageAdapter for LivePostgresAdapter {
985        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
986            Err(StorageError {
987                code: "PG_PLAN_NEEDS_MUTABLE".into(),
988                message: "Use plan_from_live() instead for live Postgres planning".into(),
989            })
990        }
991
992        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
993            Err(StorageError {
994                code: "PG_APPLY_USE_METHOD".into(),
995                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
996            })
997        }
998    }
999
1000    impl LivePostgresAdapter {
1001        /// Apply a schema plan to the live database.
1002        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
1003            let statements = plan_to_sql(plan)?;
1004            for sql in &statements {
1005                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
1006            }
1007            Ok(())
1008        }
1009
1010        /// Execute a raw SQL statement against the live database. Used by
1011        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
1012        /// production code should go through `apply_plan` so changes are
1013        /// represented in the migration history. Returns the number of
1014        /// rows affected.
1015        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
1016            self.client.execute(sql, &[]).map_err(pg_err)
1017        }
1018
1019        /// Insert a row. Returns the generated ID.
1020        pub fn insert(
1021            &mut self,
1022            entity: &str,
1023            data: &serde_json::Value,
1024        ) -> Result<String, StorageError> {
1025            let (sql, values) = build_insert_sql(entity, data)?;
1026            // The first param is always the generated ID — extract it before
1027            // we hand `values` off to the postgres driver as borrowed slices.
1028            let id = match &values[0] {
1029                JsonParam::Text(s) => s.clone(),
1030                _ => {
1031                    return Err(StorageError {
1032                        code: "PG_INTERNAL".into(),
1033                        message: "build_insert_sql produced non-text id param".into(),
1034                    });
1035                }
1036            };
1037            let params = as_pg_params(&values);
1038            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
1039            Ok(id)
1040        }
1041
1042        /// Get a row by ID.
1043        pub fn get_by_id(
1044            &mut self,
1045            entity: &str,
1046            id: &str,
1047        ) -> Result<Option<serde_json::Value>, StorageError> {
1048            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
1049            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
1050
1051            match rows.first() {
1052                Some(row) => Ok(Some(row_to_json(row))),
1053                None => Ok(None),
1054            }
1055        }
1056
1057        /// List all rows from an entity.
1058        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
1059            let sql = format!("SELECT * FROM {}", quote_ident(entity));
1060            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
1061
1062            Ok(rows.iter().map(row_to_json).collect())
1063        }
1064
1065        /// Cursor-paginated list. `after` is the last `id` from the previous
1066        /// page; the result contains rows with `id > after` (lex order),
1067        /// limited to `limit`. Used for sync push/pull.
1068        pub fn list_after(
1069            &mut self,
1070            entity: &str,
1071            after: Option<&str>,
1072            limit: usize,
1073        ) -> Result<Vec<serde_json::Value>, StorageError> {
1074            // Cap limit at a sensible upper bound so a malicious client can't
1075            // stream the whole table by passing limit=u64::MAX.
1076            let capped: i64 = limit.min(10_000) as i64;
1077            let sql = match after {
1078                Some(_) => format!(
1079                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
1080                    quote_ident(entity)
1081                ),
1082                None => format!(
1083                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
1084                    quote_ident(entity)
1085                ),
1086            };
1087            let rows = match after {
1088                Some(cursor) => self
1089                    .client
1090                    .query(sql.as_str(), &[&cursor, &capped])
1091                    .map_err(pg_err)?,
1092                None => self
1093                    .client
1094                    .query(sql.as_str(), &[&capped])
1095                    .map_err(pg_err)?,
1096            };
1097            Ok(rows.iter().map(row_to_json).collect())
1098        }
1099
1100        /// Update a row by ID. Returns true if the row was found and updated.
1101        pub fn update(
1102            &mut self,
1103            entity: &str,
1104            id: &str,
1105            data: &serde_json::Value,
1106        ) -> Result<bool, StorageError> {
1107            let (sql, values) = build_update_sql(entity, id, data)?;
1108            let params = as_pg_params(&values);
1109            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
1110            Ok(rows_affected > 0)
1111        }
1112
1113        /// Delete a row by ID. Returns true if the row was found and deleted.
1114        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
1115            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1116            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
1117            Ok(rows_affected > 0)
1118        }
1119
1120        /// Look up a row by `field = value`. Caller must validate `field`
1121        /// against the manifest before calling — we still `quote_ident` it
1122        /// but won't catch a typo against the entity definition.
1123        pub fn lookup_field(
1124            &mut self,
1125            entity: &str,
1126            field: &str,
1127            value: &str,
1128        ) -> Result<Option<serde_json::Value>, StorageError> {
1129            let sql = format!(
1130                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
1131                quote_ident(entity),
1132                quote_ident(field),
1133            );
1134            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
1135            Ok(rows.first().map(row_to_json))
1136        }
1137
1138        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
1139        ///
1140        /// Supported operators (parity with the SQLite path):
1141        /// - Equality (`field: value`)
1142        /// - `$not`: emits `field != value`
1143        /// - `$gt` / `$gte` / `$lt` / `$lte`
1144        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
1145        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
1146        ///   if/when the SQLite side adds it)
1147        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
1148        ///
1149        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
1150        ///
1151        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
1152        /// would need a tsvector column or a generic ILIKE OR-fold across
1153        /// every text field, neither of which is wired up yet. Returns
1154        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
1155        /// receiving silently-broad results.
1156        ///
1157        /// Anything else is silently ignored (matches the in-memory fallback's
1158        /// permissive behavior). Field names are validated against `valid_columns`
1159        /// to prevent SQL injection — pass the entity's column set.
1160        pub fn query_filtered(
1161            &mut self,
1162            entity: &str,
1163            filter: &serde_json::Value,
1164            valid_columns: &[String],
1165        ) -> Result<Vec<serde_json::Value>, StorageError> {
1166            let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
1167            let pg_params = as_pg_params(&params);
1168            let rows = self
1169                .client
1170                .query(sql.as_str(), &pg_params)
1171                .map_err(pg_err)?;
1172            Ok(rows.iter().map(row_to_json).collect())
1173        }
1174
1175        /// Build the `SELECT ... FROM entity ...` SQL + bound params for
1176        /// a `query_filtered` request. Pure: takes a manifest's column
1177        /// list, returns text. Both the live adapter and the in-tx
1178        /// `PgTxStore` call this so the operator surface ($eq, $like,
1179        /// $in, $order, $limit, $offset) stays identical regardless of
1180        /// where the query runs.
1181        pub(crate) fn build_query_filtered_sql(
1182            entity: &str,
1183            filter: &serde_json::Value,
1184            valid_columns: &[String],
1185        ) -> Result<(String, Vec<JsonParam>), StorageError> {
1186            let empty = serde_json::Map::new();
1187            let obj = filter.as_object().unwrap_or(&empty);
1188
1189            let validate = |col: &str| -> Result<(), StorageError> {
1190                if col == "id" || valid_columns.iter().any(|c| c == col) {
1191                    Ok(())
1192                } else {
1193                    Err(StorageError {
1194                        code: "UNKNOWN_COLUMN".into(),
1195                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1196                    })
1197                }
1198            };
1199
1200            let mut where_clauses: Vec<String> = Vec::new();
1201            let mut order_clause = String::new();
1202            let mut limit_clause = String::new();
1203            let mut offset_clause = String::new();
1204            // Collect (col, op, value) so placeholder numbers can be assigned
1205            // in a single materialization pass after the parse loop. Values
1206            // are now JsonParam (typed) instead of String — see `value_to_pg`.
1207            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
1208
1209            for (key, val) in obj {
1210                match key.as_str() {
1211                    "$search" => {
1212                        // PG full-text via the entity's `_fts_<entity>`
1213                        // shadow table: `id IN (SELECT entity_id FROM
1214                        // _fts_<E> WHERE tsv @@ plainto_tsquery(...))`.
1215                        // Mirrors the SQLite path that joins the FTS5
1216                        // virtual table; the join here is a subquery so
1217                        // it composes with arbitrary other predicates
1218                        // (`$gt`, `$in`, etc.) in the same WHERE.
1219                        let raw = match val {
1220                            serde_json::Value::String(s) => s.clone(),
1221                            other => other.to_string(),
1222                        };
1223                        // Bind as a normal text param so all the
1224                        // existing placeholder-numbering and binding
1225                        // logic applies. The SQL uses the placeholder
1226                        // inside `plainto_tsquery('english', $N)`.
1227                        // Positioning logic mirrors the `$in` arm: the
1228                        // value will land at `planned.len()+1` because
1229                        // the materialization pass below pushes one
1230                        // param per planned item in order.
1231                        let placeholder_n = planned.len() + 1;
1232                        where_clauses.push(format!(
1233                            "{}.id IN (SELECT entity_id FROM \"_fts_{entity}\" \
1234                                       WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
1235                            quote_ident(entity),
1236                        ));
1237                        // Reuse the IN-style sentinel so the
1238                        // materialization pass below pushes the param
1239                        // without re-emitting a where_clause for it.
1240                        planned.push((
1241                            format!("__search_{}", planned.len()),
1242                            "__INLINE__".into(),
1243                            JsonParam::Text(raw),
1244                        ));
1245                    }
1246                    "$order" => {
1247                        if let Some(ord) = val.as_object() {
1248                            let mut parts = Vec::new();
1249                            for (col, dir) in ord {
1250                                validate(col)?;
1251                                let d = match dir.as_str().unwrap_or("asc") {
1252                                    "desc" | "DESC" => "DESC",
1253                                    _ => "ASC",
1254                                };
1255                                parts.push(format!("{} {d}", quote_ident(col)));
1256                            }
1257                            if !parts.is_empty() {
1258                                order_clause = format!(" ORDER BY {}", parts.join(", "));
1259                            }
1260                        }
1261                    }
1262                    "$limit" => {
1263                        if let Some(n) = val.as_u64() {
1264                            limit_clause = format!(" LIMIT {}", n);
1265                        }
1266                    }
1267                    "$offset" => {
1268                        if let Some(n) = val.as_u64() {
1269                            offset_clause = format!(" OFFSET {}", n);
1270                        }
1271                    }
1272                    field => {
1273                        validate(field)?;
1274                        match val {
1275                            serde_json::Value::Object(ops) => {
1276                                for (op, v) in ops {
1277                                    match op.as_str() {
1278                                        "$not" => planned.push((
1279                                            field.into(),
1280                                            "!=".into(),
1281                                            value_to_pg(v),
1282                                        )),
1283                                        "$gt" => {
1284                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
1285                                        }
1286                                        "$gte" => planned.push((
1287                                            field.into(),
1288                                            ">=".into(),
1289                                            value_to_pg(v),
1290                                        )),
1291                                        "$lt" => {
1292                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
1293                                        }
1294                                        "$lte" => planned.push((
1295                                            field.into(),
1296                                            "<=".into(),
1297                                            value_to_pg(v),
1298                                        )),
1299                                        "$like" => {
1300                                            // Wrap in `%...%` to match the
1301                                            // SQLite path's substring
1302                                            // semantics. Pre-fix divergence:
1303                                            // SQLite wrapped, PG forwarded
1304                                            // literally — `{name: {$like: "ann"}}`
1305                                            // matched "Joanne" on SQLite but
1306                                            // nothing on PG. Caller-supplied
1307                                            // wildcards inside the value still
1308                                            // work (`%j_n%` etc.) because we
1309                                            // only add wraps, never strip.
1310                                            let raw = match v {
1311                                                serde_json::Value::String(s) => s.clone(),
1312                                                other => other.to_string(),
1313                                            };
1314                                            planned.push((
1315                                                field.into(),
1316                                                "LIKE".into(),
1317                                                JsonParam::Text(format!("%{raw}%")),
1318                                            ));
1319                                        }
1320                                        "$in" => {
1321                                            if let Some(arr) = v.as_array() {
1322                                                if arr.is_empty() {
1323                                                    // `field IN ()` is invalid
1324                                                    // SQL on PG (and on SQLite
1325                                                    // too, technically — its
1326                                                    // path also short-circuits).
1327                                                    // An empty $in matches
1328                                                    // nothing; emit a guaranteed-
1329                                                    // false predicate so the
1330                                                    // parser doesn't choke and
1331                                                    // the result set comes back
1332                                                    // empty.
1333                                                    where_clauses.push("FALSE".into());
1334                                                } else {
1335                                                    let placeholders: Vec<String> = (0..arr.len())
1336                                                        .map(|i| {
1337                                                            format!("${}", planned.len() + 1 + i)
1338                                                        })
1339                                                        .collect();
1340                                                    where_clauses.push(format!(
1341                                                        "{} IN ({})",
1342                                                        quote_ident(field),
1343                                                        placeholders.join(", "),
1344                                                    ));
1345                                                    for x in arr {
1346                                                        planned.push((
1347                                                            format!("__inline_{}", planned.len()),
1348                                                            "__INLINE__".into(),
1349                                                            value_to_pg(x),
1350                                                        ));
1351                                                    }
1352                                                }
1353                                            }
1354                                        }
1355                                        _ => {}
1356                                    }
1357                                }
1358                            }
1359                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
1360                        }
1361                    }
1362                }
1363            }
1364
1365            // Materialize planned -> SQL + params.
1366            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
1367            for (field, op, v) in &planned {
1368                if op == "__INLINE__" {
1369                    // Already emitted via the IN-clause path; just push the value.
1370                    params.push(v.clone());
1371                } else {
1372                    let placeholder = format!("${}", params.len() + 1);
1373                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
1374                    params.push(v.clone());
1375                }
1376            }
1377
1378            let where_sql = if where_clauses.is_empty() {
1379                String::new()
1380            } else {
1381                format!(" WHERE {}", where_clauses.join(" AND "))
1382            };
1383            // Default deterministic order when the caller didn't pass
1384            // `$order` — matches the SQLite path. Without this,
1385            // identical queries return rows in different orders across
1386            // backends, which makes paginated APIs flaky.
1387            let final_order = if order_clause.is_empty() {
1388                format!(" ORDER BY {}", quote_ident("id"))
1389            } else {
1390                order_clause
1391            };
1392            let sql = format!(
1393                "SELECT * FROM {}{}{}{}{}",
1394                quote_ident(entity),
1395                where_sql,
1396                final_order,
1397                limit_clause,
1398                offset_clause,
1399            );
1400
1401            Ok((sql, params))
1402        }
1403
1404        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
1405        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
1406        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
1407        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
1408        /// via `date_trunc`), and a flat-equality `where` filter.
1409        ///
1410        /// Spec format (same JSON shape used by the SQLite path):
1411        /// ```json
1412        /// { "count": "*",
1413        ///   "sum": ["amount"],
1414        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
1415        ///   "where": {"status": "paid"} }
1416        /// ```
1417        ///
1418        /// `valid_columns` is used to validate every field name before it's
1419        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
1420        /// `DataStore` impl in this crate) supplies the entity's column set
1421        /// from the manifest.
1422        pub fn aggregate(
1423            &mut self,
1424            entity: &str,
1425            spec: &serde_json::Value,
1426            valid_columns: &[String],
1427        ) -> Result<serde_json::Value, StorageError> {
1428            let (sql, params, column_names) =
1429                Self::build_aggregate_sql(entity, spec, valid_columns)?;
1430            let pg_params = as_pg_params(&params);
1431            let rows = self
1432                .client
1433                .query(sql.as_str(), &pg_params)
1434                .map_err(pg_err)?;
1435            Ok(aggregate_rows_to_json(&rows, &column_names))
1436        }
1437
1438        /// Build the aggregate `SELECT` SQL + bound params + the
1439        /// expected output column names. Pure: takes the entity's
1440        /// validated column list, returns text. Both the live adapter
1441        /// and the in-tx `PgTxStore` call this so spec parsing
1442        /// (validation, bucket vocabulary, where-clause translation)
1443        /// stays identical regardless of where the query runs.
1444        pub(crate) fn build_aggregate_sql(
1445            entity: &str,
1446            spec: &serde_json::Value,
1447            valid_columns: &[String],
1448        ) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
1449            let obj = spec.as_object().ok_or_else(|| StorageError {
1450                code: "INVALID_QUERY".into(),
1451                message: "aggregate spec must be a JSON object".into(),
1452            })?;
1453
1454            let validate = |col: &str| -> Result<(), StorageError> {
1455                if col == "id" || valid_columns.iter().any(|c| c == col) {
1456                    Ok(())
1457                } else {
1458                    Err(StorageError {
1459                        code: "UNKNOWN_COLUMN".into(),
1460                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1461                    })
1462                }
1463            };
1464
1465            let mut select_parts: Vec<String> = Vec::new();
1466            let mut result_fields: Vec<String> = Vec::new();
1467
1468            if let Some(count) = obj.get("count") {
1469                match count {
1470                    serde_json::Value::String(s) if s == "*" => {
1471                        select_parts.push("COUNT(*) AS count".into());
1472                        result_fields.push("count".into());
1473                    }
1474                    serde_json::Value::String(field) => {
1475                        validate(field)?;
1476                        let alias = format!("count_{field}");
1477                        select_parts.push(format!(
1478                            "COUNT({}) AS {}",
1479                            quote_ident(field),
1480                            quote_ident(&alias),
1481                        ));
1482                        result_fields.push(alias);
1483                    }
1484                    _ => {}
1485                }
1486            }
1487
1488            for (fn_name, prefix) in [
1489                ("sum", "sum_"),
1490                ("avg", "avg_"),
1491                ("min", "min_"),
1492                ("max", "max_"),
1493            ] {
1494                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1495                    for field in fields {
1496                        if let Some(f) = field.as_str() {
1497                            validate(f)?;
1498                            let alias = format!("{prefix}{f}");
1499                            let sql_fn = fn_name.to_uppercase();
1500                            select_parts.push(format!(
1501                                "{}({}) AS {}",
1502                                sql_fn,
1503                                quote_ident(f),
1504                                quote_ident(&alias),
1505                            ));
1506                            result_fields.push(alias);
1507                        }
1508                    }
1509                }
1510            }
1511
1512            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1513                for field in fields {
1514                    if let Some(f) = field.as_str() {
1515                        validate(f)?;
1516                        let alias = format!("count_distinct_{f}");
1517                        select_parts.push(format!(
1518                            "COUNT(DISTINCT {}) AS {}",
1519                            quote_ident(f),
1520                            quote_ident(&alias),
1521                        ));
1522                        result_fields.push(alias);
1523                    }
1524                }
1525            }
1526
1527            // groupBy: column name or { field, bucket } — same vocabulary as
1528            // the SQLite path. Buckets translate to Postgres `date_trunc`
1529            // (SQLite uses `strftime`); both collapse rows to the bucket
1530            // boundary identically.
1531            let mut group_by: Vec<String> = Vec::new();
1532            let mut group_select: Vec<String> = Vec::new();
1533            let mut group_field_names: Vec<String> = Vec::new();
1534            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1535                for g in groups {
1536                    if let Some(f) = g.as_str() {
1537                        validate(f)?;
1538                        let q = quote_ident(f);
1539                        group_by.push(q.clone());
1540                        group_select.push(q);
1541                        group_field_names.push(f.to_string());
1542                    } else if let Some(spec) = g.as_object() {
1543                        let field =
1544                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1545                                StorageError {
1546                                    code: "INVALID_QUERY".into(),
1547                                    message: "groupBy object spec requires `field`".into(),
1548                                }
1549                            })?;
1550                        validate(field)?;
1551                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1552                        let trunc_unit = match bucket {
1553                            "hour" | "day" | "week" | "month" | "year" => bucket,
1554                            _ => {
1555                                return Err(StorageError {
1556                                    code: "INVALID_QUERY".into(),
1557                                    message: format!(
1558                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1559                                    ),
1560                                });
1561                            }
1562                        };
1563                        let alias = format!("{field}_{bucket}");
1564                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1565                        group_by.push(expr.clone());
1566                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1567                        group_field_names.push(alias);
1568                    }
1569                }
1570            }
1571
1572            let mut full_select = group_select.clone();
1573            full_select.extend(select_parts.iter().cloned());
1574            if full_select.is_empty() {
1575                return Err(StorageError {
1576                    code: "INVALID_QUERY".into(),
1577                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1578                });
1579            }
1580
1581            let mut where_clauses: Vec<String> = Vec::new();
1582            let mut params: Vec<JsonParam> = Vec::new();
1583            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1584                for (k, v) in w {
1585                    validate(k)?;
1586                    let placeholder = format!("${}", params.len() + 1);
1587                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1588                    params.push(value_to_pg(v));
1589                }
1590            }
1591            let where_sql = if where_clauses.is_empty() {
1592                String::new()
1593            } else {
1594                format!(" WHERE {}", where_clauses.join(" AND "))
1595            };
1596            let group_sql = if group_by.is_empty() {
1597                String::new()
1598            } else {
1599                format!(" GROUP BY {}", group_by.join(", "))
1600            };
1601
1602            let sql = format!(
1603                "SELECT {} FROM {}{}{}",
1604                full_select.join(", "),
1605                quote_ident(entity),
1606                where_sql,
1607                group_sql,
1608            );
1609
1610            let column_names: Vec<String> = group_field_names
1611                .iter()
1612                .chain(result_fields.iter())
1613                .cloned()
1614                .collect();
1615
1616            Ok((sql, params, column_names))
1617        }
1618    }
1619
1620    /// Project rows from an aggregate `SELECT` into the
1621    /// `{ rows: [{...}] }` JSON shape both the SQLite path and the
1622    /// PG path return. Pure post-processing — works on rows produced
1623    /// from either `Client::query` or `Transaction::query`.
1624    pub fn aggregate_rows_to_json(
1625        rows: &[postgres::Row],
1626        column_names: &[String],
1627    ) -> serde_json::Value {
1628        let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1629        for row in rows {
1630            let row_json = row_to_json(row);
1631            if let serde_json::Value::Object(map) = &row_json {
1632                let mut filtered = serde_json::Map::new();
1633                for name in column_names {
1634                    if let Some(v) = map.get(name) {
1635                        filtered.insert(name.clone(), v.clone());
1636                    }
1637                }
1638                out.push(serde_json::Value::Object(filtered));
1639            } else {
1640                out.push(row_json);
1641            }
1642        }
1643        serde_json::json!({ "rows": out })
1644    }
1645
1646    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
1647    pub enum TxOp<'a> {
1648        Insert {
1649            entity: &'a str,
1650            data: &'a serde_json::Value,
1651        },
1652        Update {
1653            entity: &'a str,
1654            id: &'a str,
1655            data: &'a serde_json::Value,
1656        },
1657        Delete {
1658            entity: &'a str,
1659            id: &'a str,
1660        },
1661    }
1662
1663    /// Result of a single op inside a transaction.
1664    #[derive(Debug, Clone)]
1665    pub enum TxResult {
1666        Inserted(String),
1667        Updated(bool),
1668        Deleted(bool),
1669    }
1670
1671    impl LivePostgresAdapter {
1672        /// Run `ops` inside a single Postgres transaction. Either all of them
1673        /// commit together or none of them do — there is no partial state on
1674        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1675        /// guard is dropped without `commit()` being called.
1676        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1677            let mut tx = self.client.transaction().map_err(pg_err)?;
1678            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1679
1680            for op in ops {
1681                match op {
1682                    TxOp::Insert { entity, data } => {
1683                        let (sql, values) = build_insert_sql(entity, data)?;
1684                        let id = match &values[0] {
1685                            JsonParam::Text(s) => s.clone(),
1686                            _ => {
1687                                return Err(StorageError {
1688                                    code: "PG_INTERNAL".into(),
1689                                    message: "build_insert_sql produced non-text id param".into(),
1690                                });
1691                            }
1692                        };
1693                        let params = as_pg_params(&values);
1694                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1695                        results.push(TxResult::Inserted(id));
1696                    }
1697                    TxOp::Update { entity, id, data } => {
1698                        let (sql, values) = build_update_sql(entity, id, data)?;
1699                        let params = as_pg_params(&values);
1700                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1701                        results.push(TxResult::Updated(n > 0));
1702                    }
1703                    TxOp::Delete { entity, id } => {
1704                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1705                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1706                        results.push(TxResult::Deleted(n > 0));
1707                    }
1708                }
1709            }
1710
1711            tx.commit().map_err(pg_err)?;
1712            Ok(results)
1713        }
1714    }
1715
1716    /// Lift a JSON value into a typed Postgres parameter. The previous
1717    /// implementation collapsed everything to `String`, which silently
1718    /// stringified ints/bools and turned JSON `null` into `""` for
1719    /// nullable columns. Forwarding through `JsonParam` keeps the column
1720    /// type honest and lets callers `unlink` (set FK to NULL) cleanly.
1721    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
1722        JsonParam::from_json(v)
1723    }
1724
1725    pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1726        use postgres::types::Type;
1727        let mut obj = serde_json::Map::new();
1728        for (i, col) in row.columns().iter().enumerate() {
1729            let name = col.name().to_string();
1730
1731            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
1732            // and a panic in a query handler poisons the connection mutex,
1733            // taking down all subsequent reads on this datastore. Anything
1734            // that fails to decode becomes Null with a one-shot warning.
1735            //
1736            // Timestamps and the catch-all path explicitly DON'T request
1737            // `String` — the postgres crate uses binary protocol by default
1738            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
1739            // ask for `Vec<u8>` and lossy-stringify, which works for all
1740            // text-shaped columns in either protocol.
1741            let value: serde_json::Value = match *col.type_() {
1742                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1743                    .flatten()
1744                    .map(serde_json::Value::Bool)
1745                    .unwrap_or(serde_json::Value::Null),
1746                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1747                    .flatten()
1748                    .map(|v| serde_json::Value::Number(v.into()))
1749                    .unwrap_or(serde_json::Value::Null),
1750                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1751                    .flatten()
1752                    .map(|v| serde_json::Value::Number(v.into()))
1753                    .unwrap_or(serde_json::Value::Null),
1754                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1755                    .flatten()
1756                    .map(|v| serde_json::Value::Number(v.into()))
1757                    .unwrap_or(serde_json::Value::Null),
1758                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1759                    .flatten()
1760                    .and_then(|v| serde_json::Number::from_f64(v as f64))
1761                    .map(serde_json::Value::Number)
1762                    .unwrap_or(serde_json::Value::Null),
1763                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1764                    .flatten()
1765                    .and_then(serde_json::Number::from_f64)
1766                    .map(serde_json::Value::Number)
1767                    .unwrap_or(serde_json::Value::Null),
1768                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1769                    .flatten()
1770                    .unwrap_or(serde_json::Value::Null),
1771                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1772                    .flatten()
1773                    .map(|b| serde_json::Value::String(b64(&b)))
1774                    .unwrap_or(serde_json::Value::Null),
1775                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1776                    try_get_or_null::<Option<String>>(row, i)
1777                        .flatten()
1778                        .map(serde_json::Value::String)
1779                        .unwrap_or(serde_json::Value::Null)
1780                }
1781                Type::TIMESTAMPTZ => {
1782                    // Decode via chrono::DateTime<Utc> (postgres's
1783                    // `with-chrono-0_4` feature provides FromSql) and
1784                    // re-format as ISO 8601 — the shape pylon's clients
1785                    // expect (matches `pylon_kernel::util::now_iso`,
1786                    // so timestamps round-trip with the same surface
1787                    // across SQLite + PG).
1788                    try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1789                        .flatten()
1790                        .map(|dt| {
1791                            serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1792                        })
1793                        .unwrap_or(serde_json::Value::Null)
1794                }
1795                Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1796                    .flatten()
1797                    .map(|dt| {
1798                        serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1799                    })
1800                    .unwrap_or(serde_json::Value::Null),
1801                Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1802                    .flatten()
1803                    .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1804                    .unwrap_or(serde_json::Value::Null),
1805                _ => {
1806                    // Last resort: ask Postgres to render anything else as
1807                    // text via a stringifying decode through Vec<u8>. If even
1808                    // that fails (rare — Postgres types not implementing the
1809                    // text format), fall through to Null with a warning.
1810                    match row.try_get::<_, Option<String>>(i) {
1811                        Ok(Some(s)) => serde_json::Value::String(s),
1812                        Ok(None) => serde_json::Value::Null,
1813                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1814                            Ok(Some(bytes)) => serde_json::Value::String(
1815                                String::from_utf8_lossy(&bytes).into_owned(),
1816                            ),
1817                            _ => serde_json::Value::Null,
1818                        },
1819                    }
1820                }
1821            };
1822            obj.insert(name, value);
1823        }
1824        serde_json::Value::Object(obj)
1825    }
1826
1827    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1828    where
1829        T: postgres::types::FromSql<'a>,
1830    {
1831        match row.try_get::<_, T>(i) {
1832            Ok(v) => Some(v),
1833            Err(e) => {
1834                tracing::warn!(
1835                    "[postgres] decode failed for column {} ({}): {e}",
1836                    i,
1837                    row.columns()[i].name()
1838                );
1839                None
1840            }
1841        }
1842    }
1843
1844    /// Minimal base64 encoder so we don't need another dependency just for
1845    /// the BYTEA column edge case.
1846    fn b64(bytes: &[u8]) -> String {
1847        const TABLE: &[u8; 64] =
1848            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1849        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1850        let chunks = bytes.chunks(3);
1851        for chunk in chunks {
1852            let b = [
1853                chunk.first().copied().unwrap_or(0),
1854                chunk.get(1).copied().unwrap_or(0),
1855                chunk.get(2).copied().unwrap_or(0),
1856            ];
1857            out.push(TABLE[(b[0] >> 2) as usize] as char);
1858            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1859            if chunk.len() > 1 {
1860                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1861            } else {
1862                out.push('=');
1863            }
1864            if chunk.len() > 2 {
1865                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1866            } else {
1867                out.push('=');
1868            }
1869        }
1870        out
1871    }
1872
1873    fn pg_err(e: postgres::Error) -> StorageError {
1874        // postgres::Error's Display is intentionally short ("db error",
1875        // "connection error" etc.) — the actual SQLSTATE / detail lives
1876        // on the source chain. Walk the chain so the final message has
1877        // enough signal to debug a failed insert/update without
1878        // attaching a debugger.
1879        use std::error::Error;
1880        let mut detail = format!("{e}");
1881        let mut src: Option<&dyn Error> = e.source();
1882        while let Some(s) = src {
1883            detail.push_str(": ");
1884            detail.push_str(&format!("{s}"));
1885            src = s.source();
1886        }
1887        StorageError {
1888            code: "PG_QUERY_FAILED".into(),
1889            message: format!("Postgres query failed: {detail}"),
1890        }
1891    }
1892}
1893
1894// ---------------------------------------------------------------------------
1895// Tests
1896// ---------------------------------------------------------------------------
1897
1898#[cfg(test)]
1899mod tests {
1900    use super::*;
1901
1902    /// Hand-rolled fixture that matches the snapshots in the tests
1903    /// below. Decoupled from any example's `pylon.manifest.json` so
1904    /// changing an example schema doesn't bleed into adapter tests.
1905    fn test_manifest() -> AppManifest {
1906        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1907        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1908            name: name.into(),
1909            field_type: ty.into(),
1910            optional: opt,
1911            unique: uniq,
1912            crdt: None,
1913        };
1914        AppManifest {
1915            manifest_version: 1,
1916            name: "test".into(),
1917            version: "0.0.0".into(),
1918            entities: vec![
1919                ManifestEntity {
1920                    name: "User".into(),
1921                    fields: vec![
1922                        f("email", "string", false, true),
1923                        f("displayName", "string", false, false),
1924                        f("createdAt", "datetime", false, false),
1925                    ],
1926                    indexes: vec![],
1927                    relations: vec![],
1928                    search: None,
1929                    crdt: true,
1930                },
1931                ManifestEntity {
1932                    name: "Todo".into(),
1933                    fields: vec![
1934                        f("title", "string", false, false),
1935                        f("done", "bool", false, false),
1936                        f("userId", "id(User)", false, false),
1937                        f("createdAt", "datetime", false, false),
1938                    ],
1939                    indexes: vec![ManifestIndex {
1940                        name: "by_user".into(),
1941                        fields: vec!["userId".into()],
1942                        unique: false,
1943                    }],
1944                    relations: vec![],
1945                    search: None,
1946                    crdt: true,
1947                },
1948            ],
1949            queries: vec![],
1950            actions: vec![],
1951            policies: vec![],
1952            routes: vec![],
1953            auth: Default::default(),
1954        }
1955    }
1956
1957    #[test]
1958    fn pg_type_mapping() {
1959        assert_eq!(pg_column_type("string"), "TEXT");
1960        assert_eq!(pg_column_type("int"), "INTEGER");
1961        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1962        assert_eq!(pg_column_type("bool"), "BOOLEAN");
1963        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1964        assert_eq!(pg_column_type("richtext"), "TEXT");
1965        assert_eq!(pg_column_type("id(User)"), "TEXT");
1966    }
1967
1968    #[test]
1969    fn quote_ident_simple() {
1970        assert_eq!(quote_ident("User"), "\"User\"");
1971        assert_eq!(quote_ident("email"), "\"email\"");
1972    }
1973
1974    #[test]
1975    fn quote_ident_escapes_embedded_double_quotes() {
1976        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1977        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1978    }
1979
1980    #[test]
1981    fn create_table_sql_basic() {
1982        let fields = vec![
1983            FieldSpec {
1984                name: "email".into(),
1985                field_type: "string".into(),
1986                optional: false,
1987                unique: true,
1988            },
1989            FieldSpec {
1990                name: "age".into(),
1991                field_type: "int".into(),
1992                optional: true,
1993                unique: false,
1994            },
1995        ];
1996        let sql = create_table_sql("User", &fields);
1997        assert_eq!(
1998            sql,
1999            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
2000        );
2001    }
2002
2003    #[test]
2004    fn create_table_sql_escapes_identifiers() {
2005        let fields = vec![FieldSpec {
2006            name: "col\"x".into(),
2007            field_type: "string".into(),
2008            optional: false,
2009            unique: false,
2010        }];
2011        let sql = create_table_sql("my\"table", &fields);
2012        assert!(sql.contains("\"my\"\"table\""));
2013        assert!(sql.contains("\"col\"\"x\""));
2014    }
2015
2016    #[test]
2017    fn create_index_sql_unique() {
2018        let sql = create_index_sql("User", "by_email", &["email".into()], true);
2019        assert_eq!(
2020            sql,
2021            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
2022        );
2023    }
2024
2025    #[test]
2026    fn create_index_sql_non_unique() {
2027        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
2028        assert_eq!(
2029            sql,
2030            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
2031        );
2032    }
2033
2034    #[test]
2035    fn add_column_sql_basic() {
2036        let field = FieldSpec {
2037            name: "bio".into(),
2038            field_type: "string".into(),
2039            optional: true,
2040            unique: false,
2041        };
2042        let sql = add_column_sql("User", &field);
2043        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
2044    }
2045
2046    #[test]
2047    fn plan_from_manifest() {
2048        let adapter = PostgresAdapter;
2049        let manifest = test_manifest();
2050        let plan = adapter.plan_schema(&manifest).unwrap();
2051
2052        // Should have CreateEntity for User and Todo, plus AddIndex for by_user.
2053        assert!(plan.operations.iter().any(|op| matches!(
2054            op,
2055            SchemaOperation::CreateEntity { name, .. } if name == "User"
2056        )));
2057        assert!(plan.operations.iter().any(|op| matches!(
2058            op,
2059            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
2060        )));
2061        assert!(plan.operations.iter().any(|op| matches!(
2062            op,
2063            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
2064        )));
2065    }
2066
2067    #[test]
2068    fn plan_to_sql_produces_statements() {
2069        let adapter = PostgresAdapter;
2070        let manifest = test_manifest();
2071        let plan = adapter.plan_schema(&manifest).unwrap();
2072        let stmts = plan_to_sql(&plan).unwrap();
2073
2074        // 2 CREATE TABLE (User, Todo) + 1 CREATE INDEX for Todo.by_user
2075        // + 1 CREATE INDEX for Todo.by_user_done. The Todo manifest also
2076        // declares a unique by_email index on User which lands as part of
2077        // the table. Final count: 2 tables + 2 indexes.
2078        let create_tables = stmts
2079            .iter()
2080            .filter(|s| s.starts_with("CREATE TABLE"))
2081            .count();
2082        let create_indexes = stmts
2083            .iter()
2084            .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
2085            .count();
2086        assert_eq!(create_tables, 2);
2087        assert!(create_indexes >= 1);
2088        assert!(stmts[0].starts_with("CREATE TABLE"));
2089        assert!(stmts[1].starts_with("CREATE TABLE"));
2090    }
2091
2092    #[test]
2093    fn plan_to_sql_rejects_unsupported() {
2094        let plan = SchemaPlan {
2095            operations: vec![SchemaOperation::RemoveEntity {
2096                name: "User".into(),
2097            }],
2098        };
2099        let result = plan_to_sql(&plan);
2100        assert!(result.is_err());
2101        assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
2102    }
2103
2104    #[test]
2105    fn apply_not_implemented() {
2106        let adapter = PostgresAdapter;
2107        let plan = SchemaPlan {
2108            operations: vec![SchemaOperation::Noop],
2109        };
2110        let result = adapter.apply_schema(&plan);
2111        assert!(result.is_err());
2112        assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
2113    }
2114
2115    #[test]
2116    fn sql_uses_quoted_identifiers() {
2117        let fields = vec![FieldSpec {
2118            name: "createdAt".into(),
2119            field_type: "datetime".into(),
2120            optional: false,
2121            unique: false,
2122        }];
2123        let sql = create_table_sql("User", &fields);
2124        // Postgres identifiers should be quoted for case-sensitivity.
2125        assert!(sql.contains("\"User\""));
2126        assert!(sql.contains("\"createdAt\""));
2127        assert!(sql.contains("TIMESTAMPTZ"));
2128    }
2129
2130    // -- Introspection SQL tests --
2131
2132    #[test]
2133    fn introspect_sql_constants_are_valid() {
2134        // Sanity checks that the SQL strings exist and look reasonable.
2135        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
2136        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
2137        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
2138        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
2139    }
2140
2141    // -- Plan from snapshot tests --
2142
2143    #[test]
2144    fn plan_from_empty_snapshot_creates_all() {
2145        let snapshot = crate::SchemaSnapshot { tables: vec![] };
2146        let manifest = test_manifest();
2147        let plan = plan_from_snapshot(&snapshot, &manifest);
2148
2149        assert!(plan.operations.iter().any(|op| matches!(
2150            op,
2151            SchemaOperation::CreateEntity { name, .. } if name == "User"
2152        )));
2153        assert!(plan.operations.iter().any(|op| matches!(
2154            op,
2155            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
2156        )));
2157        assert!(plan.operations.iter().any(|op| matches!(
2158            op,
2159            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
2160        )));
2161    }
2162
2163    #[test]
2164    fn plan_from_full_snapshot_is_noop() {
2165        let snapshot = crate::SchemaSnapshot {
2166            tables: vec![
2167                crate::TableSnapshot {
2168                    name: "User".into(),
2169                    columns: vec![
2170                        crate::ColumnSnapshot {
2171                            name: "id".into(),
2172                            column_type: "TEXT".into(),
2173                            notnull: true,
2174                            primary_key: true,
2175                        },
2176                        crate::ColumnSnapshot {
2177                            name: "email".into(),
2178                            column_type: "TEXT".into(),
2179                            notnull: true,
2180                            primary_key: false,
2181                        },
2182                        crate::ColumnSnapshot {
2183                            name: "displayName".into(),
2184                            column_type: "TEXT".into(),
2185                            notnull: true,
2186                            primary_key: false,
2187                        },
2188                        crate::ColumnSnapshot {
2189                            name: "createdAt".into(),
2190                            column_type: "TIMESTAMPTZ".into(),
2191                            notnull: true,
2192                            primary_key: false,
2193                        },
2194                    ],
2195                    indexes: vec![],
2196                },
2197                crate::TableSnapshot {
2198                    name: "Todo".into(),
2199                    columns: vec![
2200                        crate::ColumnSnapshot {
2201                            name: "id".into(),
2202                            column_type: "TEXT".into(),
2203                            notnull: true,
2204                            primary_key: true,
2205                        },
2206                        crate::ColumnSnapshot {
2207                            name: "title".into(),
2208                            column_type: "TEXT".into(),
2209                            notnull: true,
2210                            primary_key: false,
2211                        },
2212                        crate::ColumnSnapshot {
2213                            name: "done".into(),
2214                            column_type: "BOOLEAN".into(),
2215                            notnull: true,
2216                            primary_key: false,
2217                        },
2218                        crate::ColumnSnapshot {
2219                            name: "userId".into(),
2220                            column_type: "TEXT".into(),
2221                            notnull: true,
2222                            primary_key: false,
2223                        },
2224                        crate::ColumnSnapshot {
2225                            name: "createdAt".into(),
2226                            column_type: "TIMESTAMPTZ".into(),
2227                            notnull: true,
2228                            primary_key: false,
2229                        },
2230                    ],
2231                    indexes: vec![crate::IndexSnapshot {
2232                        name: "Todo_by_user".into(),
2233                        columns: vec!["userId".into()],
2234                        unique: false,
2235                    }],
2236                },
2237            ],
2238        };
2239        let manifest = test_manifest();
2240        let plan = plan_from_snapshot(&snapshot, &manifest);
2241        assert!(plan.is_empty());
2242    }
2243
2244    #[test]
2245    fn plan_detects_missing_column_in_snapshot() {
2246        let snapshot = crate::SchemaSnapshot {
2247            tables: vec![
2248                crate::TableSnapshot {
2249                    name: "User".into(),
2250                    columns: vec![
2251                        crate::ColumnSnapshot {
2252                            name: "id".into(),
2253                            column_type: "TEXT".into(),
2254                            notnull: true,
2255                            primary_key: true,
2256                        },
2257                        crate::ColumnSnapshot {
2258                            name: "email".into(),
2259                            column_type: "TEXT".into(),
2260                            notnull: true,
2261                            primary_key: false,
2262                        },
2263                        // missing displayName and createdAt
2264                    ],
2265                    indexes: vec![],
2266                },
2267                crate::TableSnapshot {
2268                    name: "Todo".into(),
2269                    columns: vec![
2270                        crate::ColumnSnapshot {
2271                            name: "id".into(),
2272                            column_type: "TEXT".into(),
2273                            notnull: true,
2274                            primary_key: true,
2275                        },
2276                        crate::ColumnSnapshot {
2277                            name: "title".into(),
2278                            column_type: "TEXT".into(),
2279                            notnull: true,
2280                            primary_key: false,
2281                        },
2282                        crate::ColumnSnapshot {
2283                            name: "done".into(),
2284                            column_type: "BOOLEAN".into(),
2285                            notnull: true,
2286                            primary_key: false,
2287                        },
2288                        crate::ColumnSnapshot {
2289                            name: "userId".into(),
2290                            column_type: "TEXT".into(),
2291                            notnull: true,
2292                            primary_key: false,
2293                        },
2294                        crate::ColumnSnapshot {
2295                            name: "createdAt".into(),
2296                            column_type: "TIMESTAMPTZ".into(),
2297                            notnull: true,
2298                            primary_key: false,
2299                        },
2300                    ],
2301                    indexes: vec![crate::IndexSnapshot {
2302                        name: "Todo_by_user".into(),
2303                        columns: vec!["userId".into()],
2304                        unique: false,
2305                    }],
2306                },
2307            ],
2308        };
2309        let manifest = test_manifest();
2310        let plan = plan_from_snapshot(&snapshot, &manifest);
2311
2312        let add_fields: Vec<_> = plan
2313            .operations
2314            .iter()
2315            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
2316            .collect();
2317        assert_eq!(add_fields.len(), 2); // displayName + createdAt
2318    }
2319
2320    // -- CRUD helper tests (no live database required) --
2321
2322    #[test]
2323    fn json_value_to_string_handles_all_types() {
2324        assert_eq!(
2325            json_value_to_string(&serde_json::Value::String("hello".into())),
2326            "hello"
2327        );
2328        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
2329        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
2330        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
2331        assert_eq!(
2332            json_value_to_string(&serde_json::Value::Bool(false)),
2333            "false"
2334        );
2335        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
2336        // Arrays and objects get their JSON representation.
2337        assert_eq!(
2338            json_value_to_string(&serde_json::json!([1, 2, 3])),
2339            "[1,2,3]"
2340        );
2341        assert_eq!(
2342            json_value_to_string(&serde_json::json!({"a": 1})),
2343            "{\"a\":1}"
2344        );
2345    }
2346
2347    #[test]
2348    fn generate_id_returns_hex_string() {
2349        let id = generate_id();
2350        assert!(!id.is_empty());
2351        // Must be valid hex characters.
2352        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
2353    }
2354
2355    #[test]
2356    fn generate_id_is_unique_across_calls() {
2357        let id1 = generate_id();
2358        let id2 = generate_id();
2359        assert_ne!(id1, id2);
2360    }
2361
2362    #[test]
2363    fn generate_id_is_lex_sortable() {
2364        // 1000 IDs back-to-back must come out in monotonically increasing
2365        // lexicographic order. This is what makes cursor pagination correct.
2366        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
2367        let sorted = {
2368            let mut s = ids.clone();
2369            s.sort();
2370            s
2371        };
2372        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
2373        // And every id must be the same width (otherwise lex comparison is
2374        // wrong at width boundaries).
2375        let len0 = ids[0].len();
2376        assert!(ids.iter().all(|id| id.len() == len0));
2377        ids.dedup();
2378        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
2379    }
2380
2381    #[test]
2382    fn build_insert_sql_simple() {
2383        let data = serde_json::json!({
2384            "email": "alice@example.com",
2385            "displayName": "Alice"
2386        });
2387        let (sql, values) = build_insert_sql("User", &data).unwrap();
2388
2389        assert!(sql.starts_with("INSERT INTO \"User\""));
2390        assert!(sql.contains("id"));
2391        assert!(sql.contains("$1"));
2392        assert!(sql.contains("$2"));
2393        assert!(sql.contains("$3"));
2394        // First value is the generated ID — JsonParam::Text variant.
2395        match &values[0] {
2396            JsonParam::Text(s) => assert!(!s.is_empty()),
2397            other => panic!("expected Text id param, got {other:?}"),
2398        }
2399        assert_eq!(values.len(), 3); // id + 2 fields
2400    }
2401
2402    #[test]
2403    fn build_insert_sql_preserves_json_types() {
2404        let data = serde_json::json!({
2405            "n": 42,
2406            "f": 1.5,
2407            "b": true,
2408            "s": "hi",
2409            "z": null,
2410        });
2411        let (_sql, values) = build_insert_sql("T", &data).unwrap();
2412        // values[0] is the id; remaining are in BTreeMap order ("b","f","n","s","z").
2413        let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
2414        assert!(matches!(kinds[0], JsonParam::Bool(true)));
2415        assert!(matches!(kinds[1], JsonParam::Float(_)));
2416        assert!(matches!(kinds[2], JsonParam::Int(42)));
2417        assert!(matches!(kinds[3], JsonParam::Text(_)));
2418        assert!(matches!(kinds[4], JsonParam::Null));
2419    }
2420
2421    #[test]
2422    fn build_insert_sql_quotes_column_names() {
2423        let data = serde_json::json!({"createdAt": "2026-01-01"});
2424        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
2425        assert!(sql.contains("\"createdAt\""));
2426        assert!(sql.contains("\"Todo\""));
2427    }
2428
2429    #[test]
2430    fn build_insert_sql_rejects_non_object() {
2431        let data = serde_json::json!("not an object");
2432        let result = build_insert_sql("User", &data);
2433        assert!(result.is_err());
2434        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2435    }
2436
2437    #[test]
2438    fn build_update_sql_simple() {
2439        let data = serde_json::json!({
2440            "displayName": "Bob",
2441            "email": "bob@example.com"
2442        });
2443        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
2444
2445        assert!(sql.starts_with("UPDATE \"User\" SET"));
2446        assert!(sql.contains("WHERE id = $1"));
2447        assert!(sql.contains("$2"));
2448        assert!(sql.contains("$3"));
2449        match &values[0] {
2450            JsonParam::Text(s) => assert_eq!(s, "abc123"),
2451            other => panic!("expected Text id param, got {other:?}"),
2452        }
2453        assert_eq!(values.len(), 3); // id + 2 fields
2454    }
2455
2456    #[test]
2457    fn build_update_sql_quotes_column_names() {
2458        let data = serde_json::json!({"displayName": "Carol"});
2459        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
2460        assert!(sql.contains("\"displayName\" = $2"));
2461    }
2462
2463    #[test]
2464    fn build_update_sql_rejects_non_object() {
2465        let data = serde_json::json!(42);
2466        let result = build_update_sql("User", "id1", &data);
2467        assert!(result.is_err());
2468        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2469    }
2470
2471    #[test]
2472    fn build_update_sql_rejects_empty_object() {
2473        let data = serde_json::json!({});
2474        let err = build_update_sql("User", "id1", &data).unwrap_err();
2475        assert_eq!(err.code, "PG_INVALID_DATA");
2476        assert!(err.message.contains("at least one field"));
2477    }
2478}