Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Map a manifest field type onto the PostgreSQL column type that stores it.
///
/// `string`, `richtext`, reference types (`id(...)`) and any type this
/// function does not recognise are all stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        // "string", "richtext", "id(...)" references and unknown types.
        _ => "TEXT",
    }
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it is used verbatim as a SQL identifier.
///
/// Any `"` inside `name` is doubled, which is how PostgreSQL spells a literal
/// quote inside a quoted identifier: `"foo""bar"` names `foo"bar`.
pub(crate) fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Escape by doubling.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
/// Public entry point to [`quote_ident`] for sibling modules (such as
/// `pg_tx_store`) that build SQL text under the same quoting rules.
///
/// The crate-private original stays as-is so existing call sites in this
/// module are unaffected.
#[cfg(feature = "postgres-live")]
pub fn quote_ident_pub(name: &str) -> String {
    self::quote_ident(name)
}
48
/// Public entry point to [`live::row_to_json`] for sibling modules.
///
/// `pg_tx_store` uses it so rows read inside a transaction are converted to
/// JSON exactly like rows read on the non-transactional path.
#[cfg(feature = "postgres-live")]
pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
    self::live::row_to_json(row)
}
56
/// Public entry point to the filtered-query SQL builder.
///
/// Lets the in-transaction `PgTxStore` share the operator surface of the
/// non-transactional path (`$eq`, `$like`, `$in`, `$order`, `$limit`,
/// `$offset`, `$not`, `$gt`, `$gte`, `$lt`, `$lte`). The returned
/// `(sql, params)` pair can be passed to either `client.query` or
/// `transaction.query`.
#[cfg(feature = "postgres-live")]
pub fn build_query_filtered_sql_pub(
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
    self::live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
}
70
/// Public entry point to the aggregate SQL builder.
///
/// Returns `(sql, params, column_names)`. Pass `column_names` on to
/// `aggregate_rows_to_json_pub` so each result row is projected under the
/// right keys.
#[cfg(feature = "postgres-live")]
pub fn build_aggregate_sql_pub(
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
    self::live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
}
82
/// Public entry point to the aggregate post-processor.
///
/// Turns the raw `Row`s of an aggregate query into the
/// `{ rows: [{...}] }` JSON shape, using `column_names` for the keys.
#[cfg(feature = "postgres-live")]
pub fn aggregate_rows_to_json_pub(
    rows: &[postgres::Row],
    column_names: &[String],
) -> serde_json::Value {
    self::live::aggregate_rows_to_json(rows, column_names)
}
92
93// ---------------------------------------------------------------------------
94// SQL generation
95// ---------------------------------------------------------------------------
96
97/// Generate a Postgres CREATE TABLE statement.
98pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
99    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
100
101    for field in fields {
102        let col_type = pg_column_type(&field.field_type);
103        let not_null = if field.optional { "" } else { " NOT NULL" };
104        let unique = if field.unique { " UNIQUE" } else { "" };
105        columns.push(format!(
106            "{} {}{}{}",
107            quote_ident(&field.name),
108            col_type,
109            not_null,
110            unique
111        ));
112    }
113
114    format!(
115        "CREATE TABLE IF NOT EXISTS {} ({})",
116        quote_ident(entity_name),
117        columns.join(", ")
118    )
119}
120
121/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
122/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
123/// Required-ness is tracked in the manifest; enforcement deferred.
124pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
125    let col_type = pg_column_type(&field.field_type);
126    let unique = if field.unique { " UNIQUE" } else { "" };
127    format!(
128        "ALTER TABLE {} ADD COLUMN {} {}{}",
129        quote_ident(entity_name),
130        quote_ident(&field.name),
131        col_type,
132        unique
133    )
134}
135
136/// Generate a Postgres CREATE INDEX statement.
137pub fn create_index_sql(
138    entity_name: &str,
139    index_name: &str,
140    fields: &[String],
141    unique: bool,
142) -> String {
143    let unique_str = if unique { "UNIQUE " } else { "" };
144    let full_index_name = format!("{}_{}", entity_name, index_name);
145    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
146    format!(
147        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
148        unique_str,
149        quote_ident(&full_index_name),
150        quote_ident(entity_name),
151        quoted_fields.join(", ")
152    )
153}
154
155// ---------------------------------------------------------------------------
156// PostgresAdapter — planning-only adapter
157// ---------------------------------------------------------------------------
158
159/// A Postgres storage adapter. Currently supports planning only.
160/// No live connection — SQL generation and planning from manifest.
161pub struct PostgresAdapter;
162
163impl StorageAdapter for PostgresAdapter {
164    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
165        // Plan from empty baseline.
166        let mut operations = Vec::new();
167
168        for entity in &target.entities {
169            let fields: Vec<FieldSpec> = entity
170                .fields
171                .iter()
172                .map(|f| FieldSpec {
173                    name: f.name.clone(),
174                    field_type: f.field_type.clone(),
175                    optional: f.optional,
176                    unique: f.unique,
177                })
178                .collect();
179
180            operations.push(SchemaOperation::CreateEntity {
181                name: entity.name.clone(),
182                fields,
183            });
184
185            for index in &entity.indexes {
186                operations.push(SchemaOperation::AddIndex {
187                    entity: entity.name.clone(),
188                    name: index.name.clone(),
189                    fields: index.fields.clone(),
190                    unique: index.unique,
191                });
192            }
193        }
194
195        if operations.is_empty() {
196            operations.push(SchemaOperation::Noop);
197        }
198
199        Ok(SchemaPlan { operations })
200    }
201
202    // apply_schema intentionally not implemented — uses default trait error.
203}
204
205/// Generate all SQL statements for a plan, in order.
206/// Useful for dry-run preview of what Postgres DDL would be executed.
207pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
208    let mut statements = Vec::new();
209
210    for op in &plan.operations {
211        match op {
212            SchemaOperation::CreateEntity { name, fields } => {
213                statements.push(create_table_sql(name, fields));
214            }
215            SchemaOperation::AddField { entity, field } => {
216                statements.push(add_column_sql(entity, field));
217            }
218            SchemaOperation::AlterField {
219                entity,
220                previous,
221                target,
222            } => {
223                // Only nullable transitions today. SET / DROP NOT NULL is
224                // safe on a populated table when going from required →
225                // optional (existing rows already satisfy NOT NULL); the
226                // reverse direction (optional → required) succeeds only
227                // if every row has a non-null value, which the planner
228                // has no way to know — Postgres will fail the migration
229                // if it can't and the operator gets a clear error from
230                // the apply step.
231                if previous.optional && !target.optional {
232                    statements.push(format!(
233                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
234                        quote_ident(entity),
235                        quote_ident(&target.name)
236                    ));
237                } else if !previous.optional && target.optional {
238                    statements.push(format!(
239                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
240                        quote_ident(entity),
241                        quote_ident(&target.name)
242                    ));
243                }
244                // Nothing emitted when neither nullable nor type changed
245                // — falls through silently. AlterField with no actual
246                // shape change shouldn't happen in practice (the planner
247                // only emits it on real drift), but guard against
248                // emitting empty SQL just in case.
249            }
250            SchemaOperation::AddIndex {
251                entity,
252                name,
253                fields,
254                unique,
255            } => {
256                statements.push(create_index_sql(entity, name, fields, *unique));
257            }
258            SchemaOperation::CreateSearchIndex { entity, config } => {
259                #[cfg(feature = "postgres-live")]
260                {
261                    statements.extend(crate::pg_search::create_search_index_sql(entity, config));
262                }
263                #[cfg(not(feature = "postgres-live"))]
264                {
265                    let _ = (entity, config);
266                    return Err(StorageError {
267                        code: "PG_SEARCH_FEATURE_OFF".into(),
268                        message: "CreateSearchIndex requires the `postgres-live` feature".into(),
269                    });
270                }
271            }
272            SchemaOperation::RemoveSearchIndex { entity } => {
273                // Without the original config we don't know which
274                // facet/sort indexes were created. Drop the FTS table
275                // and the GIN index by their fixed names; per-field
276                // facet/sort indexes are dropped automatically by the
277                // entity DROP path. Operators removing search via
278                // schema diff will see leftover indexes only if the
279                // entity table itself still exists — at which point
280                // the next CREATE INDEX IF NOT EXISTS catches up.
281                //
282                // quote_ident on the synthetic table/index names so a
283                // malicious entity name with embedded `"` can't break
284                // out of the identifier.
285                statements.push(format!(
286                    "DROP TABLE IF EXISTS {} CASCADE",
287                    quote_ident(&format!("_fts_{entity}"))
288                ));
289                statements.push(format!(
290                    "DROP INDEX IF EXISTS {}",
291                    quote_ident(&format!("{entity}_fts_gin"))
292                ));
293            }
294            SchemaOperation::Noop => {}
295            other => {
296                return Err(StorageError {
297                    code: "PG_OP_UNSUPPORTED".into(),
298                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
299                });
300            }
301        }
302    }
303
304    Ok(statements)
305}
306
307// ---------------------------------------------------------------------------
308// Introspection SQL helpers
309//
310// These generate the SQL queries that a live Postgres connection would run
311// to read the current schema. No connection required — just SQL strings.
312// ---------------------------------------------------------------------------
313
/// SQL to list user tables in the `public` schema.
///
/// Pylon's own bookkeeping tables (names starting with `_pylon_`) are
/// excluded. `_` is a single-character wildcard in `LIKE`, so the prefix is
/// matched with `!` as an explicit escape character. An unescaped
/// `'_pylon_%'` would also hide ordinary user tables such as `apylon_x`.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";

/// SQL to list the columns of one table in the `public` schema.
///
/// Bind `$1` to the table name. Each row is `(column_name, data_type,
/// is_nullable, is_pk)`, where `is_pk` is a COUNT (BIGINT) that is non-zero
/// when the column belongs to the table's primary key.
///
/// The primary-key subquery joins constraints to key columns on schema,
/// constraint name and table name, and filters on the outer column's schema.
/// Without those conditions, a same-named table in another schema could
/// mark a column here as a primary key.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_schema = kcu.constraint_schema \
             AND tc.constraint_name = kcu.constraint_name \
             AND tc.table_name = kcu.table_name \
            WHERE tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
336
/// SQL to list the indexes of one table in the `public` schema, excluding
/// the primary-key index.
///
/// Use with parameter: table_name (bound to `$1`). Each row is
/// `(index_name, is_unique, columns)`, where `columns` is the array of
/// indexed column names ordered by their position in the index key.
/// Rows are sorted by index name.
///
/// NOTE(review): expression-index keys are stored as attnum 0 in `indkey`,
/// and no `pg_attribute` row matches that, so such keys appear to be left
/// out of `columns`. Confirm that is acceptable for schema diffing.
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
           ix.indisunique as is_unique, \
           array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
      AND t.relname = $1 \
      AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
353
354/// Plan from a snapshot (reuses the shared plan_from_snapshot).
355/// This allows Postgres to plan incrementally once introspection data is available.
356pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
357    crate::plan_from_snapshot(snapshot, target)
358}
359
360// ---------------------------------------------------------------------------
361// CRUD SQL generation helpers (used by live adapter, testable without a DB)
362// ---------------------------------------------------------------------------
363
/// Mint a new row id: 40 lowercase hex characters that sort lexically in
/// creation order.
///
/// Layout: 32 hex digits of wall-clock nanoseconds since the Unix epoch
/// (zero-padded), followed by 8 hex digits of a per-process atomic sequence
/// number. The sequence separates ids minted within the same clock reading
/// and keeps them increasing. Because the width is fixed, lexicographic
/// order matches numeric order, which cursor pagination relies on.
///
/// Caveats: the wall clock is not monotonic (a backwards step breaks
/// ordering), the sequence wraps after `u32::MAX` ids, and a clock before
/// the epoch encodes as zero nanoseconds.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU32 = AtomicU32::new(0);

    let nanos: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let tick = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{:032x}{:08x}", nanos, tick)
}
383
384/// Convert a JSON value to its string representation for use as a SQL parameter.
385///
386/// Kept for back-compat with callers that need a textual fallback (e.g.
387/// human-readable logs). New code should bind through [`JsonParam`] so
388/// integers/booleans/nulls reach Postgres in their typed form instead of
389/// collapsing to TEXT, which the driver can't coerce into INTEGER /
390/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
391/// into an empty string for FK columns.
392pub fn json_value_to_string(val: &serde_json::Value) -> String {
393    match val {
394        serde_json::Value::String(s) => s.clone(),
395        serde_json::Value::Number(n) => n.to_string(),
396        serde_json::Value::Bool(b) => b.to_string(),
397        serde_json::Value::Null => String::new(),
398        other => other.to_string(),
399    }
400}
401
402/// A typed wrapper around a JSON scalar that implements
403/// [`postgres::types::ToSql`] for INSERT/UPDATE parameters.
404///
405/// The previous implementation passed every value as `String`, which broke
406/// non-text columns (the postgres driver can't bind a string literal into
407/// an INTEGER / BOOLEAN / TIMESTAMPTZ slot) and silently turned JSON
408/// `null` into the empty string for nullable FKs (so `unlink` left
409/// dangling `""` references instead of NULL). `JsonParam` carries the
410/// JSON variant tag through to `to_sql` so the driver can pick the
411/// correct binary representation per column type.
412///
413/// JSON arrays/objects collapse to their JSON-string form (TEXT) — the
414/// runtime layer doesn't currently model array/object columns at the
415/// manifest level on Postgres, so anything that lands here came from
416/// caller-supplied prose that's expected to fit into a TEXT column.
417#[derive(Debug, Clone, PartialEq)]
418pub enum JsonParam {
419    Null,
420    Text(String),
421    Int(i64),
422    Float(f64),
423    Bool(bool),
424}
425
426impl JsonParam {
427    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
428    /// that fit `i64` go through as Int; everything else goes through as
429    /// Float to preserve fractional / large-magnitude values.
430    pub fn from_json(val: &serde_json::Value) -> Self {
431        match val {
432            serde_json::Value::Null => JsonParam::Null,
433            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
434            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
435            serde_json::Value::Number(n) => {
436                if let Some(i) = n.as_i64() {
437                    JsonParam::Int(i)
438                } else if let Some(f) = n.as_f64() {
439                    JsonParam::Float(f)
440                } else {
441                    JsonParam::Text(n.to_string())
442                }
443            }
444            other => JsonParam::Text(other.to_string()),
445        }
446    }
447}
448
/// Narrow an `i64` parameter to a smaller Postgres integer type `T`.
///
/// Returns an error instead of wrapping when `n` does not fit in `T`. An
/// out-of-range value therefore fails the bind instead of silently storing
/// a different number. `pg_type` is used only in the error message.
#[cfg_attr(not(feature = "postgres-live"), allow(dead_code))]
fn narrow_int_param<T>(n: i64, pg_type: &str) -> Result<T, String>
where
    T: std::convert::TryFrom<i64>,
{
    T::try_from(n).map_err(|_| format!("integer {n} is out of range for column type {pg_type}"))
}

/// Convert a float parameter for an integer column of type `T`.
///
/// Only finite, integral values that fit in `T` are accepted. NaN,
/// infinities, fractional values (e.g. `3.7`) and out-of-range magnitudes
/// are errors rather than being truncated or saturated.
#[cfg_attr(not(feature = "postgres-live"), allow(dead_code))]
fn float_param_to_int<T>(f: f64, pg_type: &str) -> Result<T, String>
where
    T: std::convert::TryFrom<i64>,
{
    // 2^63 is exactly representable as f64. Every integral f64 in
    // [-2^63, 2^63) converts to i64 with `as` without rounding or saturation.
    const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;
    if !f.is_finite() || f.fract() != 0.0 || !(-TWO_POW_63..TWO_POW_63).contains(&f) {
        return Err(format!(
            "{f} is not an integral value representable in column type {pg_type}"
        ));
    }
    narrow_int_param(f as i64, pg_type)
}

#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encode this parameter for a statement slot of type `ty`.
    ///
    /// Each variant is written in the binary format of the *column's* type,
    /// so the bytes fit the slot. Postgres rejects e.g. an 8-byte `i64` bound
    /// to a 4-byte INTEGER with "binary data of wrong size". Conversions
    /// that would change the stored value return an error instead of
    /// coercing silently. These are: an integer outside the column's range,
    /// a non-integral or non-finite float bound to an integer column, and a
    /// datetime string that is not RFC 3339.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        match (self, ty) {
            // SQL NULL is valid for every column type. The driver sends a
            // null of whatever type the statement expects.
            (JsonParam::Null, _) => Ok(postgres::types::IsNull::Yes),

            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            // Integers: checked narrowing, so values never wrap.
            (JsonParam::Int(n), &Type::INT2) => {
                narrow_int_param::<i16>(*n, "SMALLINT")?.to_sql(ty, out)
            }
            (JsonParam::Int(n), &Type::INT4) => {
                narrow_int_param::<i32>(*n, "INTEGER")?.to_sql(ty, out)
            }
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            // Integer -> float may round for very large magnitudes. That is
            // accepted, since the column is declared approximate anyway.
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            // Float -> integer: only exact integral values are accepted.
            (JsonParam::Float(f), &Type::INT2) => {
                float_param_to_int::<i16>(*f, "SMALLINT")?.to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT4) => {
                float_param_to_int::<i32>(*f, "INTEGER")?.to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT8) => {
                float_param_to_int::<i64>(*f, "BIGINT")?.to_sql(ty, out)
            }

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                // Datetimes arrive as ISO 8601 / RFC 3339 strings. The
                // TIMESTAMPTZ binary wire format is an i64 of microseconds
                // since 2000-01-01 UTC, not the ASCII of the string. Writing
                // the raw string made Postgres fail with "incorrect binary
                // data format in bind parameter N". Parse with chrono and let
                // its ToSql impl produce the proper encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // TIMESTAMP (no time zone): same parse, bound as a UTC
                // NaiveDateTime so chrono picks the matching encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // Cross-type fallback: send the value's textual form. For
            // text-like targets this stores the text. For other binary
            // types the bytes do not match the expected encoding, and
            // Postgres rejects the bind. A mismatch such as "manifest says
            // INT but the caller sent a stringified number" therefore fails
            // loudly on the server instead of being coerced.
            (JsonParam::Bool(b), _) => b.to_string().to_sql(ty, out),
            (JsonParam::Int(n), _) => n.to_string().to_sql(ty, out),
            (JsonParam::Float(f), _) => f.to_string().to_sql(ty, out),
            (JsonParam::Text(s), _) => s.to_sql(ty, out),
        }
    }

    fn accepts(_ty: &postgres::types::Type) -> bool {
        // Accept every type. The `to_sql_checked!`-generated method consults
        // this before calling `to_sql`, and `to_sql` already dispatches on
        // the target type (or falls back to the textual form above).
        true
    }

    postgres::types::to_sql_checked!();
}
552
553/// Build an INSERT SQL statement and collect typed parameter values.
554/// Returns `(sql, params)` where `params[0]` is the generated ID
555/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
556/// value so the postgres driver can bind them to typed columns
557/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
558/// the database as SQL NULL — the previous string-collapsing path stored
559/// `""` for nullable FKs and broke any non-text column.
560pub fn build_insert_sql(
561    entity: &str,
562    data: &serde_json::Value,
563) -> Result<(String, Vec<JsonParam>), StorageError> {
564    let obj = data.as_object().ok_or_else(|| StorageError {
565        code: "PG_INVALID_DATA".into(),
566        message: "Insert data must be a JSON object".into(),
567    })?;
568
569    // Honor caller-supplied `id` (used by the CRDT path that needs to
570    // share the row id with the LoroDoc snapshot key it just wrote).
571    // Fall back to a fresh ULID-shaped id when absent. A non-string
572    // `id` value is rejected explicitly — silently regenerating would
573    // mask schema bugs (e.g. a caller passing an int id) and let the
574    // CRDT snapshot key drift from the materialized row id.
575    let id = match obj.get("id") {
576        None | Some(serde_json::Value::Null) => generate_id(),
577        Some(serde_json::Value::String(s)) => s.clone(),
578        Some(other) => {
579            return Err(StorageError {
580                code: "PG_INVALID_ID".into(),
581                message: format!(
582                    "Insert data carried a non-string `id` value: {other}. Pylon row ids \
583                     are always strings (40-char hex). Drop the `id` field to let the \
584                     server generate one, or supply a string."
585                ),
586            });
587        }
588    };
589
590    let mut col_names = vec!["id".to_string()];
591    let mut placeholders = vec!["$1".to_string()];
592    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
593
594    let mut i = 0usize;
595    for (key, val) in obj {
596        if key == "id" {
597            // Already emitted via the synthetic first column; skipping
598            // here avoids `INSERT ... (id, id, ...)` which Postgres
599            // rejects with `duplicate key value`.
600            continue;
601        }
602        col_names.push(quote_ident(key));
603        placeholders.push(format!("${}", i + 2));
604        values.push(JsonParam::from_json(val));
605        i += 1;
606    }
607
608    let sql = format!(
609        "INSERT INTO {} ({}) VALUES ({})",
610        quote_ident(entity),
611        col_names.join(", "),
612        placeholders.join(", ")
613    );
614
615    Ok((sql, values))
616}
617
618/// Build an UPDATE SQL statement and collect typed parameter values.
619/// Returns `(sql, params)` where `params[0]` is the row ID.
620pub fn build_update_sql(
621    entity: &str,
622    id: &str,
623    data: &serde_json::Value,
624) -> Result<(String, Vec<JsonParam>), StorageError> {
625    let obj = data.as_object().ok_or_else(|| StorageError {
626        code: "PG_INVALID_DATA".into(),
627        message: "Update data must be a JSON object".into(),
628    })?;
629
630    if obj.is_empty() {
631        return Err(StorageError {
632            code: "PG_INVALID_DATA".into(),
633            message: "Update data must contain at least one field".into(),
634        });
635    }
636
637    let mut set_clauses = Vec::new();
638    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
639
640    let mut i = 0usize;
641    for (key, val) in obj {
642        if key == "id" {
643            // Reject primary-key mutation. Letting `id` into the SET
644            // clause lets a client move a row out from under its CRDT
645            // sidecar (which is keyed by the original row_id) and its
646            // FTS shadow row (FK-bound to the original id). The SQLite
647            // path silently drops `id` here too — keep the same
648            // shape, but errors so the caller sees the bug.
649            return Err(StorageError {
650                code: "PG_INVALID_UPDATE".into(),
651                message:
652                    "Updating the `id` column is not allowed — Pylon row ids are immutable; \
653                     drop the field from the patch."
654                        .into(),
655            });
656        }
657        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
658        values.push(JsonParam::from_json(val));
659        i += 1;
660    }
661
662    if set_clauses.is_empty() {
663        return Err(StorageError {
664            code: "PG_INVALID_DATA".into(),
665            message: "Update data must contain at least one non-id field".into(),
666        });
667    }
668
669    let sql = format!(
670        "UPDATE {} SET {} WHERE id = $1",
671        quote_ident(entity),
672        set_clauses.join(", ")
673    );
674
675    Ok((sql, values))
676}
677
/// View a slice of [`JsonParam`]s as the trait-object slice the postgres
/// driver expects for query parameters (`&[&(dyn ToSql + Sync)]`).
///
/// Shared by the insert/update/transact call sites so the conversion is not
/// repeated at each one.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
688
689// ---------------------------------------------------------------------------
690// Live Postgres adapter (requires "postgres-live" feature)
691// ---------------------------------------------------------------------------
692
693#[cfg(feature = "postgres-live")]
694pub mod live {
695    use super::*;
696    use crate::{
697        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
698    };
699
700    /// A live Postgres adapter with a real database connection.
701    pub struct LivePostgresAdapter {
702        client: postgres::Client,
703    }
704
705    impl LivePostgresAdapter {
706        /// Borrow the underlying postgres client mutably. Used by
707        /// `PostgresDataStore::with_transaction` to start an
708        /// interactive transaction across multiple TS-function
709        /// `ctx.db` calls. `pub(crate)` because exposing raw
710        /// `&mut Client` outside pylon-storage would let callers
711        /// issue arbitrary SQL, bypassing the typed `DataStore`
712        /// surface that the rest of the framework relies on.
713        pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
714            &mut self.client
715        }
716
717        /// Connect to a Postgres database.
718        pub fn connect(url: &str) -> Result<Self, StorageError> {
719            let client =
720                postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
721                    code: "PG_CONNECT_FAILED".into(),
722                    message: format!("Failed to connect to Postgres: {e}"),
723                })?;
724            Ok(Self { client })
725        }
726
727        /// Read the current schema from the live database.
728        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
729            let table_rows = self
730                .client
731                .query(INTROSPECT_TABLES_SQL, &[])
732                .map_err(pg_err)?;
733
734            let mut tables = Vec::new();
735            for row in &table_rows {
736                let table_name: String = row.get(0);
737                let columns = self.read_columns(&table_name)?;
738                let indexes = self.read_indexes(&table_name)?;
739                tables.push(TableSnapshot {
740                    name: table_name,
741                    columns,
742                    indexes,
743                });
744            }
745
746            Ok(SchemaSnapshot { tables })
747        }
748
749        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
750            let rows = self
751                .client
752                .query(INTROSPECT_COLUMNS_SQL, &[&table])
753                .map_err(pg_err)?;
754
755            let mut columns = Vec::new();
756            for row in &rows {
757                let name: String = row.get(0);
758                let data_type: String = row.get(1);
759                let is_nullable: String = row.get(2);
760                let is_pk: i64 = row.get(3);
761                columns.push(ColumnSnapshot {
762                    name,
763                    column_type: data_type,
764                    notnull: is_nullable == "NO",
765                    primary_key: is_pk > 0,
766                });
767            }
768            Ok(columns)
769        }
770
771        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
772            let rows = self
773                .client
774                .query(INTROSPECT_INDEXES_SQL, &[&table])
775                .map_err(pg_err)?;
776
777            let mut indexes = Vec::new();
778            for row in &rows {
779                let name: String = row.get(0);
780                let unique: bool = row.get(1);
781                let columns: Vec<String> = row.get(2);
782                indexes.push(IndexSnapshot {
783                    name,
784                    columns,
785                    unique,
786                });
787            }
788            Ok(indexes)
789        }
790
791        /// Plan from live database state.
792        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
793            let snapshot = self.read_schema()?;
794            Ok(crate::plan_from_snapshot(&snapshot, target))
795        }
796    }
797
798    impl StorageAdapter for LivePostgresAdapter {
799        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
800            Err(StorageError {
801                code: "PG_PLAN_NEEDS_MUTABLE".into(),
802                message: "Use plan_from_live() instead for live Postgres planning".into(),
803            })
804        }
805
806        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
807            Err(StorageError {
808                code: "PG_APPLY_USE_METHOD".into(),
809                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
810            })
811        }
812    }
813
814    impl LivePostgresAdapter {
815        /// Apply a schema plan to the live database.
816        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
817            let statements = plan_to_sql(plan)?;
818            for sql in &statements {
819                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
820            }
821            Ok(())
822        }
823
824        /// Execute a raw SQL statement against the live database. Used by
825        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
826        /// production code should go through `apply_plan` so changes are
827        /// represented in the migration history. Returns the number of
828        /// rows affected.
829        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
830            self.client.execute(sql, &[]).map_err(pg_err)
831        }
832
833        /// Insert a row. Returns the generated ID.
834        pub fn insert(
835            &mut self,
836            entity: &str,
837            data: &serde_json::Value,
838        ) -> Result<String, StorageError> {
839            let (sql, values) = build_insert_sql(entity, data)?;
840            // The first param is always the generated ID — extract it before
841            // we hand `values` off to the postgres driver as borrowed slices.
842            let id = match &values[0] {
843                JsonParam::Text(s) => s.clone(),
844                _ => {
845                    return Err(StorageError {
846                        code: "PG_INTERNAL".into(),
847                        message: "build_insert_sql produced non-text id param".into(),
848                    });
849                }
850            };
851            let params = as_pg_params(&values);
852            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
853            Ok(id)
854        }
855
856        /// Get a row by ID.
857        pub fn get_by_id(
858            &mut self,
859            entity: &str,
860            id: &str,
861        ) -> Result<Option<serde_json::Value>, StorageError> {
862            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
863            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
864
865            match rows.first() {
866                Some(row) => Ok(Some(row_to_json(row))),
867                None => Ok(None),
868            }
869        }
870
871        /// List all rows from an entity.
872        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
873            let sql = format!("SELECT * FROM {}", quote_ident(entity));
874            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
875
876            Ok(rows.iter().map(row_to_json).collect())
877        }
878
879        /// Cursor-paginated list. `after` is the last `id` from the previous
880        /// page; the result contains rows with `id > after` (lex order),
881        /// limited to `limit`. Used for sync push/pull.
882        pub fn list_after(
883            &mut self,
884            entity: &str,
885            after: Option<&str>,
886            limit: usize,
887        ) -> Result<Vec<serde_json::Value>, StorageError> {
888            // Cap limit at a sensible upper bound so a malicious client can't
889            // stream the whole table by passing limit=u64::MAX.
890            let capped: i64 = limit.min(10_000) as i64;
891            let sql = match after {
892                Some(_) => format!(
893                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
894                    quote_ident(entity)
895                ),
896                None => format!(
897                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
898                    quote_ident(entity)
899                ),
900            };
901            let rows = match after {
902                Some(cursor) => self
903                    .client
904                    .query(sql.as_str(), &[&cursor, &capped])
905                    .map_err(pg_err)?,
906                None => self
907                    .client
908                    .query(sql.as_str(), &[&capped])
909                    .map_err(pg_err)?,
910            };
911            Ok(rows.iter().map(row_to_json).collect())
912        }
913
914        /// Update a row by ID. Returns true if the row was found and updated.
915        pub fn update(
916            &mut self,
917            entity: &str,
918            id: &str,
919            data: &serde_json::Value,
920        ) -> Result<bool, StorageError> {
921            let (sql, values) = build_update_sql(entity, id, data)?;
922            let params = as_pg_params(&values);
923            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
924            Ok(rows_affected > 0)
925        }
926
927        /// Delete a row by ID. Returns true if the row was found and deleted.
928        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
929            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
930            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
931            Ok(rows_affected > 0)
932        }
933
934        /// Look up a row by `field = value`. Caller must validate `field`
935        /// against the manifest before calling — we still `quote_ident` it
936        /// but won't catch a typo against the entity definition.
937        pub fn lookup_field(
938            &mut self,
939            entity: &str,
940            field: &str,
941            value: &str,
942        ) -> Result<Option<serde_json::Value>, StorageError> {
943            let sql = format!(
944                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
945                quote_ident(entity),
946                quote_ident(field),
947            );
948            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
949            Ok(rows.first().map(row_to_json))
950        }
951
952        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
953        ///
954        /// Supported operators (parity with the SQLite path):
955        /// - Equality (`field: value`)
956        /// - `$not`: emits `field != value`
957        /// - `$gt` / `$gte` / `$lt` / `$lte`
958        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
959        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
960        ///   if/when the SQLite side adds it)
961        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
962        ///
963        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
964        ///
965        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
966        /// would need a tsvector column or a generic ILIKE OR-fold across
967        /// every text field, neither of which is wired up yet. Returns
968        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
969        /// receiving silently-broad results.
970        ///
971        /// Anything else is silently ignored (matches the in-memory fallback's
972        /// permissive behavior). Field names are validated against `valid_columns`
973        /// to prevent SQL injection — pass the entity's column set.
974        pub fn query_filtered(
975            &mut self,
976            entity: &str,
977            filter: &serde_json::Value,
978            valid_columns: &[String],
979        ) -> Result<Vec<serde_json::Value>, StorageError> {
980            let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
981            let pg_params = as_pg_params(&params);
982            let rows = self
983                .client
984                .query(sql.as_str(), &pg_params)
985                .map_err(pg_err)?;
986            Ok(rows.iter().map(row_to_json).collect())
987        }
988
989        /// Build the `SELECT ... FROM entity ...` SQL + bound params for
990        /// a `query_filtered` request. Pure: takes a manifest's column
991        /// list, returns text. Both the live adapter and the in-tx
992        /// `PgTxStore` call this so the operator surface ($eq, $like,
993        /// $in, $order, $limit, $offset) stays identical regardless of
994        /// where the query runs.
995        pub(crate) fn build_query_filtered_sql(
996            entity: &str,
997            filter: &serde_json::Value,
998            valid_columns: &[String],
999        ) -> Result<(String, Vec<JsonParam>), StorageError> {
1000            let empty = serde_json::Map::new();
1001            let obj = filter.as_object().unwrap_or(&empty);
1002
1003            let validate = |col: &str| -> Result<(), StorageError> {
1004                if col == "id" || valid_columns.iter().any(|c| c == col) {
1005                    Ok(())
1006                } else {
1007                    Err(StorageError {
1008                        code: "UNKNOWN_COLUMN".into(),
1009                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1010                    })
1011                }
1012            };
1013
1014            let mut where_clauses: Vec<String> = Vec::new();
1015            let mut order_clause = String::new();
1016            let mut limit_clause = String::new();
1017            let mut offset_clause = String::new();
1018            // Collect (col, op, value) so placeholder numbers can be assigned
1019            // in a single materialization pass after the parse loop. Values
1020            // are now JsonParam (typed) instead of String — see `value_to_pg`.
1021            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
1022
1023            for (key, val) in obj {
1024                match key.as_str() {
1025                    "$search" => {
1026                        // PG full-text via the entity's `_fts_<entity>`
1027                        // shadow table: `id IN (SELECT entity_id FROM
1028                        // _fts_<E> WHERE tsv @@ plainto_tsquery(...))`.
1029                        // Mirrors the SQLite path that joins the FTS5
1030                        // virtual table; the join here is a subquery so
1031                        // it composes with arbitrary other predicates
1032                        // (`$gt`, `$in`, etc.) in the same WHERE.
1033                        let raw = match val {
1034                            serde_json::Value::String(s) => s.clone(),
1035                            other => other.to_string(),
1036                        };
1037                        // Bind as a normal text param so all the
1038                        // existing placeholder-numbering and binding
1039                        // logic applies. The SQL uses the placeholder
1040                        // inside `plainto_tsquery('english', $N)`.
1041                        // Positioning logic mirrors the `$in` arm: the
1042                        // value will land at `planned.len()+1` because
1043                        // the materialization pass below pushes one
1044                        // param per planned item in order.
1045                        let placeholder_n = planned.len() + 1;
1046                        where_clauses.push(format!(
1047                            "{}.id IN (SELECT entity_id FROM \"_fts_{entity}\" \
1048                                       WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
1049                            quote_ident(entity),
1050                        ));
1051                        // Reuse the IN-style sentinel so the
1052                        // materialization pass below pushes the param
1053                        // without re-emitting a where_clause for it.
1054                        planned.push((
1055                            format!("__search_{}", planned.len()),
1056                            "__INLINE__".into(),
1057                            JsonParam::Text(raw),
1058                        ));
1059                    }
1060                    "$order" => {
1061                        if let Some(ord) = val.as_object() {
1062                            let mut parts = Vec::new();
1063                            for (col, dir) in ord {
1064                                validate(col)?;
1065                                let d = match dir.as_str().unwrap_or("asc") {
1066                                    "desc" | "DESC" => "DESC",
1067                                    _ => "ASC",
1068                                };
1069                                parts.push(format!("{} {d}", quote_ident(col)));
1070                            }
1071                            if !parts.is_empty() {
1072                                order_clause = format!(" ORDER BY {}", parts.join(", "));
1073                            }
1074                        }
1075                    }
1076                    "$limit" => {
1077                        if let Some(n) = val.as_u64() {
1078                            limit_clause = format!(" LIMIT {}", n);
1079                        }
1080                    }
1081                    "$offset" => {
1082                        if let Some(n) = val.as_u64() {
1083                            offset_clause = format!(" OFFSET {}", n);
1084                        }
1085                    }
1086                    field => {
1087                        validate(field)?;
1088                        match val {
1089                            serde_json::Value::Object(ops) => {
1090                                for (op, v) in ops {
1091                                    match op.as_str() {
1092                                        "$not" => planned.push((
1093                                            field.into(),
1094                                            "!=".into(),
1095                                            value_to_pg(v),
1096                                        )),
1097                                        "$gt" => {
1098                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
1099                                        }
1100                                        "$gte" => planned.push((
1101                                            field.into(),
1102                                            ">=".into(),
1103                                            value_to_pg(v),
1104                                        )),
1105                                        "$lt" => {
1106                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
1107                                        }
1108                                        "$lte" => planned.push((
1109                                            field.into(),
1110                                            "<=".into(),
1111                                            value_to_pg(v),
1112                                        )),
1113                                        "$like" => {
1114                                            // Wrap in `%...%` to match the
1115                                            // SQLite path's substring
1116                                            // semantics. Pre-fix divergence:
1117                                            // SQLite wrapped, PG forwarded
1118                                            // literally — `{name: {$like: "ann"}}`
1119                                            // matched "Joanne" on SQLite but
1120                                            // nothing on PG. Caller-supplied
1121                                            // wildcards inside the value still
1122                                            // work (`%j_n%` etc.) because we
1123                                            // only add wraps, never strip.
1124                                            let raw = match v {
1125                                                serde_json::Value::String(s) => s.clone(),
1126                                                other => other.to_string(),
1127                                            };
1128                                            planned.push((
1129                                                field.into(),
1130                                                "LIKE".into(),
1131                                                JsonParam::Text(format!("%{raw}%")),
1132                                            ));
1133                                        }
1134                                        "$in" => {
1135                                            if let Some(arr) = v.as_array() {
1136                                                if arr.is_empty() {
1137                                                    // `field IN ()` is invalid
1138                                                    // SQL on PG (and on SQLite
1139                                                    // too, technically — its
1140                                                    // path also short-circuits).
1141                                                    // An empty $in matches
1142                                                    // nothing; emit a guaranteed-
1143                                                    // false predicate so the
1144                                                    // parser doesn't choke and
1145                                                    // the result set comes back
1146                                                    // empty.
1147                                                    where_clauses.push("FALSE".into());
1148                                                } else {
1149                                                    let placeholders: Vec<String> = (0..arr.len())
1150                                                        .map(|i| {
1151                                                            format!("${}", planned.len() + 1 + i)
1152                                                        })
1153                                                        .collect();
1154                                                    where_clauses.push(format!(
1155                                                        "{} IN ({})",
1156                                                        quote_ident(field),
1157                                                        placeholders.join(", "),
1158                                                    ));
1159                                                    for x in arr {
1160                                                        planned.push((
1161                                                            format!("__inline_{}", planned.len()),
1162                                                            "__INLINE__".into(),
1163                                                            value_to_pg(x),
1164                                                        ));
1165                                                    }
1166                                                }
1167                                            }
1168                                        }
1169                                        _ => {}
1170                                    }
1171                                }
1172                            }
1173                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
1174                        }
1175                    }
1176                }
1177            }
1178
1179            // Materialize planned -> SQL + params.
1180            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
1181            for (field, op, v) in &planned {
1182                if op == "__INLINE__" {
1183                    // Already emitted via the IN-clause path; just push the value.
1184                    params.push(v.clone());
1185                } else {
1186                    let placeholder = format!("${}", params.len() + 1);
1187                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
1188                    params.push(v.clone());
1189                }
1190            }
1191
1192            let where_sql = if where_clauses.is_empty() {
1193                String::new()
1194            } else {
1195                format!(" WHERE {}", where_clauses.join(" AND "))
1196            };
1197            // Default deterministic order when the caller didn't pass
1198            // `$order` — matches the SQLite path. Without this,
1199            // identical queries return rows in different orders across
1200            // backends, which makes paginated APIs flaky.
1201            let final_order = if order_clause.is_empty() {
1202                format!(" ORDER BY {}", quote_ident("id"))
1203            } else {
1204                order_clause
1205            };
1206            let sql = format!(
1207                "SELECT * FROM {}{}{}{}{}",
1208                quote_ident(entity),
1209                where_sql,
1210                final_order,
1211                limit_clause,
1212                offset_clause,
1213            );
1214
1215            Ok((sql, params))
1216        }
1217
1218        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
1219        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
1220        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
1221        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
1222        /// via `date_trunc`), and a flat-equality `where` filter.
1223        ///
1224        /// Spec format (same JSON shape used by the SQLite path):
1225        /// ```json
1226        /// { "count": "*",
1227        ///   "sum": ["amount"],
1228        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
1229        ///   "where": {"status": "paid"} }
1230        /// ```
1231        ///
1232        /// `valid_columns` is used to validate every field name before it's
1233        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
1234        /// `DataStore` impl in this crate) supplies the entity's column set
1235        /// from the manifest.
1236        pub fn aggregate(
1237            &mut self,
1238            entity: &str,
1239            spec: &serde_json::Value,
1240            valid_columns: &[String],
1241        ) -> Result<serde_json::Value, StorageError> {
1242            let (sql, params, column_names) =
1243                Self::build_aggregate_sql(entity, spec, valid_columns)?;
1244            let pg_params = as_pg_params(&params);
1245            let rows = self
1246                .client
1247                .query(sql.as_str(), &pg_params)
1248                .map_err(pg_err)?;
1249            Ok(aggregate_rows_to_json(&rows, &column_names))
1250        }
1251
1252        /// Build the aggregate `SELECT` SQL + bound params + the
1253        /// expected output column names. Pure: takes the entity's
1254        /// validated column list, returns text. Both the live adapter
1255        /// and the in-tx `PgTxStore` call this so spec parsing
1256        /// (validation, bucket vocabulary, where-clause translation)
1257        /// stays identical regardless of where the query runs.
1258        pub(crate) fn build_aggregate_sql(
1259            entity: &str,
1260            spec: &serde_json::Value,
1261            valid_columns: &[String],
1262        ) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
1263            let obj = spec.as_object().ok_or_else(|| StorageError {
1264                code: "INVALID_QUERY".into(),
1265                message: "aggregate spec must be a JSON object".into(),
1266            })?;
1267
1268            let validate = |col: &str| -> Result<(), StorageError> {
1269                if col == "id" || valid_columns.iter().any(|c| c == col) {
1270                    Ok(())
1271                } else {
1272                    Err(StorageError {
1273                        code: "UNKNOWN_COLUMN".into(),
1274                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1275                    })
1276                }
1277            };
1278
1279            let mut select_parts: Vec<String> = Vec::new();
1280            let mut result_fields: Vec<String> = Vec::new();
1281
1282            if let Some(count) = obj.get("count") {
1283                match count {
1284                    serde_json::Value::String(s) if s == "*" => {
1285                        select_parts.push("COUNT(*) AS count".into());
1286                        result_fields.push("count".into());
1287                    }
1288                    serde_json::Value::String(field) => {
1289                        validate(field)?;
1290                        let alias = format!("count_{field}");
1291                        select_parts.push(format!(
1292                            "COUNT({}) AS {}",
1293                            quote_ident(field),
1294                            quote_ident(&alias),
1295                        ));
1296                        result_fields.push(alias);
1297                    }
1298                    _ => {}
1299                }
1300            }
1301
1302            for (fn_name, prefix) in [
1303                ("sum", "sum_"),
1304                ("avg", "avg_"),
1305                ("min", "min_"),
1306                ("max", "max_"),
1307            ] {
1308                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1309                    for field in fields {
1310                        if let Some(f) = field.as_str() {
1311                            validate(f)?;
1312                            let alias = format!("{prefix}{f}");
1313                            let sql_fn = fn_name.to_uppercase();
1314                            select_parts.push(format!(
1315                                "{}({}) AS {}",
1316                                sql_fn,
1317                                quote_ident(f),
1318                                quote_ident(&alias),
1319                            ));
1320                            result_fields.push(alias);
1321                        }
1322                    }
1323                }
1324            }
1325
1326            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1327                for field in fields {
1328                    if let Some(f) = field.as_str() {
1329                        validate(f)?;
1330                        let alias = format!("count_distinct_{f}");
1331                        select_parts.push(format!(
1332                            "COUNT(DISTINCT {}) AS {}",
1333                            quote_ident(f),
1334                            quote_ident(&alias),
1335                        ));
1336                        result_fields.push(alias);
1337                    }
1338                }
1339            }
1340
1341            // groupBy: column name or { field, bucket } — same vocabulary as
1342            // the SQLite path. Buckets translate to Postgres `date_trunc`
1343            // (SQLite uses `strftime`); both collapse rows to the bucket
1344            // boundary identically.
1345            let mut group_by: Vec<String> = Vec::new();
1346            let mut group_select: Vec<String> = Vec::new();
1347            let mut group_field_names: Vec<String> = Vec::new();
1348            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1349                for g in groups {
1350                    if let Some(f) = g.as_str() {
1351                        validate(f)?;
1352                        let q = quote_ident(f);
1353                        group_by.push(q.clone());
1354                        group_select.push(q);
1355                        group_field_names.push(f.to_string());
1356                    } else if let Some(spec) = g.as_object() {
1357                        let field =
1358                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1359                                StorageError {
1360                                    code: "INVALID_QUERY".into(),
1361                                    message: "groupBy object spec requires `field`".into(),
1362                                }
1363                            })?;
1364                        validate(field)?;
1365                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1366                        let trunc_unit = match bucket {
1367                            "hour" | "day" | "week" | "month" | "year" => bucket,
1368                            _ => {
1369                                return Err(StorageError {
1370                                    code: "INVALID_QUERY".into(),
1371                                    message: format!(
1372                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1373                                    ),
1374                                });
1375                            }
1376                        };
1377                        let alias = format!("{field}_{bucket}");
1378                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1379                        group_by.push(expr.clone());
1380                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1381                        group_field_names.push(alias);
1382                    }
1383                }
1384            }
1385
1386            let mut full_select = group_select.clone();
1387            full_select.extend(select_parts.iter().cloned());
1388            if full_select.is_empty() {
1389                return Err(StorageError {
1390                    code: "INVALID_QUERY".into(),
1391                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1392                });
1393            }
1394
1395            let mut where_clauses: Vec<String> = Vec::new();
1396            let mut params: Vec<JsonParam> = Vec::new();
1397            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1398                for (k, v) in w {
1399                    validate(k)?;
1400                    let placeholder = format!("${}", params.len() + 1);
1401                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1402                    params.push(value_to_pg(v));
1403                }
1404            }
1405            let where_sql = if where_clauses.is_empty() {
1406                String::new()
1407            } else {
1408                format!(" WHERE {}", where_clauses.join(" AND "))
1409            };
1410            let group_sql = if group_by.is_empty() {
1411                String::new()
1412            } else {
1413                format!(" GROUP BY {}", group_by.join(", "))
1414            };
1415
1416            let sql = format!(
1417                "SELECT {} FROM {}{}{}",
1418                full_select.join(", "),
1419                quote_ident(entity),
1420                where_sql,
1421                group_sql,
1422            );
1423
1424            let column_names: Vec<String> = group_field_names
1425                .iter()
1426                .chain(result_fields.iter())
1427                .cloned()
1428                .collect();
1429
1430            Ok((sql, params, column_names))
1431        }
1432    }
1433
1434    /// Project rows from an aggregate `SELECT` into the
1435    /// `{ rows: [{...}] }` JSON shape both the SQLite path and the
1436    /// PG path return. Pure post-processing — works on rows produced
1437    /// from either `Client::query` or `Transaction::query`.
1438    pub fn aggregate_rows_to_json(
1439        rows: &[postgres::Row],
1440        column_names: &[String],
1441    ) -> serde_json::Value {
1442        let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1443        for row in rows {
1444            let row_json = row_to_json(row);
1445            if let serde_json::Value::Object(map) = &row_json {
1446                let mut filtered = serde_json::Map::new();
1447                for name in column_names {
1448                    if let Some(v) = map.get(name) {
1449                        filtered.insert(name.clone(), v.clone());
1450                    }
1451                }
1452                out.push(serde_json::Value::Object(filtered));
1453            } else {
1454                out.push(row_json);
1455            }
1456        }
1457        serde_json::json!({ "rows": out })
1458    }
1459
1460    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
1461    pub enum TxOp<'a> {
1462        Insert {
1463            entity: &'a str,
1464            data: &'a serde_json::Value,
1465        },
1466        Update {
1467            entity: &'a str,
1468            id: &'a str,
1469            data: &'a serde_json::Value,
1470        },
1471        Delete {
1472            entity: &'a str,
1473            id: &'a str,
1474        },
1475    }
1476
1477    /// Result of a single op inside a transaction.
1478    #[derive(Debug, Clone)]
1479    pub enum TxResult {
1480        Inserted(String),
1481        Updated(bool),
1482        Deleted(bool),
1483    }
1484
1485    impl LivePostgresAdapter {
1486        /// Run `ops` inside a single Postgres transaction. Either all of them
1487        /// commit together or none of them do — there is no partial state on
1488        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1489        /// guard is dropped without `commit()` being called.
1490        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1491            let mut tx = self.client.transaction().map_err(pg_err)?;
1492            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1493
1494            for op in ops {
1495                match op {
1496                    TxOp::Insert { entity, data } => {
1497                        let (sql, values) = build_insert_sql(entity, data)?;
1498                        let id = match &values[0] {
1499                            JsonParam::Text(s) => s.clone(),
1500                            _ => {
1501                                return Err(StorageError {
1502                                    code: "PG_INTERNAL".into(),
1503                                    message: "build_insert_sql produced non-text id param".into(),
1504                                });
1505                            }
1506                        };
1507                        let params = as_pg_params(&values);
1508                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1509                        results.push(TxResult::Inserted(id));
1510                    }
1511                    TxOp::Update { entity, id, data } => {
1512                        let (sql, values) = build_update_sql(entity, id, data)?;
1513                        let params = as_pg_params(&values);
1514                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1515                        results.push(TxResult::Updated(n > 0));
1516                    }
1517                    TxOp::Delete { entity, id } => {
1518                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1519                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1520                        results.push(TxResult::Deleted(n > 0));
1521                    }
1522                }
1523            }
1524
1525            tx.commit().map_err(pg_err)?;
1526            Ok(results)
1527        }
1528    }
1529
1530    /// Lift a JSON value into a typed Postgres parameter. The previous
1531    /// implementation collapsed everything to `String`, which silently
1532    /// stringified ints/bools and turned JSON `null` into `""` for
1533    /// nullable columns. Forwarding through `JsonParam` keeps the column
1534    /// type honest and lets callers `unlink` (set FK to NULL) cleanly.
1535    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
1536        JsonParam::from_json(v)
1537    }
1538
1539    pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1540        use postgres::types::Type;
1541        let mut obj = serde_json::Map::new();
1542        for (i, col) in row.columns().iter().enumerate() {
1543            let name = col.name().to_string();
1544
1545            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
1546            // and a panic in a query handler poisons the connection mutex,
1547            // taking down all subsequent reads on this datastore. Anything
1548            // that fails to decode becomes Null with a one-shot warning.
1549            //
1550            // Timestamps and the catch-all path explicitly DON'T request
1551            // `String` — the postgres crate uses binary protocol by default
1552            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
1553            // ask for `Vec<u8>` and lossy-stringify, which works for all
1554            // text-shaped columns in either protocol.
1555            let value: serde_json::Value = match *col.type_() {
1556                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1557                    .flatten()
1558                    .map(serde_json::Value::Bool)
1559                    .unwrap_or(serde_json::Value::Null),
1560                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1561                    .flatten()
1562                    .map(|v| serde_json::Value::Number(v.into()))
1563                    .unwrap_or(serde_json::Value::Null),
1564                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1565                    .flatten()
1566                    .map(|v| serde_json::Value::Number(v.into()))
1567                    .unwrap_or(serde_json::Value::Null),
1568                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1569                    .flatten()
1570                    .map(|v| serde_json::Value::Number(v.into()))
1571                    .unwrap_or(serde_json::Value::Null),
1572                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1573                    .flatten()
1574                    .and_then(|v| serde_json::Number::from_f64(v as f64))
1575                    .map(serde_json::Value::Number)
1576                    .unwrap_or(serde_json::Value::Null),
1577                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1578                    .flatten()
1579                    .and_then(serde_json::Number::from_f64)
1580                    .map(serde_json::Value::Number)
1581                    .unwrap_or(serde_json::Value::Null),
1582                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1583                    .flatten()
1584                    .unwrap_or(serde_json::Value::Null),
1585                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1586                    .flatten()
1587                    .map(|b| serde_json::Value::String(b64(&b)))
1588                    .unwrap_or(serde_json::Value::Null),
1589                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1590                    try_get_or_null::<Option<String>>(row, i)
1591                        .flatten()
1592                        .map(serde_json::Value::String)
1593                        .unwrap_or(serde_json::Value::Null)
1594                }
1595                Type::TIMESTAMPTZ => {
1596                    // Decode via chrono::DateTime<Utc> (postgres's
1597                    // `with-chrono-0_4` feature provides FromSql) and
1598                    // re-format as ISO 8601 — the shape pylon's clients
1599                    // expect (matches `pylon_kernel::util::now_iso`,
1600                    // so timestamps round-trip with the same surface
1601                    // across SQLite + PG).
1602                    try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1603                        .flatten()
1604                        .map(|dt| {
1605                            serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1606                        })
1607                        .unwrap_or(serde_json::Value::Null)
1608                }
1609                Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1610                    .flatten()
1611                    .map(|dt| {
1612                        serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1613                    })
1614                    .unwrap_or(serde_json::Value::Null),
1615                Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1616                    .flatten()
1617                    .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1618                    .unwrap_or(serde_json::Value::Null),
1619                _ => {
1620                    // Last resort: ask Postgres to render anything else as
1621                    // text via a stringifying decode through Vec<u8>. If even
1622                    // that fails (rare — Postgres types not implementing the
1623                    // text format), fall through to Null with a warning.
1624                    match row.try_get::<_, Option<String>>(i) {
1625                        Ok(Some(s)) => serde_json::Value::String(s),
1626                        Ok(None) => serde_json::Value::Null,
1627                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1628                            Ok(Some(bytes)) => serde_json::Value::String(
1629                                String::from_utf8_lossy(&bytes).into_owned(),
1630                            ),
1631                            _ => serde_json::Value::Null,
1632                        },
1633                    }
1634                }
1635            };
1636            obj.insert(name, value);
1637        }
1638        serde_json::Value::Object(obj)
1639    }
1640
1641    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1642    where
1643        T: postgres::types::FromSql<'a>,
1644    {
1645        match row.try_get::<_, T>(i) {
1646            Ok(v) => Some(v),
1647            Err(e) => {
1648                tracing::warn!(
1649                    "[postgres] decode failed for column {} ({}): {e}",
1650                    i,
1651                    row.columns()[i].name()
1652                );
1653                None
1654            }
1655        }
1656    }
1657
1658    /// Minimal base64 encoder so we don't need another dependency just for
1659    /// the BYTEA column edge case.
1660    fn b64(bytes: &[u8]) -> String {
1661        const TABLE: &[u8; 64] =
1662            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1663        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1664        let chunks = bytes.chunks(3);
1665        for chunk in chunks {
1666            let b = [
1667                chunk.first().copied().unwrap_or(0),
1668                chunk.get(1).copied().unwrap_or(0),
1669                chunk.get(2).copied().unwrap_or(0),
1670            ];
1671            out.push(TABLE[(b[0] >> 2) as usize] as char);
1672            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1673            if chunk.len() > 1 {
1674                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1675            } else {
1676                out.push('=');
1677            }
1678            if chunk.len() > 2 {
1679                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1680            } else {
1681                out.push('=');
1682            }
1683        }
1684        out
1685    }
1686
1687    fn pg_err(e: postgres::Error) -> StorageError {
1688        // postgres::Error's Display is intentionally short ("db error",
1689        // "connection error" etc.) — the actual SQLSTATE / detail lives
1690        // on the source chain. Walk the chain so the final message has
1691        // enough signal to debug a failed insert/update without
1692        // attaching a debugger.
1693        use std::error::Error;
1694        let mut detail = format!("{e}");
1695        let mut src: Option<&dyn Error> = e.source();
1696        while let Some(s) = src {
1697            detail.push_str(": ");
1698            detail.push_str(&format!("{s}"));
1699            src = s.source();
1700        }
1701        StorageError {
1702            code: "PG_QUERY_FAILED".into(),
1703            message: format!("Postgres query failed: {detail}"),
1704        }
1705    }
1706}
1707
1708// ---------------------------------------------------------------------------
1709// Tests
1710// ---------------------------------------------------------------------------
1711
1712#[cfg(test)]
1713mod tests {
1714    use super::*;
1715
1716    /// Hand-rolled fixture that matches the snapshots in the tests
1717    /// below. Decoupled from any example's `pylon.manifest.json` so
1718    /// changing an example schema doesn't bleed into adapter tests.
1719    fn test_manifest() -> AppManifest {
1720        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1721        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1722            name: name.into(),
1723            field_type: ty.into(),
1724            optional: opt,
1725            unique: uniq,
1726            crdt: None,
1727        };
1728        AppManifest {
1729            manifest_version: 1,
1730            name: "test".into(),
1731            version: "0.0.0".into(),
1732            entities: vec![
1733                ManifestEntity {
1734                    name: "User".into(),
1735                    fields: vec![
1736                        f("email", "string", false, true),
1737                        f("displayName", "string", false, false),
1738                        f("createdAt", "datetime", false, false),
1739                    ],
1740                    indexes: vec![],
1741                    relations: vec![],
1742                    search: None,
1743                    crdt: true,
1744                },
1745                ManifestEntity {
1746                    name: "Todo".into(),
1747                    fields: vec![
1748                        f("title", "string", false, false),
1749                        f("done", "bool", false, false),
1750                        f("userId", "id(User)", false, false),
1751                        f("createdAt", "datetime", false, false),
1752                    ],
1753                    indexes: vec![ManifestIndex {
1754                        name: "by_user".into(),
1755                        fields: vec!["userId".into()],
1756                        unique: false,
1757                    }],
1758                    relations: vec![],
1759                    search: None,
1760                    crdt: true,
1761                },
1762            ],
1763            queries: vec![],
1764            actions: vec![],
1765            policies: vec![],
1766            routes: vec![],
1767            auth: Default::default(),
1768        }
1769    }
1770
1771    #[test]
1772    fn pg_type_mapping() {
1773        assert_eq!(pg_column_type("string"), "TEXT");
1774        assert_eq!(pg_column_type("int"), "INTEGER");
1775        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1776        assert_eq!(pg_column_type("bool"), "BOOLEAN");
1777        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1778        assert_eq!(pg_column_type("richtext"), "TEXT");
1779        assert_eq!(pg_column_type("id(User)"), "TEXT");
1780    }
1781
1782    #[test]
1783    fn quote_ident_simple() {
1784        assert_eq!(quote_ident("User"), "\"User\"");
1785        assert_eq!(quote_ident("email"), "\"email\"");
1786    }
1787
1788    #[test]
1789    fn quote_ident_escapes_embedded_double_quotes() {
1790        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1791        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1792    }
1793
1794    #[test]
1795    fn create_table_sql_basic() {
1796        let fields = vec![
1797            FieldSpec {
1798                name: "email".into(),
1799                field_type: "string".into(),
1800                optional: false,
1801                unique: true,
1802            },
1803            FieldSpec {
1804                name: "age".into(),
1805                field_type: "int".into(),
1806                optional: true,
1807                unique: false,
1808            },
1809        ];
1810        let sql = create_table_sql("User", &fields);
1811        assert_eq!(
1812            sql,
1813            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
1814        );
1815    }
1816
1817    #[test]
1818    fn create_table_sql_escapes_identifiers() {
1819        let fields = vec![FieldSpec {
1820            name: "col\"x".into(),
1821            field_type: "string".into(),
1822            optional: false,
1823            unique: false,
1824        }];
1825        let sql = create_table_sql("my\"table", &fields);
1826        assert!(sql.contains("\"my\"\"table\""));
1827        assert!(sql.contains("\"col\"\"x\""));
1828    }
1829
1830    #[test]
1831    fn create_index_sql_unique() {
1832        let sql = create_index_sql("User", "by_email", &["email".into()], true);
1833        assert_eq!(
1834            sql,
1835            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
1836        );
1837    }
1838
1839    #[test]
1840    fn create_index_sql_non_unique() {
1841        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
1842        assert_eq!(
1843            sql,
1844            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
1845        );
1846    }
1847
1848    #[test]
1849    fn add_column_sql_basic() {
1850        let field = FieldSpec {
1851            name: "bio".into(),
1852            field_type: "string".into(),
1853            optional: true,
1854            unique: false,
1855        };
1856        let sql = add_column_sql("User", &field);
1857        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
1858    }
1859
1860    #[test]
1861    fn plan_from_manifest() {
1862        let adapter = PostgresAdapter;
1863        let manifest = test_manifest();
1864        let plan = adapter.plan_schema(&manifest).unwrap();
1865
1866        // Should have CreateEntity for User and Todo, plus AddIndex for by_user.
1867        assert!(plan.operations.iter().any(|op| matches!(
1868            op,
1869            SchemaOperation::CreateEntity { name, .. } if name == "User"
1870        )));
1871        assert!(plan.operations.iter().any(|op| matches!(
1872            op,
1873            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1874        )));
1875        assert!(plan.operations.iter().any(|op| matches!(
1876            op,
1877            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1878        )));
1879    }
1880
1881    #[test]
1882    fn plan_to_sql_produces_statements() {
1883        let adapter = PostgresAdapter;
1884        let manifest = test_manifest();
1885        let plan = adapter.plan_schema(&manifest).unwrap();
1886        let stmts = plan_to_sql(&plan).unwrap();
1887
1888        // 2 CREATE TABLE (User, Todo) + 1 CREATE INDEX for Todo.by_user
1889        // + 1 CREATE INDEX for Todo.by_user_done. The Todo manifest also
1890        // declares a unique by_email index on User which lands as part of
1891        // the table. Final count: 2 tables + 2 indexes.
1892        let create_tables = stmts
1893            .iter()
1894            .filter(|s| s.starts_with("CREATE TABLE"))
1895            .count();
1896        let create_indexes = stmts
1897            .iter()
1898            .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
1899            .count();
1900        assert_eq!(create_tables, 2);
1901        assert!(create_indexes >= 1);
1902        assert!(stmts[0].starts_with("CREATE TABLE"));
1903        assert!(stmts[1].starts_with("CREATE TABLE"));
1904    }
1905
1906    #[test]
1907    fn plan_to_sql_rejects_unsupported() {
1908        let plan = SchemaPlan {
1909            operations: vec![SchemaOperation::RemoveEntity {
1910                name: "User".into(),
1911            }],
1912        };
1913        let result = plan_to_sql(&plan);
1914        assert!(result.is_err());
1915        assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
1916    }
1917
1918    #[test]
1919    fn apply_not_implemented() {
1920        let adapter = PostgresAdapter;
1921        let plan = SchemaPlan {
1922            operations: vec![SchemaOperation::Noop],
1923        };
1924        let result = adapter.apply_schema(&plan);
1925        assert!(result.is_err());
1926        assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
1927    }
1928
1929    #[test]
1930    fn sql_uses_quoted_identifiers() {
1931        let fields = vec![FieldSpec {
1932            name: "createdAt".into(),
1933            field_type: "datetime".into(),
1934            optional: false,
1935            unique: false,
1936        }];
1937        let sql = create_table_sql("User", &fields);
1938        // Postgres identifiers should be quoted for case-sensitivity.
1939        assert!(sql.contains("\"User\""));
1940        assert!(sql.contains("\"createdAt\""));
1941        assert!(sql.contains("TIMESTAMPTZ"));
1942    }
1943
1944    // -- Introspection SQL tests --
1945
1946    #[test]
1947    fn introspect_sql_constants_are_valid() {
1948        // Sanity checks that the SQL strings exist and look reasonable.
1949        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
1950        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
1951        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
1952        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
1953    }
1954
1955    // -- Plan from snapshot tests --
1956
1957    #[test]
1958    fn plan_from_empty_snapshot_creates_all() {
1959        let snapshot = crate::SchemaSnapshot { tables: vec![] };
1960        let manifest = test_manifest();
1961        let plan = plan_from_snapshot(&snapshot, &manifest);
1962
1963        assert!(plan.operations.iter().any(|op| matches!(
1964            op,
1965            SchemaOperation::CreateEntity { name, .. } if name == "User"
1966        )));
1967        assert!(plan.operations.iter().any(|op| matches!(
1968            op,
1969            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1970        )));
1971        assert!(plan.operations.iter().any(|op| matches!(
1972            op,
1973            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1974        )));
1975    }
1976
1977    #[test]
1978    fn plan_from_full_snapshot_is_noop() {
1979        let snapshot = crate::SchemaSnapshot {
1980            tables: vec![
1981                crate::TableSnapshot {
1982                    name: "User".into(),
1983                    columns: vec![
1984                        crate::ColumnSnapshot {
1985                            name: "id".into(),
1986                            column_type: "TEXT".into(),
1987                            notnull: true,
1988                            primary_key: true,
1989                        },
1990                        crate::ColumnSnapshot {
1991                            name: "email".into(),
1992                            column_type: "TEXT".into(),
1993                            notnull: true,
1994                            primary_key: false,
1995                        },
1996                        crate::ColumnSnapshot {
1997                            name: "displayName".into(),
1998                            column_type: "TEXT".into(),
1999                            notnull: true,
2000                            primary_key: false,
2001                        },
2002                        crate::ColumnSnapshot {
2003                            name: "createdAt".into(),
2004                            column_type: "TIMESTAMPTZ".into(),
2005                            notnull: true,
2006                            primary_key: false,
2007                        },
2008                    ],
2009                    indexes: vec![],
2010                },
2011                crate::TableSnapshot {
2012                    name: "Todo".into(),
2013                    columns: vec![
2014                        crate::ColumnSnapshot {
2015                            name: "id".into(),
2016                            column_type: "TEXT".into(),
2017                            notnull: true,
2018                            primary_key: true,
2019                        },
2020                        crate::ColumnSnapshot {
2021                            name: "title".into(),
2022                            column_type: "TEXT".into(),
2023                            notnull: true,
2024                            primary_key: false,
2025                        },
2026                        crate::ColumnSnapshot {
2027                            name: "done".into(),
2028                            column_type: "BOOLEAN".into(),
2029                            notnull: true,
2030                            primary_key: false,
2031                        },
2032                        crate::ColumnSnapshot {
2033                            name: "userId".into(),
2034                            column_type: "TEXT".into(),
2035                            notnull: true,
2036                            primary_key: false,
2037                        },
2038                        crate::ColumnSnapshot {
2039                            name: "createdAt".into(),
2040                            column_type: "TIMESTAMPTZ".into(),
2041                            notnull: true,
2042                            primary_key: false,
2043                        },
2044                    ],
2045                    indexes: vec![crate::IndexSnapshot {
2046                        name: "Todo_by_user".into(),
2047                        columns: vec!["userId".into()],
2048                        unique: false,
2049                    }],
2050                },
2051            ],
2052        };
2053        let manifest = test_manifest();
2054        let plan = plan_from_snapshot(&snapshot, &manifest);
2055        assert!(plan.is_empty());
2056    }
2057
2058    #[test]
2059    fn plan_detects_missing_column_in_snapshot() {
2060        let snapshot = crate::SchemaSnapshot {
2061            tables: vec![
2062                crate::TableSnapshot {
2063                    name: "User".into(),
2064                    columns: vec![
2065                        crate::ColumnSnapshot {
2066                            name: "id".into(),
2067                            column_type: "TEXT".into(),
2068                            notnull: true,
2069                            primary_key: true,
2070                        },
2071                        crate::ColumnSnapshot {
2072                            name: "email".into(),
2073                            column_type: "TEXT".into(),
2074                            notnull: true,
2075                            primary_key: false,
2076                        },
2077                        // missing displayName and createdAt
2078                    ],
2079                    indexes: vec![],
2080                },
2081                crate::TableSnapshot {
2082                    name: "Todo".into(),
2083                    columns: vec![
2084                        crate::ColumnSnapshot {
2085                            name: "id".into(),
2086                            column_type: "TEXT".into(),
2087                            notnull: true,
2088                            primary_key: true,
2089                        },
2090                        crate::ColumnSnapshot {
2091                            name: "title".into(),
2092                            column_type: "TEXT".into(),
2093                            notnull: true,
2094                            primary_key: false,
2095                        },
2096                        crate::ColumnSnapshot {
2097                            name: "done".into(),
2098                            column_type: "BOOLEAN".into(),
2099                            notnull: true,
2100                            primary_key: false,
2101                        },
2102                        crate::ColumnSnapshot {
2103                            name: "userId".into(),
2104                            column_type: "TEXT".into(),
2105                            notnull: true,
2106                            primary_key: false,
2107                        },
2108                        crate::ColumnSnapshot {
2109                            name: "createdAt".into(),
2110                            column_type: "TIMESTAMPTZ".into(),
2111                            notnull: true,
2112                            primary_key: false,
2113                        },
2114                    ],
2115                    indexes: vec![crate::IndexSnapshot {
2116                        name: "Todo_by_user".into(),
2117                        columns: vec!["userId".into()],
2118                        unique: false,
2119                    }],
2120                },
2121            ],
2122        };
2123        let manifest = test_manifest();
2124        let plan = plan_from_snapshot(&snapshot, &manifest);
2125
2126        let add_fields: Vec<_> = plan
2127            .operations
2128            .iter()
2129            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
2130            .collect();
2131        assert_eq!(add_fields.len(), 2); // displayName + createdAt
2132    }
2133
2134    // -- CRUD helper tests (no live database required) --
2135
2136    #[test]
2137    fn json_value_to_string_handles_all_types() {
2138        assert_eq!(
2139            json_value_to_string(&serde_json::Value::String("hello".into())),
2140            "hello"
2141        );
2142        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
2143        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
2144        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
2145        assert_eq!(
2146            json_value_to_string(&serde_json::Value::Bool(false)),
2147            "false"
2148        );
2149        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
2150        // Arrays and objects get their JSON representation.
2151        assert_eq!(
2152            json_value_to_string(&serde_json::json!([1, 2, 3])),
2153            "[1,2,3]"
2154        );
2155        assert_eq!(
2156            json_value_to_string(&serde_json::json!({"a": 1})),
2157            "{\"a\":1}"
2158        );
2159    }
2160
2161    #[test]
2162    fn generate_id_returns_hex_string() {
2163        let id = generate_id();
2164        assert!(!id.is_empty());
2165        // Must be valid hex characters.
2166        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
2167    }
2168
2169    #[test]
2170    fn generate_id_is_unique_across_calls() {
2171        let id1 = generate_id();
2172        let id2 = generate_id();
2173        assert_ne!(id1, id2);
2174    }
2175
2176    #[test]
2177    fn generate_id_is_lex_sortable() {
2178        // 1000 IDs back-to-back must come out in monotonically increasing
2179        // lexicographic order. This is what makes cursor pagination correct.
2180        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
2181        let sorted = {
2182            let mut s = ids.clone();
2183            s.sort();
2184            s
2185        };
2186        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
2187        // And every id must be the same width (otherwise lex comparison is
2188        // wrong at width boundaries).
2189        let len0 = ids[0].len();
2190        assert!(ids.iter().all(|id| id.len() == len0));
2191        ids.dedup();
2192        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
2193    }
2194
2195    #[test]
2196    fn build_insert_sql_simple() {
2197        let data = serde_json::json!({
2198            "email": "alice@example.com",
2199            "displayName": "Alice"
2200        });
2201        let (sql, values) = build_insert_sql("User", &data).unwrap();
2202
2203        assert!(sql.starts_with("INSERT INTO \"User\""));
2204        assert!(sql.contains("id"));
2205        assert!(sql.contains("$1"));
2206        assert!(sql.contains("$2"));
2207        assert!(sql.contains("$3"));
2208        // First value is the generated ID — JsonParam::Text variant.
2209        match &values[0] {
2210            JsonParam::Text(s) => assert!(!s.is_empty()),
2211            other => panic!("expected Text id param, got {other:?}"),
2212        }
2213        assert_eq!(values.len(), 3); // id + 2 fields
2214    }
2215
2216    #[test]
2217    fn build_insert_sql_preserves_json_types() {
2218        let data = serde_json::json!({
2219            "n": 42,
2220            "f": 1.5,
2221            "b": true,
2222            "s": "hi",
2223            "z": null,
2224        });
2225        let (_sql, values) = build_insert_sql("T", &data).unwrap();
2226        // values[0] is the id; remaining are in BTreeMap order ("b","f","n","s","z").
2227        let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
2228        assert!(matches!(kinds[0], JsonParam::Bool(true)));
2229        assert!(matches!(kinds[1], JsonParam::Float(_)));
2230        assert!(matches!(kinds[2], JsonParam::Int(42)));
2231        assert!(matches!(kinds[3], JsonParam::Text(_)));
2232        assert!(matches!(kinds[4], JsonParam::Null));
2233    }
2234
2235    #[test]
2236    fn build_insert_sql_quotes_column_names() {
2237        let data = serde_json::json!({"createdAt": "2026-01-01"});
2238        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
2239        assert!(sql.contains("\"createdAt\""));
2240        assert!(sql.contains("\"Todo\""));
2241    }
2242
2243    #[test]
2244    fn build_insert_sql_rejects_non_object() {
2245        let data = serde_json::json!("not an object");
2246        let result = build_insert_sql("User", &data);
2247        assert!(result.is_err());
2248        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2249    }
2250
2251    #[test]
2252    fn build_update_sql_simple() {
2253        let data = serde_json::json!({
2254            "displayName": "Bob",
2255            "email": "bob@example.com"
2256        });
2257        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
2258
2259        assert!(sql.starts_with("UPDATE \"User\" SET"));
2260        assert!(sql.contains("WHERE id = $1"));
2261        assert!(sql.contains("$2"));
2262        assert!(sql.contains("$3"));
2263        match &values[0] {
2264            JsonParam::Text(s) => assert_eq!(s, "abc123"),
2265            other => panic!("expected Text id param, got {other:?}"),
2266        }
2267        assert_eq!(values.len(), 3); // id + 2 fields
2268    }
2269
2270    #[test]
2271    fn build_update_sql_quotes_column_names() {
2272        let data = serde_json::json!({"displayName": "Carol"});
2273        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
2274        assert!(sql.contains("\"displayName\" = $2"));
2275    }
2276
2277    #[test]
2278    fn build_update_sql_rejects_non_object() {
2279        let data = serde_json::json!(42);
2280        let result = build_update_sql("User", "id1", &data);
2281        assert!(result.is_err());
2282        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2283    }
2284
2285    #[test]
2286    fn build_update_sql_rejects_empty_object() {
2287        let data = serde_json::json!({});
2288        let err = build_update_sql("User", "id1", &data).unwrap_err();
2289        assert_eq!(err.code, "PG_INVALID_DATA");
2290        assert!(err.message.contains("at least one field"));
2291    }
2292}