Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Translate a manifest field type name into the PostgreSQL column type
/// used when generating DDL.
///
/// Every name that is not listed in the table below — including `id(...)`
/// reference types — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    // (manifest type, PostgreSQL type) pairs for the non-default mappings.
    const KNOWN: [(&str, &str); 6] = [
        ("string", "TEXT"),
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
        ("richtext", "TEXT"),
    ];

    KNOWN
        .iter()
        .find(|entry| entry.0 == field_type)
        .map_or("TEXT", |&(_, sql_type)| sql_type)
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it can be spliced into SQL as an
/// identifier.
///
/// A double-quote inside `name` is emitted twice, which is how PostgreSQL
/// spells a literal `"` inside a quoted identifier: `"foo""bar"` names the
/// identifier `foo"bar`.
pub(crate) fn quote_ident(name: &str) -> String {
    // +2 for the surrounding quotes; embedded quotes grow the buffer on demand.
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
/// Publicly visible entry point to [`quote_ident`].
///
/// Sibling modules (e.g. `pg_tx_store`) build SQL strings under the same
/// quoting rules; they call this wrapper so the crate-private function and
/// its existing call sites in this module stay as they are.
#[cfg(feature = "postgres-live")]
pub fn quote_ident_pub(name: &str) -> String {
    quote_ident(name)
}
48
/// Publicly visible entry point to [`live::row_to_json`].
///
/// `pg_tx_store` calls this so rows read inside a transaction are converted
/// to JSON by the same function as rows read on the non-transactional path.
#[cfg(feature = "postgres-live")]
pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
    live::row_to_json(row)
}
56
/// Publicly visible entry point to the filtered-query SQL builder.
///
/// Lets the in-transaction `PgTxStore` share the non-transactional path's
/// operator surface ($eq, $like, $in, $order, $limit, $offset, $not, $gt,
/// $gte, $lt, $lte) instead of re-implementing it.
///
/// Returns `(sql, params)`, which can be handed to either `client.query` or
/// `transaction.query`. Errors from the underlying builder are passed
/// through unchanged.
#[cfg(feature = "postgres-live")]
pub fn build_query_filtered_sql_pub(
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
    live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
}
70
/// Publicly visible entry point to the aggregate SQL builder.
///
/// Returns `(sql, params, column_names)`. The `column_names` list is what
/// `aggregate_rows_to_json_pub` uses to project each result row. Errors from
/// the underlying builder are passed through unchanged.
#[cfg(feature = "postgres-live")]
pub fn build_aggregate_sql_pub(
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
    live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
}
82
/// Publicly visible entry point to [`live::aggregate_rows_to_json`], the
/// post-processing step that turns raw aggregate `Row`s into the
/// `{ rows: [{...}] }` JSON shape.
///
/// `column_names` is the list returned alongside the SQL by
/// `build_aggregate_sql_pub`.
#[cfg(feature = "postgres-live")]
pub fn aggregate_rows_to_json_pub(
    rows: &[postgres::Row],
    column_names: &[String],
) -> serde_json::Value {
    live::aggregate_rows_to_json(rows, column_names)
}
92
93// ---------------------------------------------------------------------------
94// SQL generation
95// ---------------------------------------------------------------------------
96
97/// Generate a Postgres CREATE TABLE statement.
98pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
99    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
100
101    for field in fields {
102        let col_type = pg_column_type(&field.field_type);
103        let not_null = if field.optional { "" } else { " NOT NULL" };
104        let unique = if field.unique { " UNIQUE" } else { "" };
105        columns.push(format!(
106            "{} {}{}{}",
107            quote_ident(&field.name),
108            col_type,
109            not_null,
110            unique
111        ));
112    }
113
114    format!(
115        "CREATE TABLE IF NOT EXISTS {} ({})",
116        quote_ident(entity_name),
117        columns.join(", ")
118    )
119}
120
121/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
122/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
123/// Required-ness is tracked in the manifest; enforcement deferred.
124pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
125    let col_type = pg_column_type(&field.field_type);
126    let unique = if field.unique { " UNIQUE" } else { "" };
127    format!(
128        "ALTER TABLE {} ADD COLUMN {} {}{}",
129        quote_ident(entity_name),
130        quote_ident(&field.name),
131        col_type,
132        unique
133    )
134}
135
136/// Generate a Postgres CREATE INDEX statement.
137pub fn create_index_sql(
138    entity_name: &str,
139    index_name: &str,
140    fields: &[String],
141    unique: bool,
142) -> String {
143    let unique_str = if unique { "UNIQUE " } else { "" };
144    let full_index_name = format!("{}_{}", entity_name, index_name);
145    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
146    format!(
147        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
148        unique_str,
149        quote_ident(&full_index_name),
150        quote_ident(entity_name),
151        quoted_fields.join(", ")
152    )
153}
154
155// ---------------------------------------------------------------------------
156// PostgresAdapter — planning-only adapter
157// ---------------------------------------------------------------------------
158
159/// A Postgres storage adapter. Currently supports planning only.
160/// No live connection — SQL generation and planning from manifest.
161pub struct PostgresAdapter;
162
163impl StorageAdapter for PostgresAdapter {
164    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
165        // Plan from empty baseline.
166        let mut operations = Vec::new();
167
168        for entity in &target.entities {
169            let fields: Vec<FieldSpec> = entity
170                .fields
171                .iter()
172                .map(|f| FieldSpec {
173                    name: f.name.clone(),
174                    field_type: f.field_type.clone(),
175                    optional: f.optional,
176                    unique: f.unique,
177                })
178                .collect();
179
180            operations.push(SchemaOperation::CreateEntity {
181                name: entity.name.clone(),
182                fields,
183            });
184
185            for index in &entity.indexes {
186                operations.push(SchemaOperation::AddIndex {
187                    entity: entity.name.clone(),
188                    name: index.name.clone(),
189                    fields: index.fields.clone(),
190                    unique: index.unique,
191                });
192            }
193        }
194
195        if operations.is_empty() {
196            operations.push(SchemaOperation::Noop);
197        }
198
199        Ok(SchemaPlan { operations })
200    }
201
202    // apply_schema intentionally not implemented — uses default trait error.
203}
204
205/// Generate all SQL statements for a plan, in order.
206/// Useful for dry-run preview of what Postgres DDL would be executed.
207pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
208    let mut statements = Vec::new();
209
210    for op in &plan.operations {
211        match op {
212            SchemaOperation::CreateEntity { name, fields } => {
213                statements.push(create_table_sql(name, fields));
214            }
215            SchemaOperation::AddField { entity, field } => {
216                statements.push(add_column_sql(entity, field));
217            }
218            SchemaOperation::AlterField {
219                entity,
220                previous,
221                target,
222            } => {
223                // Only nullable transitions today. SET / DROP NOT NULL is
224                // safe on a populated table when going from required →
225                // optional (existing rows already satisfy NOT NULL); the
226                // reverse direction (optional → required) succeeds only
227                // if every row has a non-null value, which the planner
228                // has no way to know — Postgres will fail the migration
229                // if it can't and the operator gets a clear error from
230                // the apply step.
231                if previous.optional && !target.optional {
232                    statements.push(format!(
233                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
234                        quote_ident(entity),
235                        quote_ident(&target.name)
236                    ));
237                } else if !previous.optional && target.optional {
238                    statements.push(format!(
239                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
240                        quote_ident(entity),
241                        quote_ident(&target.name)
242                    ));
243                }
244                // Nothing emitted when neither nullable nor type changed
245                // — falls through silently. AlterField with no actual
246                // shape change shouldn't happen in practice (the planner
247                // only emits it on real drift), but guard against
248                // emitting empty SQL just in case.
249            }
250            SchemaOperation::AddIndex {
251                entity,
252                name,
253                fields,
254                unique,
255            } => {
256                statements.push(create_index_sql(entity, name, fields, *unique));
257            }
258            SchemaOperation::CreateSearchIndex { entity, config } => {
259                #[cfg(feature = "postgres-live")]
260                {
261                    statements.extend(crate::pg_search::create_search_index_sql(entity, config));
262                }
263                #[cfg(not(feature = "postgres-live"))]
264                {
265                    let _ = (entity, config);
266                    return Err(StorageError {
267                        code: "PG_SEARCH_FEATURE_OFF".into(),
268                        message: "CreateSearchIndex requires the `postgres-live` feature".into(),
269                    });
270                }
271            }
272            SchemaOperation::RemoveSearchIndex { entity } => {
273                // Without the original config we don't know which
274                // facet/sort indexes were created. Drop the FTS table
275                // and the GIN index by their fixed names; per-field
276                // facet/sort indexes are dropped automatically by the
277                // entity DROP path. Operators removing search via
278                // schema diff will see leftover indexes only if the
279                // entity table itself still exists — at which point
280                // the next CREATE INDEX IF NOT EXISTS catches up.
281                //
282                // quote_ident on the synthetic table/index names so a
283                // malicious entity name with embedded `"` can't break
284                // out of the identifier.
285                statements.push(format!(
286                    "DROP TABLE IF EXISTS {} CASCADE",
287                    quote_ident(&format!("_fts_{entity}"))
288                ));
289                statements.push(format!(
290                    "DROP INDEX IF EXISTS {}",
291                    quote_ident(&format!("{entity}_fts_gin"))
292                ));
293            }
294            SchemaOperation::Noop => {}
295            other => {
296                return Err(StorageError {
297                    code: "PG_OP_UNSUPPORTED".into(),
298                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
299                });
300            }
301        }
302    }
303
304    Ok(statements)
305}
306
307// ---------------------------------------------------------------------------
308// Introspection SQL helpers
309//
310// These generate the SQL queries that a live Postgres connection would run
311// to read the current schema. No connection required — just SQL strings.
312// ---------------------------------------------------------------------------
313
/// SQL to list user tables in the `public` schema, ordered by name.
///
/// Tables whose name starts with the literal prefix `_pylon_` are excluded.
/// `_` is a single-character wildcard in `LIKE`, so the underscores are
/// escaped (`!_` with `ESCAPE '!'`); an unescaped `'_pylon_%'` would also
/// hide unrelated tables such as `xpylonyfoo`. `!` is used instead of a
/// backslash so the pattern does not depend on the server's string-literal
/// escaping settings.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
322
/// SQL to list the columns of one table in the `public` schema, in ordinal
/// order. Bind one parameter: the table name (`$1`).
///
/// Result columns: `column_name`, `data_type`, `is_nullable`, and `is_pk` —
/// a count that is non-zero when the column takes part in the table's
/// primary key.
///
/// The `is_pk` subquery is correlated on schema as well as table and column
/// name, and the constraint join matches schema and table too. Matching on
/// `constraint_name` / `table_name` alone would also count primary keys of a
/// same-named table living in a different schema.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_name = kcu.constraint_name \
             AND tc.constraint_schema = kcu.constraint_schema \
             AND tc.table_schema = kcu.table_schema \
             AND tc.table_name = kcu.table_name \
            WHERE tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
336
/// SQL to list the non-primary-key indexes of one table in the `public`
/// schema, ordered by index name. Bind one parameter: the table name (`$1`).
///
/// Result columns: `index_name`, `is_unique`, and `columns` — the indexed
/// column names aggregated in index-key order.
pub const INTROSPECT_INDEXES_SQL: &str = concat!(
    "SELECT i.relname as index_name, ",
    "ix.indisunique as is_unique, ",
    "array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns ",
    "FROM pg_index ix ",
    "JOIN pg_class t ON t.oid = ix.indrelid ",
    "JOIN pg_class i ON i.oid = ix.indexrelid ",
    "JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) ",
    "JOIN pg_namespace n ON n.oid = t.relnamespace ",
    "WHERE n.nspname = 'public' ",
    "AND t.relname = $1 ",
    "AND NOT ix.indisprimary ",
    "GROUP BY i.relname, ix.indisunique ",
    "ORDER BY i.relname",
);
353
354/// Plan from a snapshot (reuses the shared plan_from_snapshot).
355/// This allows Postgres to plan incrementally once introspection data is available.
356pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
357    crate::plan_from_snapshot(snapshot, target)
358}
359
360// ---------------------------------------------------------------------------
361// CRUD SQL generation helpers (used by live adapter, testable without a DB)
362// ---------------------------------------------------------------------------
363
/// Mint a fixed-width, lexicographically sortable row id.
///
/// Layout: 32 lowercase hex digits holding the current time as nanoseconds
/// since the Unix epoch (zero-padded), then 8 hex digits from a per-process
/// atomic counter. The counter keeps two ids minted within the same
/// nanosecond distinct and ordered. Because the width is always 40
/// characters, string comparison follows creation order — which cursor
/// pagination relies on.
///
/// If the system clock reads earlier than the Unix epoch the timestamp part
/// is all zeros. The counter wraps around after `u32::MAX` ids.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicU32 = AtomicU32::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);

    let mut id = format!("{nanos:032x}");
    id.push_str(&format!("{seq:08x}"));
    id
}
383
384/// Convert a JSON value to its string representation for use as a SQL parameter.
385///
386/// Kept for back-compat with callers that need a textual fallback (e.g.
387/// human-readable logs). New code should bind through [`JsonParam`] so
388/// integers/booleans/nulls reach Postgres in their typed form instead of
389/// collapsing to TEXT, which the driver can't coerce into INTEGER /
390/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
391/// into an empty string for FK columns.
392pub fn json_value_to_string(val: &serde_json::Value) -> String {
393    match val {
394        serde_json::Value::String(s) => s.clone(),
395        serde_json::Value::Number(n) => n.to_string(),
396        serde_json::Value::Bool(b) => b.to_string(),
397        serde_json::Value::Null => String::new(),
398        other => other.to_string(),
399    }
400}
401
402/// A typed wrapper around a JSON scalar that implements
403/// [`postgres::types::ToSql`] for INSERT/UPDATE parameters.
404///
405/// The previous implementation passed every value as `String`, which broke
406/// non-text columns (the postgres driver can't bind a string literal into
407/// an INTEGER / BOOLEAN / TIMESTAMPTZ slot) and silently turned JSON
408/// `null` into the empty string for nullable FKs (so `unlink` left
409/// dangling `""` references instead of NULL). `JsonParam` carries the
410/// JSON variant tag through to `to_sql` so the driver can pick the
411/// correct binary representation per column type.
412///
413/// JSON arrays/objects collapse to their JSON-string form (TEXT) — the
414/// runtime layer doesn't currently model array/object columns at the
415/// manifest level on Postgres, so anything that lands here came from
416/// caller-supplied prose that's expected to fit into a TEXT column.
417#[derive(Debug, Clone, PartialEq)]
418pub enum JsonParam {
419    Null,
420    Text(String),
421    Int(i64),
422    Float(f64),
423    Bool(bool),
424}
425
426impl JsonParam {
427    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
428    /// that fit `i64` go through as Int; everything else goes through as
429    /// Float to preserve fractional / large-magnitude values.
430    pub fn from_json(val: &serde_json::Value) -> Self {
431        match val {
432            serde_json::Value::Null => JsonParam::Null,
433            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
434            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
435            serde_json::Value::Number(n) => {
436                if let Some(i) = n.as_i64() {
437                    JsonParam::Int(i)
438                } else if let Some(f) = n.as_f64() {
439                    JsonParam::Float(f)
440                } else {
441                    JsonParam::Text(n.to_string())
442                }
443            }
444            other => JsonParam::Text(other.to_string()),
445        }
446    }
447}
448
/// Binds a [`JsonParam`] using the encoding of the parameter's declared
/// Postgres type.
///
/// Conversions that cannot be represented faithfully return an error instead
/// of sending corrupted bytes:
/// - an integer outside the range of an INT2 / INT4 target,
/// - a float that is NaN, infinite, or outside the target integer range
///   (the fractional part of an in-range float is still truncated),
/// - any variant/type pair with no conversion when the target is a
///   fixed-layout binary type (e.g. a bool or a string bound to INT4).
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        // Truncate toward zero, rejecting NaN, infinities and magnitudes that
        // do not fit an i64. A bare `as` cast would saturate (and map NaN to
        // 0) without any signal to the caller.
        fn float_to_i64(value: f64) -> Result<i64, String> {
            const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;
            let truncated = value.trunc();
            if truncated.is_finite() && truncated >= -TWO_POW_63 && truncated < TWO_POW_63 {
                Ok(truncated as i64)
            } else {
                Err(format!("float {value} is out of range for an integer parameter"))
            }
        }

        // Null binds as SQL NULL whatever the declared type is:
        // `IsNull::Yes` tells the driver to send a null of that type.
        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        // Pair each variant with the parameter's declared type so the binary
        // encoding fits the slot (an 8-byte i64 cannot be sent for a 4-byte
        // INTEGER).
        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            // Narrowing integer conversions are checked: wrapping silently
            // would store a different number than the caller sent.
            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for {ty}"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for {ty}"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT2) => i16::try_from(float_to_i64(*f)?)
                .map_err(|_| format!("float {f} is out of range for {ty}"))?
                .to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT4) => i32::try_from(float_to_i64(*f)?)
                .map_err(|_| format!("float {f} is out of range for {ty}"))?
                .to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT8) => float_to_i64(*f)?.to_sql(ty, out),

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                // Datetimes travel through the runtime as ISO 8601 / RFC 3339
                // strings, but the TIMESTAMPTZ binary wire format is an `i64`
                // of microseconds since 2000-01-01 UTC — not the bytes of the
                // string. Sending the raw ASCII makes Postgres reject the
                // bind ("incorrect binary data format in bind parameter N").
                // Parse with chrono and let its ToSql impl produce the proper
                // encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // TIMESTAMP (no timezone): same parse, bound as the naive UTC
                // datetime so chrono picks the matching encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // No conversion exists and the target has a fixed binary layout.
            // Sending text bytes here is never right, and can be silently
            // accepted when the lengths happen to match: the 4 bytes of
            // "true" are a valid INT4, any single byte is a valid BOOL.
            // Fail loudly instead of storing garbage.
            (
                mismatched,
                &Type::BOOL
                | &Type::INT2
                | &Type::INT4
                | &Type::INT8
                | &Type::OID
                | &Type::FLOAT4
                | &Type::FLOAT8
                | &Type::NUMERIC
                | &Type::DATE
                | &Type::TIME
                | &Type::TIMETZ
                | &Type::TIMESTAMP
                | &Type::TIMESTAMPTZ
                | &Type::INTERVAL
                | &Type::UUID,
            ) => Err(format!("cannot bind {mismatched:?} to a parameter of type {ty}").into()),

            // Remaining cross-type pairs: render the value as text and send
            // it as-is (e.g. a number headed for a text-like parameter).
            (other, _) => {
                let rendered = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    JsonParam::Null => unreachable!(),
                };
                rendered.to_sql(ty, out)
            }
        }
    }

    fn accepts(_ty: &postgres::types::Type) -> bool {
        // Accept every type up front: which encodings are valid depends on
        // the variant, so the per-type decision is made inside `to_sql`.
        true
    }

    postgres::types::to_sql_checked!();
}
552
553/// Build an INSERT SQL statement and collect typed parameter values.
554/// Returns `(sql, params)` where `params[0]` is the generated ID
555/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
556/// value so the postgres driver can bind them to typed columns
557/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
558/// the database as SQL NULL — the previous string-collapsing path stored
559/// `""` for nullable FKs and broke any non-text column.
560pub fn build_insert_sql(
561    entity: &str,
562    data: &serde_json::Value,
563) -> Result<(String, Vec<JsonParam>), StorageError> {
564    let obj = data.as_object().ok_or_else(|| StorageError {
565        code: "PG_INVALID_DATA".into(),
566        message: "Insert data must be a JSON object".into(),
567    })?;
568
569    // Honor caller-supplied `id` (used by the CRDT path that needs to
570    // share the row id with the LoroDoc snapshot key it just wrote).
571    // Fall back to a fresh ULID-shaped id when absent. A non-string
572    // `id` value is rejected explicitly — silently regenerating would
573    // mask schema bugs (e.g. a caller passing an int id) and let the
574    // CRDT snapshot key drift from the materialized row id.
575    let id = match obj.get("id") {
576        None | Some(serde_json::Value::Null) => generate_id(),
577        Some(serde_json::Value::String(s)) => s.clone(),
578        Some(other) => {
579            return Err(StorageError {
580                code: "PG_INVALID_ID".into(),
581                message: format!(
582                    "Insert data carried a non-string `id` value: {other}. Pylon row ids \
583                     are always strings (40-char hex). Drop the `id` field to let the \
584                     server generate one, or supply a string."
585                ),
586            });
587        }
588    };
589
590    let mut col_names = vec!["id".to_string()];
591    let mut placeholders = vec!["$1".to_string()];
592    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
593
594    let mut i = 0usize;
595    for (key, val) in obj {
596        if key == "id" {
597            // Already emitted via the synthetic first column; skipping
598            // here avoids `INSERT ... (id, id, ...)` which Postgres
599            // rejects with `duplicate key value`.
600            continue;
601        }
602        col_names.push(quote_ident(key));
603        placeholders.push(format!("${}", i + 2));
604        values.push(JsonParam::from_json(val));
605        i += 1;
606    }
607
608    let sql = format!(
609        "INSERT INTO {} ({}) VALUES ({})",
610        quote_ident(entity),
611        col_names.join(", "),
612        placeholders.join(", ")
613    );
614
615    Ok((sql, values))
616}
617
618/// Build an UPDATE SQL statement and collect typed parameter values.
619/// Returns `(sql, params)` where `params[0]` is the row ID.
620pub fn build_update_sql(
621    entity: &str,
622    id: &str,
623    data: &serde_json::Value,
624) -> Result<(String, Vec<JsonParam>), StorageError> {
625    let obj = data.as_object().ok_or_else(|| StorageError {
626        code: "PG_INVALID_DATA".into(),
627        message: "Update data must be a JSON object".into(),
628    })?;
629
630    if obj.is_empty() {
631        return Err(StorageError {
632            code: "PG_INVALID_DATA".into(),
633            message: "Update data must contain at least one field".into(),
634        });
635    }
636
637    let mut set_clauses = Vec::new();
638    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
639
640    let mut i = 0usize;
641    for (key, val) in obj {
642        if key == "id" {
643            // Reject primary-key mutation. Letting `id` into the SET
644            // clause lets a client move a row out from under its CRDT
645            // sidecar (which is keyed by the original row_id) and its
646            // FTS shadow row (FK-bound to the original id). The SQLite
647            // path silently drops `id` here too — keep the same
648            // shape, but errors so the caller sees the bug.
649            return Err(StorageError {
650                code: "PG_INVALID_UPDATE".into(),
651                message:
652                    "Updating the `id` column is not allowed — Pylon row ids are immutable; \
653                     drop the field from the patch."
654                        .into(),
655            });
656        }
657        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
658        values.push(JsonParam::from_json(val));
659        i += 1;
660    }
661
662    if set_clauses.is_empty() {
663        return Err(StorageError {
664            code: "PG_INVALID_DATA".into(),
665            message: "Update data must contain at least one non-id field".into(),
666        });
667    }
668
669    let sql = format!(
670        "UPDATE {} SET {} WHERE id = $1",
671        quote_ident(entity),
672        set_clauses.join(", ")
673    );
674
675    Ok((sql, values))
676}
677
/// Borrow each `JsonParam` as a `ToSql` trait object.
///
/// The postgres driver takes its parameters as a slice of trait objects;
/// this performs the `Vec<JsonParam>` → `Vec<&dyn ToSql>` lift once so the
/// insert / update / transact call sites don't each repeat it.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut lifted: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        lifted.push(value);
    }
    lifted
}
688
689// ---------------------------------------------------------------------------
690// Live Postgres adapter (requires "postgres-live" feature)
691// ---------------------------------------------------------------------------
692
693#[cfg(feature = "postgres-live")]
694pub mod live {
695    use super::*;
696    use crate::{
697        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
698    };
699
700    /// SSL parsing result for `parse_pg_url_ssl`. We need both the
701    /// cleaned-up URL (libpq-only params stripped) and the boolean
702    /// "should we use TLS for this connection" so the connect path
703    /// can pick between `NoTls` and `MakeTlsConnector`.
704    pub(super) struct PgUrlSsl {
705        pub use_tls: bool,
706    }
707
708    /// Pre-process a Postgres URL: strip libpq-specific query params
709    /// the Rust `postgres` crate's URL parser doesn't accept, and
710    /// figure out whether TLS should be enabled for the connection.
711    ///
712    /// Recognizes:
713    ///   - sslmode=disable / prefer / allow → no TLS
714    ///   - sslmode=require / verify-ca / verify-full → TLS
715    ///   - sslrootcert=system → TLS, use OS CA store (handled by
716    ///     `native-tls` automatically; we just record the intent)
717    ///   - sslrootcert=<path> → TLS, but the path is currently
718    ///     ignored — `native-tls` reads system roots only. Logged
719    ///     as a one-time warning so operators see the gap.
720    ///
721    /// Anything we don't understand is dropped from the URL silently
722    /// (defense against future libpq additions confusing the parser).
723    pub(super) fn parse_pg_url_ssl(url: &str) -> (String, PgUrlSsl) {
724        let (base, query) = match url.find('?') {
725            Some(idx) => (&url[..idx], &url[idx + 1..]),
726            None => return (url.to_string(), PgUrlSsl { use_tls: false }),
727        };
728
729        let mut use_tls = false;
730        let mut kept: Vec<String> = Vec::new();
731        for pair in query.split('&') {
732            if pair.is_empty() {
733                continue;
734            }
735            let (k, v) = match pair.find('=') {
736                Some(i) => (&pair[..i], &pair[i + 1..]),
737                None => (pair, ""),
738            };
739            match k {
740                "sslmode" => match v.to_ascii_lowercase().as_str() {
741                    "disable" | "allow" => {
742                        // Caller said "no TLS" — pass through to the
743                        // postgres crate, which knows `disable`.
744                        kept.push("sslmode=disable".to_string());
745                    }
746                    "prefer" => {
747                        // Postgres crate's default; let it through.
748                        kept.push("sslmode=prefer".to_string());
749                    }
750                    "require" | "verify-ca" | "verify-full" | "" => {
751                        // All the "use TLS" modes. Rust crate only
752                        // accepts `require`; verify-* would be
753                        // rejected. Normalize to `require` and let
754                        // our TLS connector handle cert verification.
755                        use_tls = true;
756                        kept.push("sslmode=require".to_string());
757                    }
758                    other => {
759                        tracing::warn!(
760                            "[pg] unknown sslmode='{other}' — defaulting to require + TLS"
761                        );
762                        use_tls = true;
763                        kept.push("sslmode=require".to_string());
764                    }
765                },
766                "sslrootcert" => {
767                    // Either "system" (use OS roots — that's what
768                    // native-tls does anyway) or a path (which we
769                    // can't honor without bringing in openssl). Log
770                    // and drop in both cases; TLS still happens.
771                    if v != "system" && !v.is_empty() {
772                        tracing::warn!(
773                            "[pg] sslrootcert={v} ignored — native-tls uses system roots"
774                        );
775                    }
776                    use_tls = true;
777                }
778                _ => {
779                    // Forward everything else (search_path, application_name,
780                    // connect_timeout, etc.) so libpq-style URLs that work
781                    // for the rest of the world keep working here.
782                    kept.push(pair.to_string());
783                }
784            }
785        }
786
787        let cleaned = if kept.is_empty() {
788            base.to_string()
789        } else {
790            format!("{}?{}", base, kept.join("&"))
791        };
792        (cleaned, PgUrlSsl { use_tls })
793    }
794
795    #[cfg(test)]
796    mod url_tests {
797        use super::parse_pg_url_ssl;
798
799        #[test]
800        fn strips_libpq_only_sslmode_verify_full() {
801            let (cleaned, ssl) = parse_pg_url_ssl(
802                "postgres://u:p@h:5432/db?sslmode=verify-full&sslrootcert=system",
803            );
804            assert!(ssl.use_tls);
805            // verify-full normalized to require; sslrootcert dropped.
806            assert_eq!(cleaned, "postgres://u:p@h:5432/db?sslmode=require");
807        }
808
809        #[test]
810        fn passes_through_disable() {
811            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db?sslmode=disable");
812            assert!(!ssl.use_tls);
813            assert_eq!(cleaned, "postgres://h/db?sslmode=disable");
814        }
815
816        #[test]
817        fn no_query_string_no_tls() {
818            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db");
819            assert!(!ssl.use_tls);
820            assert_eq!(cleaned, "postgres://h/db");
821        }
822
823        #[test]
824        fn unknown_params_pass_through() {
825            let (cleaned, _) = parse_pg_url_ssl(
826                "postgres://h/db?application_name=pylon&connect_timeout=5",
827            );
828            assert!(cleaned.contains("application_name=pylon"));
829            assert!(cleaned.contains("connect_timeout=5"));
830        }
831
832        #[test]
833        fn sslrootcert_alone_enables_tls() {
834            // Rare but valid — sslrootcert=system implies TLS even
835            // without an explicit sslmode.
836            let (cleaned, ssl) = parse_pg_url_ssl(
837                "postgres://h/db?sslrootcert=system",
838            );
839            assert!(ssl.use_tls);
840            // No sslmode synthesized — the postgres crate defaults
841            // to `prefer` which happily upgrades to our TLS connector.
842            assert_eq!(cleaned, "postgres://h/db");
843        }
844    }
845
    /// A live Postgres adapter with a real database connection.
    ///
    /// Owns a single `postgres::Client`; every method that talks to
    /// the database goes through it and therefore takes `&mut self`.
    pub struct LivePostgresAdapter {
        /// The connection established by `connect`.
        client: postgres::Client,
    }
850
851    impl LivePostgresAdapter {
852        /// Borrow the underlying postgres client mutably. Used by
853        /// `PostgresDataStore::with_transaction` to start an
854        /// interactive transaction across multiple TS-function
855        /// `ctx.db` calls. `pub(crate)` because exposing raw
856        /// `&mut Client` outside pylon-storage would let callers
857        /// issue arbitrary SQL, bypassing the typed `DataStore`
858        /// surface that the rest of the framework relies on.
859        pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
860            &mut self.client
861        }
862
863        /// Connect to a Postgres database.
864        ///
865        /// Honors libpq-style URL params the Rust postgres crate doesn't
866        /// natively understand:
867        ///   - `sslmode=verify-full`, `verify-ca`, `require` → use TLS
868        ///     via `postgres-native-tls`. The Rust crate only knows
869        ///     `disable`, `prefer`, `require` — it rejects the libpq
870        ///     extras with "invalid connection string".
871        ///   - `sslrootcert=system` → trust the OS CA store (the
872        ///     `native-tls` connector reads it via SecurityFramework /
873        ///     SChannel / openssl). `sslrootcert=<path>` is not yet
874        ///     supported and falls back to system roots with a warning.
875        ///
876        /// Strips the libpq-only params from the URL before passing to
877        /// the postgres crate's parser, so it doesn't choke. Common
878        /// real-world example that was failing pre-fix: Fly Postgres
879        /// emits `?sslmode=verify-full&sslrootcert=system` URLs.
880        pub fn connect(url: &str) -> Result<Self, StorageError> {
881            let (cleaned, ssl) = parse_pg_url_ssl(url);
882            let result = if ssl.use_tls {
883                let connector =
884                    native_tls::TlsConnector::new().map_err(|e| StorageError {
885                        code: "PG_TLS_INIT_FAILED".into(),
886                        message: format!("Failed to initialize TLS: {e}"),
887                    })?;
888                let tls = postgres_native_tls::MakeTlsConnector::new(connector);
889                postgres::Client::connect(&cleaned, tls)
890            } else {
891                postgres::Client::connect(&cleaned, postgres::NoTls)
892            };
893            let client = result.map_err(|e| StorageError {
894                code: "PG_CONNECT_FAILED".into(),
895                message: format!("Failed to connect to Postgres: {e}"),
896            })?;
897            Ok(Self { client })
898        }
899
900        /// Read the current schema from the live database.
901        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
902            let table_rows = self
903                .client
904                .query(INTROSPECT_TABLES_SQL, &[])
905                .map_err(pg_err)?;
906
907            let mut tables = Vec::new();
908            for row in &table_rows {
909                let table_name: String = row.get(0);
910                let columns = self.read_columns(&table_name)?;
911                let indexes = self.read_indexes(&table_name)?;
912                tables.push(TableSnapshot {
913                    name: table_name,
914                    columns,
915                    indexes,
916                });
917            }
918
919            Ok(SchemaSnapshot { tables })
920        }
921
922        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
923            let rows = self
924                .client
925                .query(INTROSPECT_COLUMNS_SQL, &[&table])
926                .map_err(pg_err)?;
927
928            let mut columns = Vec::new();
929            for row in &rows {
930                let name: String = row.get(0);
931                let data_type: String = row.get(1);
932                let is_nullable: String = row.get(2);
933                let is_pk: i64 = row.get(3);
934                columns.push(ColumnSnapshot {
935                    name,
936                    column_type: data_type,
937                    notnull: is_nullable == "NO",
938                    primary_key: is_pk > 0,
939                });
940            }
941            Ok(columns)
942        }
943
944        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
945            let rows = self
946                .client
947                .query(INTROSPECT_INDEXES_SQL, &[&table])
948                .map_err(pg_err)?;
949
950            let mut indexes = Vec::new();
951            for row in &rows {
952                let name: String = row.get(0);
953                let unique: bool = row.get(1);
954                let columns: Vec<String> = row.get(2);
955                indexes.push(IndexSnapshot {
956                    name,
957                    columns,
958                    unique,
959                });
960            }
961            Ok(indexes)
962        }
963
964        /// Plan from live database state.
965        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
966            let snapshot = self.read_schema()?;
967            Ok(crate::plan_from_snapshot(&snapshot, target))
968        }
969    }
970
971    impl StorageAdapter for LivePostgresAdapter {
972        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
973            Err(StorageError {
974                code: "PG_PLAN_NEEDS_MUTABLE".into(),
975                message: "Use plan_from_live() instead for live Postgres planning".into(),
976            })
977        }
978
979        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
980            Err(StorageError {
981                code: "PG_APPLY_USE_METHOD".into(),
982                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
983            })
984        }
985    }
986
987    impl LivePostgresAdapter {
988        /// Apply a schema plan to the live database.
989        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
990            let statements = plan_to_sql(plan)?;
991            for sql in &statements {
992                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
993            }
994            Ok(())
995        }
996
997        /// Execute a raw SQL statement against the live database. Used by
998        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
999        /// production code should go through `apply_plan` so changes are
1000        /// represented in the migration history. Returns the number of
1001        /// rows affected.
1002        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
1003            self.client.execute(sql, &[]).map_err(pg_err)
1004        }
1005
1006        /// Insert a row. Returns the generated ID.
1007        pub fn insert(
1008            &mut self,
1009            entity: &str,
1010            data: &serde_json::Value,
1011        ) -> Result<String, StorageError> {
1012            let (sql, values) = build_insert_sql(entity, data)?;
1013            // The first param is always the generated ID — extract it before
1014            // we hand `values` off to the postgres driver as borrowed slices.
1015            let id = match &values[0] {
1016                JsonParam::Text(s) => s.clone(),
1017                _ => {
1018                    return Err(StorageError {
1019                        code: "PG_INTERNAL".into(),
1020                        message: "build_insert_sql produced non-text id param".into(),
1021                    });
1022                }
1023            };
1024            let params = as_pg_params(&values);
1025            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
1026            Ok(id)
1027        }
1028
1029        /// Get a row by ID.
1030        pub fn get_by_id(
1031            &mut self,
1032            entity: &str,
1033            id: &str,
1034        ) -> Result<Option<serde_json::Value>, StorageError> {
1035            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
1036            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
1037
1038            match rows.first() {
1039                Some(row) => Ok(Some(row_to_json(row))),
1040                None => Ok(None),
1041            }
1042        }
1043
1044        /// List all rows from an entity.
1045        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
1046            let sql = format!("SELECT * FROM {}", quote_ident(entity));
1047            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
1048
1049            Ok(rows.iter().map(row_to_json).collect())
1050        }
1051
1052        /// Cursor-paginated list. `after` is the last `id` from the previous
1053        /// page; the result contains rows with `id > after` (lex order),
1054        /// limited to `limit`. Used for sync push/pull.
1055        pub fn list_after(
1056            &mut self,
1057            entity: &str,
1058            after: Option<&str>,
1059            limit: usize,
1060        ) -> Result<Vec<serde_json::Value>, StorageError> {
1061            // Cap limit at a sensible upper bound so a malicious client can't
1062            // stream the whole table by passing limit=u64::MAX.
1063            let capped: i64 = limit.min(10_000) as i64;
1064            let sql = match after {
1065                Some(_) => format!(
1066                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
1067                    quote_ident(entity)
1068                ),
1069                None => format!(
1070                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
1071                    quote_ident(entity)
1072                ),
1073            };
1074            let rows = match after {
1075                Some(cursor) => self
1076                    .client
1077                    .query(sql.as_str(), &[&cursor, &capped])
1078                    .map_err(pg_err)?,
1079                None => self
1080                    .client
1081                    .query(sql.as_str(), &[&capped])
1082                    .map_err(pg_err)?,
1083            };
1084            Ok(rows.iter().map(row_to_json).collect())
1085        }
1086
1087        /// Update a row by ID. Returns true if the row was found and updated.
1088        pub fn update(
1089            &mut self,
1090            entity: &str,
1091            id: &str,
1092            data: &serde_json::Value,
1093        ) -> Result<bool, StorageError> {
1094            let (sql, values) = build_update_sql(entity, id, data)?;
1095            let params = as_pg_params(&values);
1096            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
1097            Ok(rows_affected > 0)
1098        }
1099
1100        /// Delete a row by ID. Returns true if the row was found and deleted.
1101        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
1102            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1103            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
1104            Ok(rows_affected > 0)
1105        }
1106
1107        /// Look up a row by `field = value`. Caller must validate `field`
1108        /// against the manifest before calling — we still `quote_ident` it
1109        /// but won't catch a typo against the entity definition.
1110        pub fn lookup_field(
1111            &mut self,
1112            entity: &str,
1113            field: &str,
1114            value: &str,
1115        ) -> Result<Option<serde_json::Value>, StorageError> {
1116            let sql = format!(
1117                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
1118                quote_ident(entity),
1119                quote_ident(field),
1120            );
1121            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
1122            Ok(rows.first().map(row_to_json))
1123        }
1124
1125        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
1126        ///
1127        /// Supported operators (parity with the SQLite path):
1128        /// - Equality (`field: value`)
1129        /// - `$not`: emits `field != value`
1130        /// - `$gt` / `$gte` / `$lt` / `$lte`
1131        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
1132        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
1133        ///   if/when the SQLite side adds it)
1134        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
1135        ///
1136        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
1137        ///
1138        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
1139        /// would need a tsvector column or a generic ILIKE OR-fold across
1140        /// every text field, neither of which is wired up yet. Returns
1141        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
1142        /// receiving silently-broad results.
1143        ///
1144        /// Anything else is silently ignored (matches the in-memory fallback's
1145        /// permissive behavior). Field names are validated against `valid_columns`
1146        /// to prevent SQL injection — pass the entity's column set.
1147        pub fn query_filtered(
1148            &mut self,
1149            entity: &str,
1150            filter: &serde_json::Value,
1151            valid_columns: &[String],
1152        ) -> Result<Vec<serde_json::Value>, StorageError> {
1153            let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
1154            let pg_params = as_pg_params(&params);
1155            let rows = self
1156                .client
1157                .query(sql.as_str(), &pg_params)
1158                .map_err(pg_err)?;
1159            Ok(rows.iter().map(row_to_json).collect())
1160        }
1161
1162        /// Build the `SELECT ... FROM entity ...` SQL + bound params for
1163        /// a `query_filtered` request. Pure: takes a manifest's column
1164        /// list, returns text. Both the live adapter and the in-tx
1165        /// `PgTxStore` call this so the operator surface ($eq, $like,
1166        /// $in, $order, $limit, $offset) stays identical regardless of
1167        /// where the query runs.
1168        pub(crate) fn build_query_filtered_sql(
1169            entity: &str,
1170            filter: &serde_json::Value,
1171            valid_columns: &[String],
1172        ) -> Result<(String, Vec<JsonParam>), StorageError> {
1173            let empty = serde_json::Map::new();
1174            let obj = filter.as_object().unwrap_or(&empty);
1175
1176            let validate = |col: &str| -> Result<(), StorageError> {
1177                if col == "id" || valid_columns.iter().any(|c| c == col) {
1178                    Ok(())
1179                } else {
1180                    Err(StorageError {
1181                        code: "UNKNOWN_COLUMN".into(),
1182                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1183                    })
1184                }
1185            };
1186
1187            let mut where_clauses: Vec<String> = Vec::new();
1188            let mut order_clause = String::new();
1189            let mut limit_clause = String::new();
1190            let mut offset_clause = String::new();
1191            // Collect (col, op, value) so placeholder numbers can be assigned
1192            // in a single materialization pass after the parse loop. Values
1193            // are now JsonParam (typed) instead of String — see `value_to_pg`.
1194            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
1195
1196            for (key, val) in obj {
1197                match key.as_str() {
1198                    "$search" => {
1199                        // PG full-text via the entity's `_fts_<entity>`
1200                        // shadow table: `id IN (SELECT entity_id FROM
1201                        // _fts_<E> WHERE tsv @@ plainto_tsquery(...))`.
1202                        // Mirrors the SQLite path that joins the FTS5
1203                        // virtual table; the join here is a subquery so
1204                        // it composes with arbitrary other predicates
1205                        // (`$gt`, `$in`, etc.) in the same WHERE.
1206                        let raw = match val {
1207                            serde_json::Value::String(s) => s.clone(),
1208                            other => other.to_string(),
1209                        };
1210                        // Bind as a normal text param so all the
1211                        // existing placeholder-numbering and binding
1212                        // logic applies. The SQL uses the placeholder
1213                        // inside `plainto_tsquery('english', $N)`.
1214                        // Positioning logic mirrors the `$in` arm: the
1215                        // value will land at `planned.len()+1` because
1216                        // the materialization pass below pushes one
1217                        // param per planned item in order.
1218                        let placeholder_n = planned.len() + 1;
1219                        where_clauses.push(format!(
1220                            "{}.id IN (SELECT entity_id FROM \"_fts_{entity}\" \
1221                                       WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
1222                            quote_ident(entity),
1223                        ));
1224                        // Reuse the IN-style sentinel so the
1225                        // materialization pass below pushes the param
1226                        // without re-emitting a where_clause for it.
1227                        planned.push((
1228                            format!("__search_{}", planned.len()),
1229                            "__INLINE__".into(),
1230                            JsonParam::Text(raw),
1231                        ));
1232                    }
1233                    "$order" => {
1234                        if let Some(ord) = val.as_object() {
1235                            let mut parts = Vec::new();
1236                            for (col, dir) in ord {
1237                                validate(col)?;
1238                                let d = match dir.as_str().unwrap_or("asc") {
1239                                    "desc" | "DESC" => "DESC",
1240                                    _ => "ASC",
1241                                };
1242                                parts.push(format!("{} {d}", quote_ident(col)));
1243                            }
1244                            if !parts.is_empty() {
1245                                order_clause = format!(" ORDER BY {}", parts.join(", "));
1246                            }
1247                        }
1248                    }
1249                    "$limit" => {
1250                        if let Some(n) = val.as_u64() {
1251                            limit_clause = format!(" LIMIT {}", n);
1252                        }
1253                    }
1254                    "$offset" => {
1255                        if let Some(n) = val.as_u64() {
1256                            offset_clause = format!(" OFFSET {}", n);
1257                        }
1258                    }
1259                    field => {
1260                        validate(field)?;
1261                        match val {
1262                            serde_json::Value::Object(ops) => {
1263                                for (op, v) in ops {
1264                                    match op.as_str() {
1265                                        "$not" => planned.push((
1266                                            field.into(),
1267                                            "!=".into(),
1268                                            value_to_pg(v),
1269                                        )),
1270                                        "$gt" => {
1271                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
1272                                        }
1273                                        "$gte" => planned.push((
1274                                            field.into(),
1275                                            ">=".into(),
1276                                            value_to_pg(v),
1277                                        )),
1278                                        "$lt" => {
1279                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
1280                                        }
1281                                        "$lte" => planned.push((
1282                                            field.into(),
1283                                            "<=".into(),
1284                                            value_to_pg(v),
1285                                        )),
1286                                        "$like" => {
1287                                            // Wrap in `%...%` to match the
1288                                            // SQLite path's substring
1289                                            // semantics. Pre-fix divergence:
1290                                            // SQLite wrapped, PG forwarded
1291                                            // literally — `{name: {$like: "ann"}}`
1292                                            // matched "Joanne" on SQLite but
1293                                            // nothing on PG. Caller-supplied
1294                                            // wildcards inside the value still
1295                                            // work (`%j_n%` etc.) because we
1296                                            // only add wraps, never strip.
1297                                            let raw = match v {
1298                                                serde_json::Value::String(s) => s.clone(),
1299                                                other => other.to_string(),
1300                                            };
1301                                            planned.push((
1302                                                field.into(),
1303                                                "LIKE".into(),
1304                                                JsonParam::Text(format!("%{raw}%")),
1305                                            ));
1306                                        }
1307                                        "$in" => {
1308                                            if let Some(arr) = v.as_array() {
1309                                                if arr.is_empty() {
1310                                                    // `field IN ()` is invalid
1311                                                    // SQL on PG (and on SQLite
1312                                                    // too, technically — its
1313                                                    // path also short-circuits).
1314                                                    // An empty $in matches
1315                                                    // nothing; emit a guaranteed-
1316                                                    // false predicate so the
1317                                                    // parser doesn't choke and
1318                                                    // the result set comes back
1319                                                    // empty.
1320                                                    where_clauses.push("FALSE".into());
1321                                                } else {
1322                                                    let placeholders: Vec<String> = (0..arr.len())
1323                                                        .map(|i| {
1324                                                            format!("${}", planned.len() + 1 + i)
1325                                                        })
1326                                                        .collect();
1327                                                    where_clauses.push(format!(
1328                                                        "{} IN ({})",
1329                                                        quote_ident(field),
1330                                                        placeholders.join(", "),
1331                                                    ));
1332                                                    for x in arr {
1333                                                        planned.push((
1334                                                            format!("__inline_{}", planned.len()),
1335                                                            "__INLINE__".into(),
1336                                                            value_to_pg(x),
1337                                                        ));
1338                                                    }
1339                                                }
1340                                            }
1341                                        }
1342                                        _ => {}
1343                                    }
1344                                }
1345                            }
1346                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
1347                        }
1348                    }
1349                }
1350            }
1351
1352            // Materialize planned -> SQL + params.
1353            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
1354            for (field, op, v) in &planned {
1355                if op == "__INLINE__" {
1356                    // Already emitted via the IN-clause path; just push the value.
1357                    params.push(v.clone());
1358                } else {
1359                    let placeholder = format!("${}", params.len() + 1);
1360                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
1361                    params.push(v.clone());
1362                }
1363            }
1364
1365            let where_sql = if where_clauses.is_empty() {
1366                String::new()
1367            } else {
1368                format!(" WHERE {}", where_clauses.join(" AND "))
1369            };
1370            // Default deterministic order when the caller didn't pass
1371            // `$order` — matches the SQLite path. Without this,
1372            // identical queries return rows in different orders across
1373            // backends, which makes paginated APIs flaky.
1374            let final_order = if order_clause.is_empty() {
1375                format!(" ORDER BY {}", quote_ident("id"))
1376            } else {
1377                order_clause
1378            };
1379            let sql = format!(
1380                "SELECT * FROM {}{}{}{}{}",
1381                quote_ident(entity),
1382                where_sql,
1383                final_order,
1384                limit_clause,
1385                offset_clause,
1386            );
1387
1388            Ok((sql, params))
1389        }
1390
1391        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
1392        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
1393        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
1394        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
1395        /// via `date_trunc`), and a flat-equality `where` filter.
1396        ///
1397        /// Spec format (same JSON shape used by the SQLite path):
1398        /// ```json
1399        /// { "count": "*",
1400        ///   "sum": ["amount"],
1401        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
1402        ///   "where": {"status": "paid"} }
1403        /// ```
1404        ///
1405        /// `valid_columns` is used to validate every field name before it's
1406        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
1407        /// `DataStore` impl in this crate) supplies the entity's column set
1408        /// from the manifest.
1409        pub fn aggregate(
1410            &mut self,
1411            entity: &str,
1412            spec: &serde_json::Value,
1413            valid_columns: &[String],
1414        ) -> Result<serde_json::Value, StorageError> {
1415            let (sql, params, column_names) =
1416                Self::build_aggregate_sql(entity, spec, valid_columns)?;
1417            let pg_params = as_pg_params(&params);
1418            let rows = self
1419                .client
1420                .query(sql.as_str(), &pg_params)
1421                .map_err(pg_err)?;
1422            Ok(aggregate_rows_to_json(&rows, &column_names))
1423        }
1424
1425        /// Build the aggregate `SELECT` SQL + bound params + the
1426        /// expected output column names. Pure: takes the entity's
1427        /// validated column list, returns text. Both the live adapter
1428        /// and the in-tx `PgTxStore` call this so spec parsing
1429        /// (validation, bucket vocabulary, where-clause translation)
1430        /// stays identical regardless of where the query runs.
1431        pub(crate) fn build_aggregate_sql(
1432            entity: &str,
1433            spec: &serde_json::Value,
1434            valid_columns: &[String],
1435        ) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
1436            let obj = spec.as_object().ok_or_else(|| StorageError {
1437                code: "INVALID_QUERY".into(),
1438                message: "aggregate spec must be a JSON object".into(),
1439            })?;
1440
1441            let validate = |col: &str| -> Result<(), StorageError> {
1442                if col == "id" || valid_columns.iter().any(|c| c == col) {
1443                    Ok(())
1444                } else {
1445                    Err(StorageError {
1446                        code: "UNKNOWN_COLUMN".into(),
1447                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1448                    })
1449                }
1450            };
1451
1452            let mut select_parts: Vec<String> = Vec::new();
1453            let mut result_fields: Vec<String> = Vec::new();
1454
1455            if let Some(count) = obj.get("count") {
1456                match count {
1457                    serde_json::Value::String(s) if s == "*" => {
1458                        select_parts.push("COUNT(*) AS count".into());
1459                        result_fields.push("count".into());
1460                    }
1461                    serde_json::Value::String(field) => {
1462                        validate(field)?;
1463                        let alias = format!("count_{field}");
1464                        select_parts.push(format!(
1465                            "COUNT({}) AS {}",
1466                            quote_ident(field),
1467                            quote_ident(&alias),
1468                        ));
1469                        result_fields.push(alias);
1470                    }
1471                    _ => {}
1472                }
1473            }
1474
1475            for (fn_name, prefix) in [
1476                ("sum", "sum_"),
1477                ("avg", "avg_"),
1478                ("min", "min_"),
1479                ("max", "max_"),
1480            ] {
1481                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1482                    for field in fields {
1483                        if let Some(f) = field.as_str() {
1484                            validate(f)?;
1485                            let alias = format!("{prefix}{f}");
1486                            let sql_fn = fn_name.to_uppercase();
1487                            select_parts.push(format!(
1488                                "{}({}) AS {}",
1489                                sql_fn,
1490                                quote_ident(f),
1491                                quote_ident(&alias),
1492                            ));
1493                            result_fields.push(alias);
1494                        }
1495                    }
1496                }
1497            }
1498
1499            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1500                for field in fields {
1501                    if let Some(f) = field.as_str() {
1502                        validate(f)?;
1503                        let alias = format!("count_distinct_{f}");
1504                        select_parts.push(format!(
1505                            "COUNT(DISTINCT {}) AS {}",
1506                            quote_ident(f),
1507                            quote_ident(&alias),
1508                        ));
1509                        result_fields.push(alias);
1510                    }
1511                }
1512            }
1513
1514            // groupBy: column name or { field, bucket } — same vocabulary as
1515            // the SQLite path. Buckets translate to Postgres `date_trunc`
1516            // (SQLite uses `strftime`); both collapse rows to the bucket
1517            // boundary identically.
1518            let mut group_by: Vec<String> = Vec::new();
1519            let mut group_select: Vec<String> = Vec::new();
1520            let mut group_field_names: Vec<String> = Vec::new();
1521            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1522                for g in groups {
1523                    if let Some(f) = g.as_str() {
1524                        validate(f)?;
1525                        let q = quote_ident(f);
1526                        group_by.push(q.clone());
1527                        group_select.push(q);
1528                        group_field_names.push(f.to_string());
1529                    } else if let Some(spec) = g.as_object() {
1530                        let field =
1531                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1532                                StorageError {
1533                                    code: "INVALID_QUERY".into(),
1534                                    message: "groupBy object spec requires `field`".into(),
1535                                }
1536                            })?;
1537                        validate(field)?;
1538                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1539                        let trunc_unit = match bucket {
1540                            "hour" | "day" | "week" | "month" | "year" => bucket,
1541                            _ => {
1542                                return Err(StorageError {
1543                                    code: "INVALID_QUERY".into(),
1544                                    message: format!(
1545                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1546                                    ),
1547                                });
1548                            }
1549                        };
1550                        let alias = format!("{field}_{bucket}");
1551                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1552                        group_by.push(expr.clone());
1553                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1554                        group_field_names.push(alias);
1555                    }
1556                }
1557            }
1558
1559            let mut full_select = group_select.clone();
1560            full_select.extend(select_parts.iter().cloned());
1561            if full_select.is_empty() {
1562                return Err(StorageError {
1563                    code: "INVALID_QUERY".into(),
1564                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1565                });
1566            }
1567
1568            let mut where_clauses: Vec<String> = Vec::new();
1569            let mut params: Vec<JsonParam> = Vec::new();
1570            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1571                for (k, v) in w {
1572                    validate(k)?;
1573                    let placeholder = format!("${}", params.len() + 1);
1574                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1575                    params.push(value_to_pg(v));
1576                }
1577            }
1578            let where_sql = if where_clauses.is_empty() {
1579                String::new()
1580            } else {
1581                format!(" WHERE {}", where_clauses.join(" AND "))
1582            };
1583            let group_sql = if group_by.is_empty() {
1584                String::new()
1585            } else {
1586                format!(" GROUP BY {}", group_by.join(", "))
1587            };
1588
1589            let sql = format!(
1590                "SELECT {} FROM {}{}{}",
1591                full_select.join(", "),
1592                quote_ident(entity),
1593                where_sql,
1594                group_sql,
1595            );
1596
1597            let column_names: Vec<String> = group_field_names
1598                .iter()
1599                .chain(result_fields.iter())
1600                .cloned()
1601                .collect();
1602
1603            Ok((sql, params, column_names))
1604        }
1605    }
1606
1607    /// Project rows from an aggregate `SELECT` into the
1608    /// `{ rows: [{...}] }` JSON shape both the SQLite path and the
1609    /// PG path return. Pure post-processing — works on rows produced
1610    /// from either `Client::query` or `Transaction::query`.
1611    pub fn aggregate_rows_to_json(
1612        rows: &[postgres::Row],
1613        column_names: &[String],
1614    ) -> serde_json::Value {
1615        let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1616        for row in rows {
1617            let row_json = row_to_json(row);
1618            if let serde_json::Value::Object(map) = &row_json {
1619                let mut filtered = serde_json::Map::new();
1620                for name in column_names {
1621                    if let Some(v) = map.get(name) {
1622                        filtered.insert(name.clone(), v.clone());
1623                    }
1624                }
1625                out.push(serde_json::Value::Object(filtered));
1626            } else {
1627                out.push(row_json);
1628            }
1629        }
1630        serde_json::json!({ "rows": out })
1631    }
1632
    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
    ///
    /// Every variant only borrows its inputs (lifetime `'a`), so a batch of
    /// ops can be assembled without cloning entity names or payloads.
    pub enum TxOp<'a> {
        /// Insert a row into `entity`. The statement and its parameters are
        /// produced by `build_insert_sql(entity, data)`.
        Insert {
            entity: &'a str,
            data: &'a serde_json::Value,
        },
        /// Update the row identified by `id` in `entity`. The statement and
        /// its parameters are produced by `build_update_sql(entity, id, data)`.
        Update {
            entity: &'a str,
            id: &'a str,
            data: &'a serde_json::Value,
        },
        /// Delete the row whose `id` column equals `id` from `entity`.
        Delete {
            entity: &'a str,
            id: &'a str,
        },
    }
1649
    /// Result of a single op inside a transaction.
    ///
    /// [`LivePostgresAdapter::transact`] returns one of these per input op,
    /// in the same order as the ops were supplied.
    #[derive(Debug, Clone)]
    pub enum TxResult {
        /// Id of the inserted row (the first, text-typed, insert parameter).
        Inserted(String),
        /// `true` when the `UPDATE` affected at least one row.
        Updated(bool),
        /// `true` when the `DELETE` affected at least one row.
        Deleted(bool),
    }
1657
1658    impl LivePostgresAdapter {
1659        /// Run `ops` inside a single Postgres transaction. Either all of them
1660        /// commit together or none of them do — there is no partial state on
1661        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1662        /// guard is dropped without `commit()` being called.
1663        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1664            let mut tx = self.client.transaction().map_err(pg_err)?;
1665            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1666
1667            for op in ops {
1668                match op {
1669                    TxOp::Insert { entity, data } => {
1670                        let (sql, values) = build_insert_sql(entity, data)?;
1671                        let id = match &values[0] {
1672                            JsonParam::Text(s) => s.clone(),
1673                            _ => {
1674                                return Err(StorageError {
1675                                    code: "PG_INTERNAL".into(),
1676                                    message: "build_insert_sql produced non-text id param".into(),
1677                                });
1678                            }
1679                        };
1680                        let params = as_pg_params(&values);
1681                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1682                        results.push(TxResult::Inserted(id));
1683                    }
1684                    TxOp::Update { entity, id, data } => {
1685                        let (sql, values) = build_update_sql(entity, id, data)?;
1686                        let params = as_pg_params(&values);
1687                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1688                        results.push(TxResult::Updated(n > 0));
1689                    }
1690                    TxOp::Delete { entity, id } => {
1691                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1692                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1693                        results.push(TxResult::Deleted(n > 0));
1694                    }
1695                }
1696            }
1697
1698            tx.commit().map_err(pg_err)?;
1699            Ok(results)
1700        }
1701    }
1702
    /// Lift a JSON value into a typed Postgres parameter. The previous
    /// implementation collapsed everything to `String`, which silently
    /// stringified ints/bools and turned JSON `null` into `""` for
    /// nullable columns. Forwarding through `JsonParam` keeps the column
    /// type honest and lets callers `unlink` (set FK to NULL) cleanly.
    ///
    /// This is a thin forwarder to `JsonParam::from_json`; it exists so the
    /// SQL builders in this module share one conversion entry point.
    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
        JsonParam::from_json(v)
    }
1711
1712    pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1713        use postgres::types::Type;
1714        let mut obj = serde_json::Map::new();
1715        for (i, col) in row.columns().iter().enumerate() {
1716            let name = col.name().to_string();
1717
1718            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
1719            // and a panic in a query handler poisons the connection mutex,
1720            // taking down all subsequent reads on this datastore. Anything
1721            // that fails to decode becomes Null with a one-shot warning.
1722            //
1723            // Timestamps and the catch-all path explicitly DON'T request
1724            // `String` — the postgres crate uses binary protocol by default
1725            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
1726            // ask for `Vec<u8>` and lossy-stringify, which works for all
1727            // text-shaped columns in either protocol.
1728            let value: serde_json::Value = match *col.type_() {
1729                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1730                    .flatten()
1731                    .map(serde_json::Value::Bool)
1732                    .unwrap_or(serde_json::Value::Null),
1733                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1734                    .flatten()
1735                    .map(|v| serde_json::Value::Number(v.into()))
1736                    .unwrap_or(serde_json::Value::Null),
1737                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1738                    .flatten()
1739                    .map(|v| serde_json::Value::Number(v.into()))
1740                    .unwrap_or(serde_json::Value::Null),
1741                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1742                    .flatten()
1743                    .map(|v| serde_json::Value::Number(v.into()))
1744                    .unwrap_or(serde_json::Value::Null),
1745                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1746                    .flatten()
1747                    .and_then(|v| serde_json::Number::from_f64(v as f64))
1748                    .map(serde_json::Value::Number)
1749                    .unwrap_or(serde_json::Value::Null),
1750                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1751                    .flatten()
1752                    .and_then(serde_json::Number::from_f64)
1753                    .map(serde_json::Value::Number)
1754                    .unwrap_or(serde_json::Value::Null),
1755                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1756                    .flatten()
1757                    .unwrap_or(serde_json::Value::Null),
1758                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1759                    .flatten()
1760                    .map(|b| serde_json::Value::String(b64(&b)))
1761                    .unwrap_or(serde_json::Value::Null),
1762                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1763                    try_get_or_null::<Option<String>>(row, i)
1764                        .flatten()
1765                        .map(serde_json::Value::String)
1766                        .unwrap_or(serde_json::Value::Null)
1767                }
1768                Type::TIMESTAMPTZ => {
1769                    // Decode via chrono::DateTime<Utc> (postgres's
1770                    // `with-chrono-0_4` feature provides FromSql) and
1771                    // re-format as ISO 8601 — the shape pylon's clients
1772                    // expect (matches `pylon_kernel::util::now_iso`,
1773                    // so timestamps round-trip with the same surface
1774                    // across SQLite + PG).
1775                    try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1776                        .flatten()
1777                        .map(|dt| {
1778                            serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1779                        })
1780                        .unwrap_or(serde_json::Value::Null)
1781                }
1782                Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1783                    .flatten()
1784                    .map(|dt| {
1785                        serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1786                    })
1787                    .unwrap_or(serde_json::Value::Null),
1788                Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1789                    .flatten()
1790                    .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1791                    .unwrap_or(serde_json::Value::Null),
1792                _ => {
1793                    // Last resort: ask Postgres to render anything else as
1794                    // text via a stringifying decode through Vec<u8>. If even
1795                    // that fails (rare — Postgres types not implementing the
1796                    // text format), fall through to Null with a warning.
1797                    match row.try_get::<_, Option<String>>(i) {
1798                        Ok(Some(s)) => serde_json::Value::String(s),
1799                        Ok(None) => serde_json::Value::Null,
1800                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1801                            Ok(Some(bytes)) => serde_json::Value::String(
1802                                String::from_utf8_lossy(&bytes).into_owned(),
1803                            ),
1804                            _ => serde_json::Value::Null,
1805                        },
1806                    }
1807                }
1808            };
1809            obj.insert(name, value);
1810        }
1811        serde_json::Value::Object(obj)
1812    }
1813
1814    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1815    where
1816        T: postgres::types::FromSql<'a>,
1817    {
1818        match row.try_get::<_, T>(i) {
1819            Ok(v) => Some(v),
1820            Err(e) => {
1821                tracing::warn!(
1822                    "[postgres] decode failed for column {} ({}): {e}",
1823                    i,
1824                    row.columns()[i].name()
1825                );
1826                None
1827            }
1828        }
1829    }
1830
1831    /// Minimal base64 encoder so we don't need another dependency just for
1832    /// the BYTEA column edge case.
1833    fn b64(bytes: &[u8]) -> String {
1834        const TABLE: &[u8; 64] =
1835            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1836        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1837        let chunks = bytes.chunks(3);
1838        for chunk in chunks {
1839            let b = [
1840                chunk.first().copied().unwrap_or(0),
1841                chunk.get(1).copied().unwrap_or(0),
1842                chunk.get(2).copied().unwrap_or(0),
1843            ];
1844            out.push(TABLE[(b[0] >> 2) as usize] as char);
1845            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1846            if chunk.len() > 1 {
1847                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1848            } else {
1849                out.push('=');
1850            }
1851            if chunk.len() > 2 {
1852                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1853            } else {
1854                out.push('=');
1855            }
1856        }
1857        out
1858    }
1859
1860    fn pg_err(e: postgres::Error) -> StorageError {
1861        // postgres::Error's Display is intentionally short ("db error",
1862        // "connection error" etc.) — the actual SQLSTATE / detail lives
1863        // on the source chain. Walk the chain so the final message has
1864        // enough signal to debug a failed insert/update without
1865        // attaching a debugger.
1866        use std::error::Error;
1867        let mut detail = format!("{e}");
1868        let mut src: Option<&dyn Error> = e.source();
1869        while let Some(s) = src {
1870            detail.push_str(": ");
1871            detail.push_str(&format!("{s}"));
1872            src = s.source();
1873        }
1874        StorageError {
1875            code: "PG_QUERY_FAILED".into(),
1876            message: format!("Postgres query failed: {detail}"),
1877        }
1878    }
1879}
1880
1881// ---------------------------------------------------------------------------
1882// Tests
1883// ---------------------------------------------------------------------------
1884
1885#[cfg(test)]
1886mod tests {
1887    use super::*;
1888
1889    /// Hand-rolled fixture that matches the snapshots in the tests
1890    /// below. Decoupled from any example's `pylon.manifest.json` so
1891    /// changing an example schema doesn't bleed into adapter tests.
1892    fn test_manifest() -> AppManifest {
1893        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1894        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1895            name: name.into(),
1896            field_type: ty.into(),
1897            optional: opt,
1898            unique: uniq,
1899            crdt: None,
1900        };
1901        AppManifest {
1902            manifest_version: 1,
1903            name: "test".into(),
1904            version: "0.0.0".into(),
1905            entities: vec![
1906                ManifestEntity {
1907                    name: "User".into(),
1908                    fields: vec![
1909                        f("email", "string", false, true),
1910                        f("displayName", "string", false, false),
1911                        f("createdAt", "datetime", false, false),
1912                    ],
1913                    indexes: vec![],
1914                    relations: vec![],
1915                    search: None,
1916                    crdt: true,
1917                },
1918                ManifestEntity {
1919                    name: "Todo".into(),
1920                    fields: vec![
1921                        f("title", "string", false, false),
1922                        f("done", "bool", false, false),
1923                        f("userId", "id(User)", false, false),
1924                        f("createdAt", "datetime", false, false),
1925                    ],
1926                    indexes: vec![ManifestIndex {
1927                        name: "by_user".into(),
1928                        fields: vec!["userId".into()],
1929                        unique: false,
1930                    }],
1931                    relations: vec![],
1932                    search: None,
1933                    crdt: true,
1934                },
1935            ],
1936            queries: vec![],
1937            actions: vec![],
1938            policies: vec![],
1939            routes: vec![],
1940            auth: Default::default(),
1941        }
1942    }
1943
1944    #[test]
1945    fn pg_type_mapping() {
1946        assert_eq!(pg_column_type("string"), "TEXT");
1947        assert_eq!(pg_column_type("int"), "INTEGER");
1948        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1949        assert_eq!(pg_column_type("bool"), "BOOLEAN");
1950        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1951        assert_eq!(pg_column_type("richtext"), "TEXT");
1952        assert_eq!(pg_column_type("id(User)"), "TEXT");
1953    }
1954
1955    #[test]
1956    fn quote_ident_simple() {
1957        assert_eq!(quote_ident("User"), "\"User\"");
1958        assert_eq!(quote_ident("email"), "\"email\"");
1959    }
1960
1961    #[test]
1962    fn quote_ident_escapes_embedded_double_quotes() {
1963        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1964        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1965    }
1966
1967    #[test]
1968    fn create_table_sql_basic() {
1969        let fields = vec![
1970            FieldSpec {
1971                name: "email".into(),
1972                field_type: "string".into(),
1973                optional: false,
1974                unique: true,
1975            },
1976            FieldSpec {
1977                name: "age".into(),
1978                field_type: "int".into(),
1979                optional: true,
1980                unique: false,
1981            },
1982        ];
1983        let sql = create_table_sql("User", &fields);
1984        assert_eq!(
1985            sql,
1986            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
1987        );
1988    }
1989
1990    #[test]
1991    fn create_table_sql_escapes_identifiers() {
1992        let fields = vec![FieldSpec {
1993            name: "col\"x".into(),
1994            field_type: "string".into(),
1995            optional: false,
1996            unique: false,
1997        }];
1998        let sql = create_table_sql("my\"table", &fields);
1999        assert!(sql.contains("\"my\"\"table\""));
2000        assert!(sql.contains("\"col\"\"x\""));
2001    }
2002
2003    #[test]
2004    fn create_index_sql_unique() {
2005        let sql = create_index_sql("User", "by_email", &["email".into()], true);
2006        assert_eq!(
2007            sql,
2008            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
2009        );
2010    }
2011
2012    #[test]
2013    fn create_index_sql_non_unique() {
2014        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
2015        assert_eq!(
2016            sql,
2017            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
2018        );
2019    }
2020
2021    #[test]
2022    fn add_column_sql_basic() {
2023        let field = FieldSpec {
2024            name: "bio".into(),
2025            field_type: "string".into(),
2026            optional: true,
2027            unique: false,
2028        };
2029        let sql = add_column_sql("User", &field);
2030        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
2031    }
2032
2033    #[test]
2034    fn plan_from_manifest() {
2035        let adapter = PostgresAdapter;
2036        let manifest = test_manifest();
2037        let plan = adapter.plan_schema(&manifest).unwrap();
2038
2039        // Should have CreateEntity for User and Todo, plus AddIndex for by_user.
2040        assert!(plan.operations.iter().any(|op| matches!(
2041            op,
2042            SchemaOperation::CreateEntity { name, .. } if name == "User"
2043        )));
2044        assert!(plan.operations.iter().any(|op| matches!(
2045            op,
2046            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
2047        )));
2048        assert!(plan.operations.iter().any(|op| matches!(
2049            op,
2050            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
2051        )));
2052    }
2053
2054    #[test]
2055    fn plan_to_sql_produces_statements() {
2056        let adapter = PostgresAdapter;
2057        let manifest = test_manifest();
2058        let plan = adapter.plan_schema(&manifest).unwrap();
2059        let stmts = plan_to_sql(&plan).unwrap();
2060
2061        // 2 CREATE TABLE (User, Todo) + 1 CREATE INDEX for Todo.by_user
2062        // + 1 CREATE INDEX for Todo.by_user_done. The Todo manifest also
2063        // declares a unique by_email index on User which lands as part of
2064        // the table. Final count: 2 tables + 2 indexes.
2065        let create_tables = stmts
2066            .iter()
2067            .filter(|s| s.starts_with("CREATE TABLE"))
2068            .count();
2069        let create_indexes = stmts
2070            .iter()
2071            .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
2072            .count();
2073        assert_eq!(create_tables, 2);
2074        assert!(create_indexes >= 1);
2075        assert!(stmts[0].starts_with("CREATE TABLE"));
2076        assert!(stmts[1].starts_with("CREATE TABLE"));
2077    }
2078
2079    #[test]
2080    fn plan_to_sql_rejects_unsupported() {
2081        let plan = SchemaPlan {
2082            operations: vec![SchemaOperation::RemoveEntity {
2083                name: "User".into(),
2084            }],
2085        };
2086        let result = plan_to_sql(&plan);
2087        assert!(result.is_err());
2088        assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
2089    }
2090
2091    #[test]
2092    fn apply_not_implemented() {
2093        let adapter = PostgresAdapter;
2094        let plan = SchemaPlan {
2095            operations: vec![SchemaOperation::Noop],
2096        };
2097        let result = adapter.apply_schema(&plan);
2098        assert!(result.is_err());
2099        assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
2100    }
2101
2102    #[test]
2103    fn sql_uses_quoted_identifiers() {
2104        let fields = vec![FieldSpec {
2105            name: "createdAt".into(),
2106            field_type: "datetime".into(),
2107            optional: false,
2108            unique: false,
2109        }];
2110        let sql = create_table_sql("User", &fields);
2111        // Postgres identifiers should be quoted for case-sensitivity.
2112        assert!(sql.contains("\"User\""));
2113        assert!(sql.contains("\"createdAt\""));
2114        assert!(sql.contains("TIMESTAMPTZ"));
2115    }
2116
2117    // -- Introspection SQL tests --
2118
2119    #[test]
2120    fn introspect_sql_constants_are_valid() {
2121        // Sanity checks that the SQL strings exist and look reasonable.
2122        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
2123        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
2124        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
2125        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
2126    }
2127
2128    // -- Plan from snapshot tests --
2129
2130    #[test]
2131    fn plan_from_empty_snapshot_creates_all() {
2132        let snapshot = crate::SchemaSnapshot { tables: vec![] };
2133        let manifest = test_manifest();
2134        let plan = plan_from_snapshot(&snapshot, &manifest);
2135
2136        assert!(plan.operations.iter().any(|op| matches!(
2137            op,
2138            SchemaOperation::CreateEntity { name, .. } if name == "User"
2139        )));
2140        assert!(plan.operations.iter().any(|op| matches!(
2141            op,
2142            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
2143        )));
2144        assert!(plan.operations.iter().any(|op| matches!(
2145            op,
2146            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
2147        )));
2148    }
2149
2150    #[test]
2151    fn plan_from_full_snapshot_is_noop() {
2152        let snapshot = crate::SchemaSnapshot {
2153            tables: vec![
2154                crate::TableSnapshot {
2155                    name: "User".into(),
2156                    columns: vec![
2157                        crate::ColumnSnapshot {
2158                            name: "id".into(),
2159                            column_type: "TEXT".into(),
2160                            notnull: true,
2161                            primary_key: true,
2162                        },
2163                        crate::ColumnSnapshot {
2164                            name: "email".into(),
2165                            column_type: "TEXT".into(),
2166                            notnull: true,
2167                            primary_key: false,
2168                        },
2169                        crate::ColumnSnapshot {
2170                            name: "displayName".into(),
2171                            column_type: "TEXT".into(),
2172                            notnull: true,
2173                            primary_key: false,
2174                        },
2175                        crate::ColumnSnapshot {
2176                            name: "createdAt".into(),
2177                            column_type: "TIMESTAMPTZ".into(),
2178                            notnull: true,
2179                            primary_key: false,
2180                        },
2181                    ],
2182                    indexes: vec![],
2183                },
2184                crate::TableSnapshot {
2185                    name: "Todo".into(),
2186                    columns: vec![
2187                        crate::ColumnSnapshot {
2188                            name: "id".into(),
2189                            column_type: "TEXT".into(),
2190                            notnull: true,
2191                            primary_key: true,
2192                        },
2193                        crate::ColumnSnapshot {
2194                            name: "title".into(),
2195                            column_type: "TEXT".into(),
2196                            notnull: true,
2197                            primary_key: false,
2198                        },
2199                        crate::ColumnSnapshot {
2200                            name: "done".into(),
2201                            column_type: "BOOLEAN".into(),
2202                            notnull: true,
2203                            primary_key: false,
2204                        },
2205                        crate::ColumnSnapshot {
2206                            name: "userId".into(),
2207                            column_type: "TEXT".into(),
2208                            notnull: true,
2209                            primary_key: false,
2210                        },
2211                        crate::ColumnSnapshot {
2212                            name: "createdAt".into(),
2213                            column_type: "TIMESTAMPTZ".into(),
2214                            notnull: true,
2215                            primary_key: false,
2216                        },
2217                    ],
2218                    indexes: vec![crate::IndexSnapshot {
2219                        name: "Todo_by_user".into(),
2220                        columns: vec!["userId".into()],
2221                        unique: false,
2222                    }],
2223                },
2224            ],
2225        };
2226        let manifest = test_manifest();
2227        let plan = plan_from_snapshot(&snapshot, &manifest);
2228        assert!(plan.is_empty());
2229    }
2230
2231    #[test]
2232    fn plan_detects_missing_column_in_snapshot() {
2233        let snapshot = crate::SchemaSnapshot {
2234            tables: vec![
2235                crate::TableSnapshot {
2236                    name: "User".into(),
2237                    columns: vec![
2238                        crate::ColumnSnapshot {
2239                            name: "id".into(),
2240                            column_type: "TEXT".into(),
2241                            notnull: true,
2242                            primary_key: true,
2243                        },
2244                        crate::ColumnSnapshot {
2245                            name: "email".into(),
2246                            column_type: "TEXT".into(),
2247                            notnull: true,
2248                            primary_key: false,
2249                        },
2250                        // missing displayName and createdAt
2251                    ],
2252                    indexes: vec![],
2253                },
2254                crate::TableSnapshot {
2255                    name: "Todo".into(),
2256                    columns: vec![
2257                        crate::ColumnSnapshot {
2258                            name: "id".into(),
2259                            column_type: "TEXT".into(),
2260                            notnull: true,
2261                            primary_key: true,
2262                        },
2263                        crate::ColumnSnapshot {
2264                            name: "title".into(),
2265                            column_type: "TEXT".into(),
2266                            notnull: true,
2267                            primary_key: false,
2268                        },
2269                        crate::ColumnSnapshot {
2270                            name: "done".into(),
2271                            column_type: "BOOLEAN".into(),
2272                            notnull: true,
2273                            primary_key: false,
2274                        },
2275                        crate::ColumnSnapshot {
2276                            name: "userId".into(),
2277                            column_type: "TEXT".into(),
2278                            notnull: true,
2279                            primary_key: false,
2280                        },
2281                        crate::ColumnSnapshot {
2282                            name: "createdAt".into(),
2283                            column_type: "TIMESTAMPTZ".into(),
2284                            notnull: true,
2285                            primary_key: false,
2286                        },
2287                    ],
2288                    indexes: vec![crate::IndexSnapshot {
2289                        name: "Todo_by_user".into(),
2290                        columns: vec!["userId".into()],
2291                        unique: false,
2292                    }],
2293                },
2294            ],
2295        };
2296        let manifest = test_manifest();
2297        let plan = plan_from_snapshot(&snapshot, &manifest);
2298
2299        let add_fields: Vec<_> = plan
2300            .operations
2301            .iter()
2302            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
2303            .collect();
2304        assert_eq!(add_fields.len(), 2); // displayName + createdAt
2305    }
2306
2307    // -- CRUD helper tests (no live database required) --
2308
2309    #[test]
2310    fn json_value_to_string_handles_all_types() {
2311        assert_eq!(
2312            json_value_to_string(&serde_json::Value::String("hello".into())),
2313            "hello"
2314        );
2315        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
2316        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
2317        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
2318        assert_eq!(
2319            json_value_to_string(&serde_json::Value::Bool(false)),
2320            "false"
2321        );
2322        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
2323        // Arrays and objects get their JSON representation.
2324        assert_eq!(
2325            json_value_to_string(&serde_json::json!([1, 2, 3])),
2326            "[1,2,3]"
2327        );
2328        assert_eq!(
2329            json_value_to_string(&serde_json::json!({"a": 1})),
2330            "{\"a\":1}"
2331        );
2332    }
2333
2334    #[test]
2335    fn generate_id_returns_hex_string() {
2336        let id = generate_id();
2337        assert!(!id.is_empty());
2338        // Must be valid hex characters.
2339        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
2340    }
2341
2342    #[test]
2343    fn generate_id_is_unique_across_calls() {
2344        let id1 = generate_id();
2345        let id2 = generate_id();
2346        assert_ne!(id1, id2);
2347    }
2348
2349    #[test]
2350    fn generate_id_is_lex_sortable() {
2351        // 1000 IDs back-to-back must come out in monotonically increasing
2352        // lexicographic order. This is what makes cursor pagination correct.
2353        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
2354        let sorted = {
2355            let mut s = ids.clone();
2356            s.sort();
2357            s
2358        };
2359        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
2360        // And every id must be the same width (otherwise lex comparison is
2361        // wrong at width boundaries).
2362        let len0 = ids[0].len();
2363        assert!(ids.iter().all(|id| id.len() == len0));
2364        ids.dedup();
2365        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
2366    }
2367
2368    #[test]
2369    fn build_insert_sql_simple() {
2370        let data = serde_json::json!({
2371            "email": "alice@example.com",
2372            "displayName": "Alice"
2373        });
2374        let (sql, values) = build_insert_sql("User", &data).unwrap();
2375
2376        assert!(sql.starts_with("INSERT INTO \"User\""));
2377        assert!(sql.contains("id"));
2378        assert!(sql.contains("$1"));
2379        assert!(sql.contains("$2"));
2380        assert!(sql.contains("$3"));
2381        // First value is the generated ID — JsonParam::Text variant.
2382        match &values[0] {
2383            JsonParam::Text(s) => assert!(!s.is_empty()),
2384            other => panic!("expected Text id param, got {other:?}"),
2385        }
2386        assert_eq!(values.len(), 3); // id + 2 fields
2387    }
2388
2389    #[test]
2390    fn build_insert_sql_preserves_json_types() {
2391        let data = serde_json::json!({
2392            "n": 42,
2393            "f": 1.5,
2394            "b": true,
2395            "s": "hi",
2396            "z": null,
2397        });
2398        let (_sql, values) = build_insert_sql("T", &data).unwrap();
2399        // values[0] is the id; remaining are in BTreeMap order ("b","f","n","s","z").
2400        let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
2401        assert!(matches!(kinds[0], JsonParam::Bool(true)));
2402        assert!(matches!(kinds[1], JsonParam::Float(_)));
2403        assert!(matches!(kinds[2], JsonParam::Int(42)));
2404        assert!(matches!(kinds[3], JsonParam::Text(_)));
2405        assert!(matches!(kinds[4], JsonParam::Null));
2406    }
2407
2408    #[test]
2409    fn build_insert_sql_quotes_column_names() {
2410        let data = serde_json::json!({"createdAt": "2026-01-01"});
2411        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
2412        assert!(sql.contains("\"createdAt\""));
2413        assert!(sql.contains("\"Todo\""));
2414    }
2415
2416    #[test]
2417    fn build_insert_sql_rejects_non_object() {
2418        let data = serde_json::json!("not an object");
2419        let result = build_insert_sql("User", &data);
2420        assert!(result.is_err());
2421        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2422    }
2423
2424    #[test]
2425    fn build_update_sql_simple() {
2426        let data = serde_json::json!({
2427            "displayName": "Bob",
2428            "email": "bob@example.com"
2429        });
2430        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
2431
2432        assert!(sql.starts_with("UPDATE \"User\" SET"));
2433        assert!(sql.contains("WHERE id = $1"));
2434        assert!(sql.contains("$2"));
2435        assert!(sql.contains("$3"));
2436        match &values[0] {
2437            JsonParam::Text(s) => assert_eq!(s, "abc123"),
2438            other => panic!("expected Text id param, got {other:?}"),
2439        }
2440        assert_eq!(values.len(), 3); // id + 2 fields
2441    }
2442
2443    #[test]
2444    fn build_update_sql_quotes_column_names() {
2445        let data = serde_json::json!({"displayName": "Carol"});
2446        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
2447        assert!(sql.contains("\"displayName\" = $2"));
2448    }
2449
2450    #[test]
2451    fn build_update_sql_rejects_non_object() {
2452        let data = serde_json::json!(42);
2453        let result = build_update_sql("User", "id1", &data);
2454        assert!(result.is_err());
2455        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2456    }
2457
2458    #[test]
2459    fn build_update_sql_rejects_empty_object() {
2460        let data = serde_json::json!({});
2461        let err = build_update_sql("User", "id1", &data).unwrap_err();
2462        assert_eq!(err.code, "PG_INVALID_DATA");
2463        assert!(err.message.contains("at least one field"));
2464    }
2465}