Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Map a manifest field type name to the PostgreSQL column type used in DDL.
///
/// `string`, `richtext`, `id(...)` references and any unrecognised type name
/// are all stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        // Everything else (string, richtext, id(...), unknown) is TEXT.
        _ => "TEXT",
    }
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it can be used as a SQL identifier.
///
/// Any double-quote inside `name` is doubled, following the PostgreSQL
/// convention: the identifier `foo"bar` is written `"foo""bar"`.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Emit the quote twice so it stays part of the identifier.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
40// ---------------------------------------------------------------------------
41// SQL generation
42// ---------------------------------------------------------------------------
43
44/// Generate a Postgres CREATE TABLE statement.
45pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48    for field in fields {
49        let col_type = pg_column_type(&field.field_type);
50        let not_null = if field.optional { "" } else { " NOT NULL" };
51        let unique = if field.unique { " UNIQUE" } else { "" };
52        columns.push(format!(
53            "{} {}{}{}",
54            quote_ident(&field.name),
55            col_type,
56            not_null,
57            unique
58        ));
59    }
60
61    format!(
62        "CREATE TABLE IF NOT EXISTS {} ({})",
63        quote_ident(entity_name),
64        columns.join(", ")
65    )
66}
67
68/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
69/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
70/// Required-ness is tracked in the manifest; enforcement deferred.
71pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72    let col_type = pg_column_type(&field.field_type);
73    let unique = if field.unique { " UNIQUE" } else { "" };
74    format!(
75        "ALTER TABLE {} ADD COLUMN {} {}{}",
76        quote_ident(entity_name),
77        quote_ident(&field.name),
78        col_type,
79        unique
80    )
81}
82
83/// Generate a Postgres CREATE INDEX statement.
84pub fn create_index_sql(
85    entity_name: &str,
86    index_name: &str,
87    fields: &[String],
88    unique: bool,
89) -> String {
90    let unique_str = if unique { "UNIQUE " } else { "" };
91    let full_index_name = format!("{}_{}", entity_name, index_name);
92    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93    format!(
94        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95        unique_str,
96        quote_ident(&full_index_name),
97        quote_ident(entity_name),
98        quoted_fields.join(", ")
99    )
100}
101
102// ---------------------------------------------------------------------------
103// PostgresAdapter — planning-only adapter
104// ---------------------------------------------------------------------------
105
106/// A Postgres storage adapter. Currently supports planning only.
107/// No live connection — SQL generation and planning from manifest.
108pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112        // Plan from empty baseline.
113        let mut operations = Vec::new();
114
115        for entity in &target.entities {
116            let fields: Vec<FieldSpec> = entity
117                .fields
118                .iter()
119                .map(|f| FieldSpec {
120                    name: f.name.clone(),
121                    field_type: f.field_type.clone(),
122                    optional: f.optional,
123                    unique: f.unique,
124                })
125                .collect();
126
127            operations.push(SchemaOperation::CreateEntity {
128                name: entity.name.clone(),
129                fields,
130            });
131
132            for index in &entity.indexes {
133                operations.push(SchemaOperation::AddIndex {
134                    entity: entity.name.clone(),
135                    name: index.name.clone(),
136                    fields: index.fields.clone(),
137                    unique: index.unique,
138                });
139            }
140        }
141
142        if operations.is_empty() {
143            operations.push(SchemaOperation::Noop);
144        }
145
146        Ok(SchemaPlan { operations })
147    }
148
149    // apply_schema intentionally not implemented — uses default trait error.
150}
151
152/// Generate all SQL statements for a plan, in order.
153/// Useful for dry-run preview of what Postgres DDL would be executed.
154pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155    let mut statements = Vec::new();
156
157    for op in &plan.operations {
158        match op {
159            SchemaOperation::CreateEntity { name, fields } => {
160                statements.push(create_table_sql(name, fields));
161            }
162            SchemaOperation::AddField { entity, field } => {
163                statements.push(add_column_sql(entity, field));
164            }
165            SchemaOperation::AddIndex {
166                entity,
167                name,
168                fields,
169                unique,
170            } => {
171                statements.push(create_index_sql(entity, name, fields, *unique));
172            }
173            SchemaOperation::Noop => {}
174            other => {
175                return Err(StorageError {
176                    code: "PG_OP_UNSUPPORTED".into(),
177                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
178                });
179            }
180        }
181    }
182
183    Ok(statements)
184}
185
186// ---------------------------------------------------------------------------
187// Introspection SQL helpers
188//
189// These generate the SQL queries that a live Postgres connection would run
190// to read the current schema. No connection required — just SQL strings.
191// ---------------------------------------------------------------------------
192
/// SQL to list user tables in the public schema.
///
/// Tables whose name starts with the literal prefix `_pylon_` are excluded.
/// `_` is a single-character wildcard in `LIKE`, so the underscores are
/// escaped (with `!` as the escape character). Otherwise unrelated names
/// such as `xpylonyfoo` would be filtered out as well.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
201
/// SQL to list columns for a given table.
/// Use with parameter: table_name.
///
/// Each row carries `column_name`, `data_type`, `is_nullable` and `is_pk`
/// (a count that is non-zero when the column is part of the table's primary
/// key). The primary-key subquery is scoped to the column's own schema and
/// joins key usage on schema + table + constraint name, so a same-named
/// table or constraint in another schema cannot mark a column as a key.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_schema = kcu.constraint_schema \
             AND tc.constraint_name = kcu.constraint_name \
             AND tc.table_schema = kcu.table_schema \
             AND tc.table_name = kcu.table_name \
            WHERE tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
215
/// SQL to list indexes for a given table.
/// Use with parameter: table_name.
///
/// Each row carries `index_name`, `is_unique` and `columns` (the indexed
/// column names aggregated into an array, ordered by their position in the
/// index key). Only tables in the `public` schema are considered, and
/// primary-key indexes are skipped.
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
           ix.indisunique as is_unique, \
           array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
      AND t.relname = $1 \
      AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
232
/// Plan from a snapshot (reuses the shared plan_from_snapshot).
/// This allows Postgres to plan incrementally once introspection data is available.
///
/// Thin wrapper: both arguments are forwarded unchanged to
/// `crate::plan_from_snapshot` and its result is returned as-is.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
238
239// ---------------------------------------------------------------------------
240// CRUD SQL generation helpers (used by live adapter, testable without a DB)
241// ---------------------------------------------------------------------------
242
/// Mint a fixed-width, lexicographically sortable identifier.
///
/// Layout: 32 zero-padded hex digits holding the nanoseconds since the Unix
/// epoch, then 8 hex digits from a process-wide atomic counter, 40
/// characters in total. The counter keeps ids distinct (and ascending)
/// when several are minted within the same nanosecond. The constant width
/// makes string comparison follow creation order, which cursor pagination
/// depends on. If the system clock reads earlier than the epoch, the
/// timestamp part is all zeros.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU32 = AtomicU32::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let tick = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("{:032x}{:08x}", nanos, tick)
}
262
263/// Convert a JSON value to its string representation for use as a SQL parameter.
264pub fn json_value_to_string(val: &serde_json::Value) -> String {
265    match val {
266        serde_json::Value::String(s) => s.clone(),
267        serde_json::Value::Number(n) => n.to_string(),
268        serde_json::Value::Bool(b) => b.to_string(),
269        serde_json::Value::Null => String::new(),
270        other => other.to_string(),
271    }
272}
273
274/// Build an INSERT SQL statement and collect string parameter values.
275/// Returns `(sql, values)` where `values[0]` is the generated ID.
276pub fn build_insert_sql(
277    entity: &str,
278    data: &serde_json::Value,
279) -> Result<(String, Vec<String>), StorageError> {
280    let id = generate_id();
281    let obj = data.as_object().ok_or_else(|| StorageError {
282        code: "PG_INVALID_DATA".into(),
283        message: "Insert data must be a JSON object".into(),
284    })?;
285
286    let mut col_names = vec!["id".to_string()];
287    let mut placeholders = vec!["$1".to_string()];
288    let mut values: Vec<String> = vec![id];
289
290    for (i, (key, val)) in obj.iter().enumerate() {
291        col_names.push(quote_ident(key));
292        placeholders.push(format!("${}", i + 2));
293        values.push(json_value_to_string(val));
294    }
295
296    let sql = format!(
297        "INSERT INTO {} ({}) VALUES ({})",
298        quote_ident(entity),
299        col_names.join(", "),
300        placeholders.join(", ")
301    );
302
303    Ok((sql, values))
304}
305
306/// Build an UPDATE SQL statement and collect string parameter values.
307/// Returns `(sql, values)` where `values[0]` is the row ID.
308pub fn build_update_sql(
309    entity: &str,
310    id: &str,
311    data: &serde_json::Value,
312) -> Result<(String, Vec<String>), StorageError> {
313    let obj = data.as_object().ok_or_else(|| StorageError {
314        code: "PG_INVALID_DATA".into(),
315        message: "Update data must be a JSON object".into(),
316    })?;
317
318    if obj.is_empty() {
319        return Err(StorageError {
320            code: "PG_INVALID_DATA".into(),
321            message: "Update data must contain at least one field".into(),
322        });
323    }
324
325    let mut set_clauses = Vec::new();
326    let mut values: Vec<String> = vec![id.to_string()];
327
328    for (i, (key, val)) in obj.iter().enumerate() {
329        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
330        values.push(json_value_to_string(val));
331    }
332
333    let sql = format!(
334        "UPDATE {} SET {} WHERE id = $1",
335        quote_ident(entity),
336        set_clauses.join(", ")
337    );
338
339    Ok((sql, values))
340}
341
342// ---------------------------------------------------------------------------
343// Live Postgres adapter (requires "postgres-live" feature)
344// ---------------------------------------------------------------------------
345
346#[cfg(feature = "postgres-live")]
347pub mod live {
348    use super::*;
349    use crate::{
350        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
351    };
352
    /// A live Postgres adapter with a real database connection.
    pub struct LivePostgresAdapter {
        // Connection handle created by `connect`; every query and statement
        // issued by this adapter goes through it.
        client: postgres::Client,
    }
357
358    impl LivePostgresAdapter {
359        /// Connect to a Postgres database.
360        pub fn connect(url: &str) -> Result<Self, StorageError> {
361            let client =
362                postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
363                    code: "PG_CONNECT_FAILED".into(),
364                    message: format!("Failed to connect to Postgres: {e}"),
365                })?;
366            Ok(Self { client })
367        }
368
369        /// Read the current schema from the live database.
370        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
371            let table_rows = self
372                .client
373                .query(INTROSPECT_TABLES_SQL, &[])
374                .map_err(pg_err)?;
375
376            let mut tables = Vec::new();
377            for row in &table_rows {
378                let table_name: String = row.get(0);
379                let columns = self.read_columns(&table_name)?;
380                let indexes = self.read_indexes(&table_name)?;
381                tables.push(TableSnapshot {
382                    name: table_name,
383                    columns,
384                    indexes,
385                });
386            }
387
388            Ok(SchemaSnapshot { tables })
389        }
390
391        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
392            let rows = self
393                .client
394                .query(INTROSPECT_COLUMNS_SQL, &[&table])
395                .map_err(pg_err)?;
396
397            let mut columns = Vec::new();
398            for row in &rows {
399                let name: String = row.get(0);
400                let data_type: String = row.get(1);
401                let is_nullable: String = row.get(2);
402                let is_pk: i64 = row.get(3);
403                columns.push(ColumnSnapshot {
404                    name,
405                    column_type: data_type,
406                    notnull: is_nullable == "NO",
407                    primary_key: is_pk > 0,
408                });
409            }
410            Ok(columns)
411        }
412
413        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
414            let rows = self
415                .client
416                .query(INTROSPECT_INDEXES_SQL, &[&table])
417                .map_err(pg_err)?;
418
419            let mut indexes = Vec::new();
420            for row in &rows {
421                let name: String = row.get(0);
422                let unique: bool = row.get(1);
423                let columns: Vec<String> = row.get(2);
424                indexes.push(IndexSnapshot {
425                    name,
426                    columns,
427                    unique,
428                });
429            }
430            Ok(indexes)
431        }
432
433        /// Plan from live database state.
434        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
435            let snapshot = self.read_schema()?;
436            Ok(crate::plan_from_snapshot(&snapshot, target))
437        }
438    }
439
    // The trait methods take `&self`, while the inherent methods they point
    // callers to (`plan_from_live`, `apply_plan`) take `&mut self`. Both trait
    // methods therefore always fail with an error naming the method to use.
    impl StorageAdapter for LivePostgresAdapter {
        /// Always fails with `PG_PLAN_NEEDS_MUTABLE`; use `plan_from_live()` instead.
        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
            Err(StorageError {
                code: "PG_PLAN_NEEDS_MUTABLE".into(),
                message: "Use plan_from_live() instead for live Postgres planning".into(),
            })
        }

        /// Always fails with `PG_APPLY_USE_METHOD`; use `apply_plan()` instead.
        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
            Err(StorageError {
                code: "PG_APPLY_USE_METHOD".into(),
                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
            })
        }
    }
455
456    impl LivePostgresAdapter {
457        /// Apply a schema plan to the live database.
458        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
459            let statements = plan_to_sql(plan)?;
460            for sql in &statements {
461                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
462            }
463            Ok(())
464        }
465
466        /// Insert a row. Returns the generated ID.
467        pub fn insert(
468            &mut self,
469            entity: &str,
470            data: &serde_json::Value,
471        ) -> Result<String, StorageError> {
472            let (sql, values) = build_insert_sql(entity, data)?;
473            let id = values[0].clone();
474
475            let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
476                .iter()
477                .map(|v| v as &(dyn postgres::types::ToSql + Sync))
478                .collect();
479
480            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
481            Ok(id)
482        }
483
484        /// Get a row by ID.
485        pub fn get_by_id(
486            &mut self,
487            entity: &str,
488            id: &str,
489        ) -> Result<Option<serde_json::Value>, StorageError> {
490            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
491            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
492
493            match rows.first() {
494                Some(row) => Ok(Some(row_to_json(row))),
495                None => Ok(None),
496            }
497        }
498
499        /// List all rows from an entity.
500        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
501            let sql = format!("SELECT * FROM {}", quote_ident(entity));
502            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
503
504            Ok(rows.iter().map(row_to_json).collect())
505        }
506
507        /// Cursor-paginated list. `after` is the last `id` from the previous
508        /// page; the result contains rows with `id > after` (lex order),
509        /// limited to `limit`. Used for sync push/pull.
510        pub fn list_after(
511            &mut self,
512            entity: &str,
513            after: Option<&str>,
514            limit: usize,
515        ) -> Result<Vec<serde_json::Value>, StorageError> {
516            // Cap limit at a sensible upper bound so a malicious client can't
517            // stream the whole table by passing limit=u64::MAX.
518            let capped: i64 = limit.min(10_000) as i64;
519            let sql = match after {
520                Some(_) => format!(
521                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
522                    quote_ident(entity)
523                ),
524                None => format!(
525                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
526                    quote_ident(entity)
527                ),
528            };
529            let rows = match after {
530                Some(cursor) => self
531                    .client
532                    .query(sql.as_str(), &[&cursor, &capped])
533                    .map_err(pg_err)?,
534                None => self
535                    .client
536                    .query(sql.as_str(), &[&capped])
537                    .map_err(pg_err)?,
538            };
539            Ok(rows.iter().map(row_to_json).collect())
540        }
541
542        /// Update a row by ID. Returns true if the row was found and updated.
543        pub fn update(
544            &mut self,
545            entity: &str,
546            id: &str,
547            data: &serde_json::Value,
548        ) -> Result<bool, StorageError> {
549            let (sql, values) = build_update_sql(entity, id, data)?;
550
551            let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
552                .iter()
553                .map(|v| v as &(dyn postgres::types::ToSql + Sync))
554                .collect();
555
556            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
557            Ok(rows_affected > 0)
558        }
559
560        /// Delete a row by ID. Returns true if the row was found and deleted.
561        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
562            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
563            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
564            Ok(rows_affected > 0)
565        }
566
567        /// Look up a row by `field = value`. Caller must validate `field`
568        /// against the manifest before calling — we still `quote_ident` it
569        /// but won't catch a typo against the entity definition.
570        pub fn lookup_field(
571            &mut self,
572            entity: &str,
573            field: &str,
574            value: &str,
575        ) -> Result<Option<serde_json::Value>, StorageError> {
576            let sql = format!(
577                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
578                quote_ident(entity),
579                quote_ident(field),
580            );
581            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
582            Ok(rows.first().map(row_to_json))
583        }
584
585        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
586        ///
587        /// Supported operators: equality (`field: value`), `$gt`, `$gte`,
588        /// `$lt`, `$lte`, `$like`, `$in: [..]`, plus the meta operators
589        /// `$order: { field: "asc"|"desc" }`, `$limit`, `$offset`.
590        ///
591        /// Anything else is silently ignored (matches the in-memory fallback's
592        /// permissive behavior). Field names are validated against `valid_columns`
593        /// to prevent SQL injection — pass the entity's column set.
594        pub fn query_filtered(
595            &mut self,
596            entity: &str,
597            filter: &serde_json::Value,
598            valid_columns: &[String],
599        ) -> Result<Vec<serde_json::Value>, StorageError> {
600            let empty = serde_json::Map::new();
601            let obj = filter.as_object().unwrap_or(&empty);
602
603            let validate = |col: &str| -> Result<(), StorageError> {
604                if col == "id" || valid_columns.iter().any(|c| c == col) {
605                    Ok(())
606                } else {
607                    Err(StorageError {
608                        code: "UNKNOWN_COLUMN".into(),
609                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
610                    })
611                }
612            };
613
614            let mut where_clauses: Vec<String> = Vec::new();
615            let mut order_clause = String::new();
616            let mut limit_clause = String::new();
617            let mut offset_clause = String::new();
618            // Collect (col, op, value) so placeholder numbers can be assigned
619            // in a single materialization pass after the parse loop.
620            let mut planned: Vec<(String, String, String)> = Vec::new();
621
622            for (key, val) in obj {
623                match key.as_str() {
624                    "$order" => {
625                        if let Some(ord) = val.as_object() {
626                            let mut parts = Vec::new();
627                            for (col, dir) in ord {
628                                validate(col)?;
629                                let d = match dir.as_str().unwrap_or("asc") {
630                                    "desc" | "DESC" => "DESC",
631                                    _ => "ASC",
632                                };
633                                parts.push(format!("{} {d}", quote_ident(col)));
634                            }
635                            if !parts.is_empty() {
636                                order_clause = format!(" ORDER BY {}", parts.join(", "));
637                            }
638                        }
639                    }
640                    "$limit" => {
641                        if let Some(n) = val.as_u64() {
642                            limit_clause = format!(" LIMIT {}", n);
643                        }
644                    }
645                    "$offset" => {
646                        if let Some(n) = val.as_u64() {
647                            offset_clause = format!(" OFFSET {}", n);
648                        }
649                    }
650                    field => {
651                        validate(field)?;
652                        match val {
653                            serde_json::Value::Object(ops) => {
654                                for (op, v) in ops {
655                                    match op.as_str() {
656                                        "$gt" => {
657                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
658                                        }
659                                        "$gte" => planned.push((
660                                            field.into(),
661                                            ">=".into(),
662                                            value_to_pg(v),
663                                        )),
664                                        "$lt" => {
665                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
666                                        }
667                                        "$lte" => planned.push((
668                                            field.into(),
669                                            "<=".into(),
670                                            value_to_pg(v),
671                                        )),
672                                        "$like" => planned.push((
673                                            field.into(),
674                                            "LIKE".into(),
675                                            value_to_pg(v),
676                                        )),
677                                        "$in" => {
678                                            if let Some(arr) = v.as_array() {
679                                                let placeholders: Vec<String> = (0..arr.len())
680                                                    .map(|i| format!("${}", planned.len() + 1 + i))
681                                                    .collect();
682                                                where_clauses.push(format!(
683                                                    "{} IN ({})",
684                                                    quote_ident(field),
685                                                    placeholders.join(", "),
686                                                ));
687                                                for x in arr {
688                                                    planned.push((
689                                                        format!("__inline_{}", planned.len()),
690                                                        "__INLINE__".into(),
691                                                        value_to_pg(x),
692                                                    ));
693                                                }
694                                            }
695                                        }
696                                        _ => {}
697                                    }
698                                }
699                            }
700                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
701                        }
702                    }
703                }
704            }
705
706            // Materialize planned -> SQL + params.
707            let mut params: Vec<String> = Vec::with_capacity(planned.len());
708            for (field, op, v) in &planned {
709                if op == "__INLINE__" {
710                    // Already emitted via the IN-clause path; just push the value.
711                    params.push(v.clone());
712                } else {
713                    let placeholder = format!("${}", params.len() + 1);
714                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
715                    params.push(v.clone());
716                }
717            }
718
719            let where_sql = if where_clauses.is_empty() {
720                String::new()
721            } else {
722                format!(" WHERE {}", where_clauses.join(" AND "))
723            };
724            let sql = format!(
725                "SELECT * FROM {}{}{}{}{}",
726                quote_ident(entity),
727                where_sql,
728                order_clause,
729                limit_clause,
730                offset_clause,
731            );
732
733            let pg_params: Vec<&(dyn postgres::types::ToSql + Sync)> = params
734                .iter()
735                .map(|s| s as &(dyn postgres::types::ToSql + Sync))
736                .collect();
737
738            let rows = self
739                .client
740                .query(sql.as_str(), &pg_params)
741                .map_err(pg_err)?;
742            Ok(rows.iter().map(row_to_json).collect())
743        }
744    }
745
    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
    ///
    /// Every variant only borrows its inputs for the lifetime `'a`.
    pub enum TxOp<'a> {
        /// Insert a new row into `entity` built from the JSON object `data`.
        Insert {
            entity: &'a str,
            data: &'a serde_json::Value,
        },
        /// Update the row of `entity` identified by `id` with the fields in `data`.
        Update {
            entity: &'a str,
            id: &'a str,
            data: &'a serde_json::Value,
        },
        /// Delete the row of `entity` identified by `id`.
        Delete {
            entity: &'a str,
            id: &'a str,
        },
    }
762
    /// Result of a single op inside a transaction.
    ///
    /// One entry is produced per [`TxOp`], in the same order as the ops.
    #[derive(Debug, Clone)]
    pub enum TxResult {
        /// The generated id of the inserted row.
        Inserted(String),
        /// `true` when the update affected at least one row.
        Updated(bool),
        /// `true` when the delete affected at least one row.
        Deleted(bool),
    }
770
771    impl LivePostgresAdapter {
772        /// Run `ops` inside a single Postgres transaction. Either all of them
773        /// commit together or none of them do — there is no partial state on
774        /// failure. The ROLLBACK happens implicitly when the `Transaction`
775        /// guard is dropped without `commit()` being called.
776        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
777            let mut tx = self.client.transaction().map_err(pg_err)?;
778            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
779
780            for op in ops {
781                match op {
782                    TxOp::Insert { entity, data } => {
783                        let (sql, values) = build_insert_sql(entity, data)?;
784                        let id = values[0].clone();
785                        let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
786                            .iter()
787                            .map(|v| v as &(dyn postgres::types::ToSql + Sync))
788                            .collect();
789                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
790                        results.push(TxResult::Inserted(id));
791                    }
792                    TxOp::Update { entity, id, data } => {
793                        let (sql, values) = build_update_sql(entity, id, data)?;
794                        let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
795                            .iter()
796                            .map(|v| v as &(dyn postgres::types::ToSql + Sync))
797                            .collect();
798                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
799                        results.push(TxResult::Updated(n > 0));
800                    }
801                    TxOp::Delete { entity, id } => {
802                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
803                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
804                        results.push(TxResult::Deleted(n > 0));
805                    }
806                }
807            }
808
809            tx.commit().map_err(pg_err)?;
810            Ok(results)
811        }
812    }
813
814    fn value_to_pg(v: &serde_json::Value) -> String {
815        match v {
816            serde_json::Value::String(s) => s.clone(),
817            serde_json::Value::Number(n) => n.to_string(),
818            serde_json::Value::Bool(b) => b.to_string(),
819            serde_json::Value::Null => String::new(),
820            other => other.to_string(),
821        }
822    }
823
824    fn row_to_json(row: &postgres::Row) -> serde_json::Value {
825        use postgres::types::Type;
826        let mut obj = serde_json::Map::new();
827        for (i, col) in row.columns().iter().enumerate() {
828            let name = col.name().to_string();
829
830            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
831            // and a panic in a query handler poisons the connection mutex,
832            // taking down all subsequent reads on this datastore. Anything
833            // that fails to decode becomes Null with a one-shot warning.
834            //
835            // Timestamps and the catch-all path explicitly DON'T request
836            // `String` — the postgres crate uses binary protocol by default
837            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
838            // ask for `Vec<u8>` and lossy-stringify, which works for all
839            // text-shaped columns in either protocol.
840            let value: serde_json::Value = match *col.type_() {
841                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
842                    .flatten()
843                    .map(serde_json::Value::Bool)
844                    .unwrap_or(serde_json::Value::Null),
845                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
846                    .flatten()
847                    .map(|v| serde_json::Value::Number(v.into()))
848                    .unwrap_or(serde_json::Value::Null),
849                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
850                    .flatten()
851                    .map(|v| serde_json::Value::Number(v.into()))
852                    .unwrap_or(serde_json::Value::Null),
853                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
854                    .flatten()
855                    .map(|v| serde_json::Value::Number(v.into()))
856                    .unwrap_or(serde_json::Value::Null),
857                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
858                    .flatten()
859                    .and_then(|v| serde_json::Number::from_f64(v as f64))
860                    .map(serde_json::Value::Number)
861                    .unwrap_or(serde_json::Value::Null),
862                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
863                    .flatten()
864                    .and_then(serde_json::Number::from_f64)
865                    .map(serde_json::Value::Number)
866                    .unwrap_or(serde_json::Value::Null),
867                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
868                    .flatten()
869                    .unwrap_or(serde_json::Value::Null),
870                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
871                    .flatten()
872                    .map(|b| serde_json::Value::String(b64(&b)))
873                    .unwrap_or(serde_json::Value::Null),
874                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
875                    try_get_or_null::<Option<String>>(row, i)
876                        .flatten()
877                        .map(serde_json::Value::String)
878                        .unwrap_or(serde_json::Value::Null)
879                }
880                _ => {
881                    // Last resort: ask Postgres to render anything else as
882                    // text via a stringifying decode through Vec<u8>. If even
883                    // that fails (rare — Postgres types not implementing the
884                    // text format), fall through to Null with a warning.
885                    match row.try_get::<_, Option<String>>(i) {
886                        Ok(Some(s)) => serde_json::Value::String(s),
887                        Ok(None) => serde_json::Value::Null,
888                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
889                            Ok(Some(bytes)) => serde_json::Value::String(
890                                String::from_utf8_lossy(&bytes).into_owned(),
891                            ),
892                            _ => serde_json::Value::Null,
893                        },
894                    }
895                }
896            };
897            obj.insert(name, value);
898        }
899        serde_json::Value::Object(obj)
900    }
901
902    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
903    where
904        T: postgres::types::FromSql<'a>,
905    {
906        match row.try_get::<_, T>(i) {
907            Ok(v) => Some(v),
908            Err(e) => {
909                tracing::warn!(
910                    "[postgres] decode failed for column {} ({}): {e}",
911                    i,
912                    row.columns()[i].name()
913                );
914                None
915            }
916        }
917    }
918
/// Encode `bytes` as standard base64 (`+` / `/` alphabet, `=` padding).
///
/// Hand-rolled so the BYTEA column edge case does not pull in another
/// dependency.
fn b64(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Map a 6-bit value to its base64 character.
    let symbol = |sextet: u8| char::from(ALPHABET[usize::from(sextet)]);

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        match *group {
            [first, second, third] => {
                encoded.push(symbol(first >> 2));
                encoded.push(symbol(((first & 0x03) << 4) | (second >> 4)));
                encoded.push(symbol(((second & 0x0F) << 2) | (third >> 6)));
                encoded.push(symbol(third & 0x3F));
            }
            [first, second] => {
                encoded.push(symbol(first >> 2));
                encoded.push(symbol(((first & 0x03) << 4) | (second >> 4)));
                encoded.push(symbol((second & 0x0F) << 2));
                encoded.push('=');
            }
            [first] => {
                encoded.push(symbol(first >> 2));
                encoded.push(symbol((first & 0x03) << 4));
                encoded.push_str("==");
            }
            _ => unreachable!("chunks(3) yields between one and three bytes"),
        }
    }
    encoded
}
947
948    fn pg_err(e: postgres::Error) -> StorageError {
949        StorageError {
950            code: "PG_QUERY_FAILED".into(),
951            message: format!("Postgres query failed: {e}"),
952        }
953    }
954}
955
956// ---------------------------------------------------------------------------
957// Tests
958// ---------------------------------------------------------------------------
959
960#[cfg(test)]
961mod tests {
962    use super::*;
963
964    /// Hand-rolled fixture that matches the snapshots in the tests
965    /// below. Decoupled from any example's `pylon.manifest.json` so
966    /// changing an example schema doesn't bleed into adapter tests.
967    fn test_manifest() -> AppManifest {
968        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
969        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
970            name: name.into(),
971            field_type: ty.into(),
972            optional: opt,
973            unique: uniq,
974            crdt: None,
975        };
976        AppManifest {
977            manifest_version: 1,
978            name: "test".into(),
979            version: "0.0.0".into(),
980            entities: vec![
981                ManifestEntity {
982                    name: "User".into(),
983                    fields: vec![
984                        f("email", "string", false, true),
985                        f("displayName", "string", false, false),
986                        f("createdAt", "datetime", false, false),
987                    ],
988                    indexes: vec![],
989                    relations: vec![],
990                    search: None,
991                    crdt: true,
992                },
993                ManifestEntity {
994                    name: "Todo".into(),
995                    fields: vec![
996                        f("title", "string", false, false),
997                        f("done", "bool", false, false),
998                        f("userId", "id(User)", false, false),
999                        f("createdAt", "datetime", false, false),
1000                    ],
1001                    indexes: vec![ManifestIndex {
1002                        name: "by_user".into(),
1003                        fields: vec!["userId".into()],
1004                        unique: false,
1005                    }],
1006                    relations: vec![],
1007                    search: None,
1008                    crdt: true,
1009                },
1010            ],
1011            queries: vec![],
1012            actions: vec![],
1013            policies: vec![],
1014            routes: vec![],
1015        }
1016    }
1017
#[test]
fn pg_type_mapping() {
    // (manifest field type, expected Postgres column type)
    let expectations = [
        ("string", "TEXT"),
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
        ("richtext", "TEXT"),
        ("id(User)", "TEXT"),
    ];
    for (manifest_type, column_type) in expectations {
        assert_eq!(pg_column_type(manifest_type), column_type);
    }
}
1028
#[test]
fn quote_ident_simple() {
    // Plain identifiers are only wrapped in double quotes.
    let cases = [("User", "\"User\""), ("email", "\"email\"")];
    for (raw, quoted) in cases {
        assert_eq!(quote_ident(raw), quoted);
    }
}
1034
#[test]
fn quote_ident_escapes_embedded_double_quotes() {
    // Every embedded `"` must come back doubled inside the outer quotes.
    let cases = [
        ("col\"name", "\"col\"\"name\""),
        ("a\"b\"c", "\"a\"\"b\"\"c\""),
    ];
    for (raw, quoted) in cases {
        assert_eq!(quote_ident(raw), quoted);
    }
}
1040
#[test]
fn create_table_sql_basic() {
    // (name, type, optional, unique) -> FieldSpec
    let spec = |name: &str, field_type: &str, optional: bool, unique: bool| FieldSpec {
        name: name.into(),
        field_type: field_type.into(),
        optional,
        unique,
    };
    let columns = [
        spec("email", "string", false, true),
        spec("age", "int", true, false),
    ];

    let expected = "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)";
    assert_eq!(create_table_sql("User", &columns), expected);
}
1063
#[test]
fn create_table_sql_escapes_identifiers() {
    let column = FieldSpec {
        name: "col\"x".into(),
        field_type: "string".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql("my\"table", &[column]);

    // Both the table and the column name must have their quotes doubled.
    for needle in ["\"my\"\"table\"", "\"col\"\"x\""] {
        assert!(sql.contains(needle));
    }
}
1076
#[test]
fn create_index_sql_unique() {
    let expected = "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")";
    let actual = create_index_sql("User", "by_email", &["email".into()], true);
    assert_eq!(actual, expected);
}
1085
#[test]
fn create_index_sql_non_unique() {
    let expected = "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")";
    let actual = create_index_sql("Todo", "by_user", &["userId".into()], false);
    assert_eq!(actual, expected);
}
1094
#[test]
fn add_column_sql_basic() {
    // Optional, non-unique column: no NOT NULL / UNIQUE suffix expected.
    let bio = FieldSpec {
        name: "bio".into(),
        field_type: "string".into(),
        optional: true,
        unique: false,
    };
    assert_eq!(
        add_column_sql("User", &bio),
        "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT"
    );
}
1106
#[test]
fn plan_from_manifest() {
    let manifest = test_manifest();
    let plan = PostgresAdapter.plan_schema(&manifest).unwrap();

    let creates_entity = |wanted: &str| {
        plan.operations.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_index = |on_entity: &str, index: &str| {
        plan.operations.iter().any(|op| {
            matches!(
                op,
                SchemaOperation::AddIndex { entity, name, .. }
                    if entity == on_entity && name == index
            )
        })
    };

    // Expect CreateEntity for both entities plus AddIndex for Todo.by_user.
    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_index("Todo", "by_user"));
}
1127
#[test]
fn plan_to_sql_produces_statements() {
    let adapter = PostgresAdapter;
    let manifest = test_manifest();
    let plan = adapter.plan_schema(&manifest).unwrap();
    let stmts = plan_to_sql(&plan).unwrap();

    // The fixture declares two entities (User, Todo) and one explicit index
    // (Todo.by_user). Expect exactly two CREATE TABLE statements and at
    // least one index statement; the index count is a lower bound so the
    // test does not depend on how many index statements the planner emits.
    // The two table statements must come first.
    let create_tables = stmts
        .iter()
        .filter(|s| s.starts_with("CREATE TABLE"))
        .count();
    let create_indexes = stmts
        .iter()
        .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
        .count();
    assert_eq!(create_tables, 2);
    assert!(create_indexes >= 1);
    assert!(stmts[0].starts_with("CREATE TABLE"));
    assert!(stmts[1].starts_with("CREATE TABLE"));
}
1152
#[test]
fn plan_to_sql_rejects_unsupported() {
    // RemoveEntity has no SQL translation and must surface as an error.
    let removal = SchemaOperation::RemoveEntity {
        name: "User".into(),
    };
    let plan = SchemaPlan {
        operations: vec![removal],
    };

    let err = plan_to_sql(&plan).unwrap_err();
    assert_eq!(err.code, "PG_OP_UNSUPPORTED");
}
1164
#[test]
fn apply_not_implemented() {
    let plan = SchemaPlan {
        operations: vec![SchemaOperation::Noop],
    };

    // The offline adapter can plan but never apply, even for a no-op plan.
    let err = PostgresAdapter.apply_schema(&plan).unwrap_err();
    assert_eq!(err.code, "APPLY_NOT_IMPLEMENTED");
}
1175
#[test]
fn sql_uses_quoted_identifiers() {
    let created_at = FieldSpec {
        name: "createdAt".into(),
        field_type: "datetime".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql("User", &[created_at]);

    // Mixed-case identifiers only keep their case in Postgres when quoted.
    for needle in ["\"User\"", "\"createdAt\"", "TIMESTAMPTZ"] {
        assert!(sql.contains(needle));
    }
}
1190
1191    // -- Introspection SQL tests --
1192
#[test]
fn introspect_sql_constants_are_valid() {
    // Smoke test only: the constants exist and contain the expected pieces.
    for fragment in ["information_schema.tables", "_pylon_"] {
        assert!(INTROSPECT_TABLES_SQL.contains(fragment));
    }
    for per_table_sql in [INTROSPECT_COLUMNS_SQL, INTROSPECT_INDEXES_SQL] {
        assert!(per_table_sql.contains("$1"));
    }
}
1201
1202    // -- Plan from snapshot tests --
1203
#[test]
fn plan_from_empty_snapshot_creates_all() {
    let empty = crate::SchemaSnapshot { tables: vec![] };
    let manifest = test_manifest();
    let plan = plan_from_snapshot(&empty, &manifest);

    let creates_entity = |wanted: &str| {
        plan.operations.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_index = |on_entity: &str, index: &str| {
        plan.operations.iter().any(|op| {
            matches!(
                op,
                SchemaOperation::AddIndex { entity, name, .. }
                    if entity == on_entity && name == index
            )
        })
    };

    // With nothing in the database, everything in the manifest is created.
    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_index("Todo", "by_user"));
}
1223
#[test]
fn plan_from_full_snapshot_is_noop() {
    // Every column in this snapshot is NOT NULL; only name, type, and the
    // primary-key flag vary.
    let col = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("email", "TEXT", false),
            col("displayName", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![],
    };
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("title", "TEXT", false),
            col("done", "BOOLEAN", false),
            col("userId", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    // The snapshot already matches the manifest, so nothing is planned.
    let manifest = test_manifest();
    let plan = plan_from_snapshot(&snapshot, &manifest);
    assert!(plan.is_empty());
}
1304
#[test]
fn plan_detects_missing_column_in_snapshot() {
    let col = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    // User is missing `displayName` and `createdAt`; Todo is complete.
    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![col("id", "TEXT", true), col("email", "TEXT", false)],
        indexes: vec![],
    };
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("title", "TEXT", false),
            col("done", "BOOLEAN", false),
            col("userId", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    let manifest = test_manifest();
    let plan = plan_from_snapshot(&snapshot, &manifest);

    let added_fields = plan
        .operations
        .iter()
        .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
        .count();
    assert_eq!(added_fields, 2); // displayName + createdAt
}
1380
1381    // -- CRUD helper tests (no live database required) --
1382
#[test]
fn json_value_to_string_handles_all_types() {
    use serde_json::{json, Value};

    // (input, expected rendering). Arrays and objects are rendered as
    // their compact JSON representation.
    let cases = [
        (Value::String("hello".into()), "hello"),
        (json!(42), "42"),
        (json!(1.5), "1.5"),
        (Value::Bool(true), "true"),
        (Value::Bool(false), "false"),
        (Value::Null, ""),
        (json!([1, 2, 3]), "[1,2,3]"),
        (json!({"a": 1}), "{\"a\":1}"),
    ];
    for (input, rendered) in cases {
        assert_eq!(json_value_to_string(&input), rendered);
    }
}
1407
#[test]
fn generate_id_returns_hex_string() {
    let id = generate_id();
    assert!(!id.is_empty());

    // Only hexadecimal digits are allowed in an id.
    let all_hex = id.chars().all(|ch| ch.is_ascii_hexdigit());
    assert!(all_hex);
}
1415
#[test]
fn generate_id_is_unique_across_calls() {
    // Two consecutive calls must never hand out the same id.
    let (first, second) = (generate_id(), generate_id());
    assert_ne!(first, second);
}
1422
#[test]
fn generate_id_is_lex_sortable() {
    // Ids generated back-to-back have to arrive already sorted
    // lexicographically; cursor pagination relies on that ordering.
    let mut generated: Vec<String> = std::iter::repeat_with(generate_id).take(1000).collect();

    let mut expected_order = generated.clone();
    expected_order.sort();
    assert_eq!(generated, expected_order, "generate_id must be lex-monotonic");

    // A fixed width is required too, otherwise string comparison goes
    // wrong where the width changes.
    let width = generated[0].len();
    assert!(generated.iter().all(|id| id.len() == width));

    generated.dedup();
    assert_eq!(generated.len(), 1000, "no collisions in a tight loop");
}
1441
#[test]
fn build_insert_sql_simple() {
    let payload = serde_json::json!({
        "email": "alice@example.com",
        "displayName": "Alice"
    });
    let (sql, values) = build_insert_sql("User", &payload).unwrap();

    assert!(sql.starts_with("INSERT INTO \"User\""));
    for fragment in ["id", "$1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }
    // The generated id comes first, followed by the two payload fields.
    assert!(!values[0].is_empty());
    assert_eq!(values.len(), 3);
}
1459
#[test]
fn build_insert_sql_quotes_column_names() {
    let payload = serde_json::json!({"createdAt": "2026-01-01"});
    let (sql, _values) = build_insert_sql("Todo", &payload).unwrap();

    // Both the column and the table identifier must be double-quoted.
    for quoted in ["\"createdAt\"", "\"Todo\""] {
        assert!(sql.contains(quoted));
    }
}
1467
#[test]
fn build_insert_sql_rejects_non_object() {
    // A bare JSON string is not a row payload.
    let payload = serde_json::json!("not an object");
    let err = build_insert_sql("User", &payload).unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
1475
#[test]
fn build_update_sql_simple() {
    let changes = serde_json::json!({
        "displayName": "Bob",
        "email": "bob@example.com"
    });
    let (sql, values) = build_update_sql("User", "abc123", &changes).unwrap();

    assert!(sql.starts_with("UPDATE \"User\" SET"));
    for fragment in ["WHERE id = $1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }
    // The row id is bound first, followed by the two changed fields.
    assert_eq!(values[0], "abc123");
    assert_eq!(values.len(), 3);
}
1491
#[test]
fn build_update_sql_quotes_column_names() {
    let changes = serde_json::json!({"displayName": "Carol"});
    let (sql, _values) = build_update_sql("User", "id1", &changes).unwrap();

    // `$1` is the row id, so the first SET column binds to `$2`.
    assert!(sql.contains("\"displayName\" = $2"));
}
1498
#[test]
fn build_update_sql_rejects_non_object() {
    // A bare number is not a set of column changes.
    let changes = serde_json::json!(42);
    let err = build_update_sql("User", "id1", &changes).unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
1506
#[test]
fn build_update_sql_rejects_empty_object() {
    // An update with no fields is rejected with an explanatory message.
    let no_changes = serde_json::json!({});
    let failure = build_update_sql("User", "id1", &no_changes).unwrap_err();

    assert_eq!(failure.code, "PG_INVALID_DATA");
    assert!(failure.message.contains("at least one field"));
}
1514}