Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Map a manifest field type name to the PostgreSQL column type used in DDL.
///
/// Only `int`, `float`, `bool` and `datetime` get a dedicated column type.
/// Everything else — `string`, `richtext`, `id(...)` references and any
/// unrecognised type name — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    // Manifest types whose column type is something other than TEXT.
    const TYPED: [(&str, &str); 4] = [
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
    ];

    TYPED
        .iter()
        .find(|(manifest, _)| *manifest == field_type)
        .map_or("TEXT", |(_, pg)| *pg)
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it can be used as a PostgreSQL identifier.
///
/// Every double-quote inside `name` is doubled, which is how PostgreSQL
/// escapes it: the identifier `foo"bar` is rendered as `"foo""bar"`.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Emit the quote twice: once as the escape, once as the character.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
40// ---------------------------------------------------------------------------
41// SQL generation
42// ---------------------------------------------------------------------------
43
44/// Generate a Postgres CREATE TABLE statement.
45pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48    for field in fields {
49        let col_type = pg_column_type(&field.field_type);
50        let not_null = if field.optional { "" } else { " NOT NULL" };
51        let unique = if field.unique { " UNIQUE" } else { "" };
52        columns.push(format!(
53            "{} {}{}{}",
54            quote_ident(&field.name),
55            col_type,
56            not_null,
57            unique
58        ));
59    }
60
61    format!(
62        "CREATE TABLE IF NOT EXISTS {} ({})",
63        quote_ident(entity_name),
64        columns.join(", ")
65    )
66}
67
68/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
69/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
70/// Required-ness is tracked in the manifest; enforcement deferred.
71pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72    let col_type = pg_column_type(&field.field_type);
73    let unique = if field.unique { " UNIQUE" } else { "" };
74    format!(
75        "ALTER TABLE {} ADD COLUMN {} {}{}",
76        quote_ident(entity_name),
77        quote_ident(&field.name),
78        col_type,
79        unique
80    )
81}
82
83/// Generate a Postgres CREATE INDEX statement.
84pub fn create_index_sql(
85    entity_name: &str,
86    index_name: &str,
87    fields: &[String],
88    unique: bool,
89) -> String {
90    let unique_str = if unique { "UNIQUE " } else { "" };
91    let full_index_name = format!("{}_{}", entity_name, index_name);
92    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93    format!(
94        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95        unique_str,
96        quote_ident(&full_index_name),
97        quote_ident(entity_name),
98        quoted_fields.join(", ")
99    )
100}
101
102// ---------------------------------------------------------------------------
103// PostgresAdapter — planning-only adapter
104// ---------------------------------------------------------------------------
105
106/// A Postgres storage adapter. Currently supports planning only.
107/// No live connection — SQL generation and planning from manifest.
108pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112        // Plan from empty baseline.
113        let mut operations = Vec::new();
114
115        for entity in &target.entities {
116            let fields: Vec<FieldSpec> = entity
117                .fields
118                .iter()
119                .map(|f| FieldSpec {
120                    name: f.name.clone(),
121                    field_type: f.field_type.clone(),
122                    optional: f.optional,
123                    unique: f.unique,
124                })
125                .collect();
126
127            operations.push(SchemaOperation::CreateEntity {
128                name: entity.name.clone(),
129                fields,
130            });
131
132            for index in &entity.indexes {
133                operations.push(SchemaOperation::AddIndex {
134                    entity: entity.name.clone(),
135                    name: index.name.clone(),
136                    fields: index.fields.clone(),
137                    unique: index.unique,
138                });
139            }
140        }
141
142        if operations.is_empty() {
143            operations.push(SchemaOperation::Noop);
144        }
145
146        Ok(SchemaPlan { operations })
147    }
148
149    // apply_schema intentionally not implemented — uses default trait error.
150}
151
152/// Generate all SQL statements for a plan, in order.
153/// Useful for dry-run preview of what Postgres DDL would be executed.
154pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155    let mut statements = Vec::new();
156
157    for op in &plan.operations {
158        match op {
159            SchemaOperation::CreateEntity { name, fields } => {
160                statements.push(create_table_sql(name, fields));
161            }
162            SchemaOperation::AddField { entity, field } => {
163                statements.push(add_column_sql(entity, field));
164            }
165            SchemaOperation::AddIndex {
166                entity,
167                name,
168                fields,
169                unique,
170            } => {
171                statements.push(create_index_sql(entity, name, fields, *unique));
172            }
173            SchemaOperation::Noop => {}
174            other => {
175                return Err(StorageError {
176                    code: "PG_OP_UNSUPPORTED".into(),
177                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
178                });
179            }
180        }
181    }
182
183    Ok(statements)
184}
185
186// ---------------------------------------------------------------------------
187// Introspection SQL helpers
188//
189// These generate the SQL queries that a live Postgres connection would run
190// to read the current schema. No connection required — just SQL strings.
191// ---------------------------------------------------------------------------
192
/// SQL to list user tables in the public schema.
///
/// Tables whose name starts with the literal prefix `_pylon_` are excluded.
/// `_` is a single-character wildcard in `LIKE`, so the underscores are
/// escaped (with `!` declared as the escape character) to avoid also hiding
/// user tables such as `xpylon_foo`.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
201
/// SQL to list columns for a given table in the public schema.
/// Use with parameter: table_name.
///
/// Each row is `(column_name, data_type, is_nullable, is_pk)`, where `is_pk`
/// is the number of PRIMARY KEY constraints covering the column (0 when the
/// column is not part of the primary key).
///
/// The primary-key subquery is correlated on schema *and* table, and the
/// constraint join matches schema, table and constraint name, so a
/// same-named table in another schema or a same-named constraint on another
/// table cannot inflate `is_pk`.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_schema = kcu.constraint_schema \
             AND tc.constraint_name = kcu.constraint_name \
             AND tc.table_schema = kcu.table_schema \
             AND tc.table_name = kcu.table_name \
            WHERE tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
215
/// SQL to list the non-primary-key indexes of a table in the `public` schema.
/// Use with parameter: table_name (bound as `$1`).
///
/// Each row carries the index name, whether the index is unique, and the
/// indexed column names aggregated in the order their attribute numbers
/// appear in the index key (`indkey`). Indexes flagged `indisprimary` are
/// filtered out.
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
           ix.indisunique as is_unique, \
           array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
      AND t.relname = $1 \
      AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
232
/// Plan the schema operations needed to go from `snapshot` to `target`.
///
/// Thin wrapper: both arguments are forwarded unchanged to the crate-level
/// `plan_from_snapshot` and its plan is returned as-is. Exposed from this
/// module so Postgres callers can plan incrementally once introspection
/// data is available.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
238
239// ---------------------------------------------------------------------------
240// CRUD SQL generation helpers (used by live adapter, testable without a DB)
241// ---------------------------------------------------------------------------
242
/// Mint a fixed-width, lexicographically sortable unique id.
///
/// The id is 40 lowercase hex characters: the current time since the Unix
/// epoch in nanoseconds, zero-padded to 32 characters, followed by 8
/// characters of a per-process atomic counter. The counter separates ids
/// minted within the same nanosecond and keeps them in creation order; the
/// fixed width makes string comparison agree with numeric comparison, which
/// is what cursor pagination relies on.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQUENCE: AtomicU32 = AtomicU32::new(0);

    // A clock reading earlier than the Unix epoch degrades to a zero timestamp.
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("{nanos:032x}{sequence:08x}")
}
262
263/// Convert a JSON value to its string representation for use as a SQL parameter.
264pub fn json_value_to_string(val: &serde_json::Value) -> String {
265    match val {
266        serde_json::Value::String(s) => s.clone(),
267        serde_json::Value::Number(n) => n.to_string(),
268        serde_json::Value::Bool(b) => b.to_string(),
269        serde_json::Value::Null => String::new(),
270        other => other.to_string(),
271    }
272}
273
274/// Build an INSERT SQL statement and collect string parameter values.
275/// Returns `(sql, values)` where `values[0]` is the generated ID.
276pub fn build_insert_sql(
277    entity: &str,
278    data: &serde_json::Value,
279) -> Result<(String, Vec<String>), StorageError> {
280    let id = generate_id();
281    let obj = data.as_object().ok_or_else(|| StorageError {
282        code: "PG_INVALID_DATA".into(),
283        message: "Insert data must be a JSON object".into(),
284    })?;
285
286    let mut col_names = vec!["id".to_string()];
287    let mut placeholders = vec!["$1".to_string()];
288    let mut values: Vec<String> = vec![id];
289
290    for (i, (key, val)) in obj.iter().enumerate() {
291        col_names.push(quote_ident(key));
292        placeholders.push(format!("${}", i + 2));
293        values.push(json_value_to_string(val));
294    }
295
296    let sql = format!(
297        "INSERT INTO {} ({}) VALUES ({})",
298        quote_ident(entity),
299        col_names.join(", "),
300        placeholders.join(", ")
301    );
302
303    Ok((sql, values))
304}
305
306/// Build an UPDATE SQL statement and collect string parameter values.
307/// Returns `(sql, values)` where `values[0]` is the row ID.
308pub fn build_update_sql(
309    entity: &str,
310    id: &str,
311    data: &serde_json::Value,
312) -> Result<(String, Vec<String>), StorageError> {
313    let obj = data.as_object().ok_or_else(|| StorageError {
314        code: "PG_INVALID_DATA".into(),
315        message: "Update data must be a JSON object".into(),
316    })?;
317
318    if obj.is_empty() {
319        return Err(StorageError {
320            code: "PG_INVALID_DATA".into(),
321            message: "Update data must contain at least one field".into(),
322        });
323    }
324
325    let mut set_clauses = Vec::new();
326    let mut values: Vec<String> = vec![id.to_string()];
327
328    for (i, (key, val)) in obj.iter().enumerate() {
329        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
330        values.push(json_value_to_string(val));
331    }
332
333    let sql = format!(
334        "UPDATE {} SET {} WHERE id = $1",
335        quote_ident(entity),
336        set_clauses.join(", ")
337    );
338
339    Ok((sql, values))
340}
341
342// ---------------------------------------------------------------------------
343// Live Postgres adapter (requires "postgres-live" feature)
344// ---------------------------------------------------------------------------
345
346#[cfg(feature = "postgres-live")]
347pub mod live {
348    use super::*;
349    use crate::{
350        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
351    };
352
    /// A live Postgres adapter with a real database connection.
    ///
    /// Create one with `connect`. Its query and mutation methods take
    /// `&mut self` because they all go through the single owned client.
    pub struct LivePostgresAdapter {
        /// The `postgres::Client` every query and statement is issued through.
        client: postgres::Client,
    }
357
358    impl LivePostgresAdapter {
359        /// Connect to a Postgres database.
360        pub fn connect(url: &str) -> Result<Self, StorageError> {
361            let client =
362                postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
363                    code: "PG_CONNECT_FAILED".into(),
364                    message: format!("Failed to connect to Postgres: {e}"),
365                })?;
366            Ok(Self { client })
367        }
368
369        /// Read the current schema from the live database.
370        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
371            let table_rows = self
372                .client
373                .query(INTROSPECT_TABLES_SQL, &[])
374                .map_err(pg_err)?;
375
376            let mut tables = Vec::new();
377            for row in &table_rows {
378                let table_name: String = row.get(0);
379                let columns = self.read_columns(&table_name)?;
380                let indexes = self.read_indexes(&table_name)?;
381                tables.push(TableSnapshot {
382                    name: table_name,
383                    columns,
384                    indexes,
385                });
386            }
387
388            Ok(SchemaSnapshot { tables })
389        }
390
391        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
392            let rows = self
393                .client
394                .query(INTROSPECT_COLUMNS_SQL, &[&table])
395                .map_err(pg_err)?;
396
397            let mut columns = Vec::new();
398            for row in &rows {
399                let name: String = row.get(0);
400                let data_type: String = row.get(1);
401                let is_nullable: String = row.get(2);
402                let is_pk: i64 = row.get(3);
403                columns.push(ColumnSnapshot {
404                    name,
405                    column_type: data_type,
406                    notnull: is_nullable == "NO",
407                    primary_key: is_pk > 0,
408                });
409            }
410            Ok(columns)
411        }
412
413        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
414            let rows = self
415                .client
416                .query(INTROSPECT_INDEXES_SQL, &[&table])
417                .map_err(pg_err)?;
418
419            let mut indexes = Vec::new();
420            for row in &rows {
421                let name: String = row.get(0);
422                let unique: bool = row.get(1);
423                let columns: Vec<String> = row.get(2);
424                indexes.push(IndexSnapshot {
425                    name,
426                    columns,
427                    unique,
428                });
429            }
430            Ok(indexes)
431        }
432
433        /// Plan from live database state.
434        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
435            let snapshot = self.read_schema()?;
436            Ok(crate::plan_from_snapshot(&snapshot, target))
437        }
438    }
439
440    impl StorageAdapter for LivePostgresAdapter {
441        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
442            Err(StorageError {
443                code: "PG_PLAN_NEEDS_MUTABLE".into(),
444                message: "Use plan_from_live() instead for live Postgres planning".into(),
445            })
446        }
447
448        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
449            Err(StorageError {
450                code: "PG_APPLY_USE_METHOD".into(),
451                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
452            })
453        }
454    }
455
456    impl LivePostgresAdapter {
457        /// Apply a schema plan to the live database.
458        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
459            let statements = plan_to_sql(plan)?;
460            for sql in &statements {
461                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
462            }
463            Ok(())
464        }
465
466        /// Insert a row. Returns the generated ID.
467        pub fn insert(
468            &mut self,
469            entity: &str,
470            data: &serde_json::Value,
471        ) -> Result<String, StorageError> {
472            let (sql, values) = build_insert_sql(entity, data)?;
473            let id = values[0].clone();
474
475            let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
476                .iter()
477                .map(|v| v as &(dyn postgres::types::ToSql + Sync))
478                .collect();
479
480            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
481            Ok(id)
482        }
483
484        /// Get a row by ID.
485        pub fn get_by_id(
486            &mut self,
487            entity: &str,
488            id: &str,
489        ) -> Result<Option<serde_json::Value>, StorageError> {
490            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
491            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
492
493            match rows.first() {
494                Some(row) => Ok(Some(row_to_json(row))),
495                None => Ok(None),
496            }
497        }
498
499        /// List all rows from an entity.
500        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
501            let sql = format!("SELECT * FROM {}", quote_ident(entity));
502            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
503
504            Ok(rows.iter().map(row_to_json).collect())
505        }
506
507        /// Cursor-paginated list. `after` is the last `id` from the previous
508        /// page; the result contains rows with `id > after` (lex order),
509        /// limited to `limit`. Used for sync push/pull.
510        pub fn list_after(
511            &mut self,
512            entity: &str,
513            after: Option<&str>,
514            limit: usize,
515        ) -> Result<Vec<serde_json::Value>, StorageError> {
516            // Cap limit at a sensible upper bound so a malicious client can't
517            // stream the whole table by passing limit=u64::MAX.
518            let capped: i64 = limit.min(10_000) as i64;
519            let sql = match after {
520                Some(_) => format!(
521                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
522                    quote_ident(entity)
523                ),
524                None => format!(
525                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
526                    quote_ident(entity)
527                ),
528            };
529            let rows = match after {
530                Some(cursor) => self
531                    .client
532                    .query(sql.as_str(), &[&cursor, &capped])
533                    .map_err(pg_err)?,
534                None => self
535                    .client
536                    .query(sql.as_str(), &[&capped])
537                    .map_err(pg_err)?,
538            };
539            Ok(rows.iter().map(row_to_json).collect())
540        }
541
542        /// Update a row by ID. Returns true if the row was found and updated.
543        pub fn update(
544            &mut self,
545            entity: &str,
546            id: &str,
547            data: &serde_json::Value,
548        ) -> Result<bool, StorageError> {
549            let (sql, values) = build_update_sql(entity, id, data)?;
550
551            let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
552                .iter()
553                .map(|v| v as &(dyn postgres::types::ToSql + Sync))
554                .collect();
555
556            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
557            Ok(rows_affected > 0)
558        }
559
560        /// Delete a row by ID. Returns true if the row was found and deleted.
561        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
562            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
563            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
564            Ok(rows_affected > 0)
565        }
566
567        /// Look up a row by `field = value`. Caller must validate `field`
568        /// against the manifest before calling — we still `quote_ident` it
569        /// but won't catch a typo against the entity definition.
570        pub fn lookup_field(
571            &mut self,
572            entity: &str,
573            field: &str,
574            value: &str,
575        ) -> Result<Option<serde_json::Value>, StorageError> {
576            let sql = format!(
577                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
578                quote_ident(entity),
579                quote_ident(field),
580            );
581            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
582            Ok(rows.first().map(row_to_json))
583        }
584
585        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
586        ///
587        /// Supported operators: equality (`field: value`), `$gt`, `$gte`,
588        /// `$lt`, `$lte`, `$like`, `$in: [..]`, plus the meta operators
589        /// `$order: { field: "asc"|"desc" }`, `$limit`, `$offset`.
590        ///
591        /// Anything else is silently ignored (matches the in-memory fallback's
592        /// permissive behavior). Field names are validated against `valid_columns`
593        /// to prevent SQL injection — pass the entity's column set.
594        pub fn query_filtered(
595            &mut self,
596            entity: &str,
597            filter: &serde_json::Value,
598            valid_columns: &[String],
599        ) -> Result<Vec<serde_json::Value>, StorageError> {
600            let empty = serde_json::Map::new();
601            let obj = filter.as_object().unwrap_or(&empty);
602
603            let validate = |col: &str| -> Result<(), StorageError> {
604                if col == "id" || valid_columns.iter().any(|c| c == col) {
605                    Ok(())
606                } else {
607                    Err(StorageError {
608                        code: "UNKNOWN_COLUMN".into(),
609                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
610                    })
611                }
612            };
613
614            let mut where_clauses: Vec<String> = Vec::new();
615            let mut order_clause = String::new();
616            let mut limit_clause = String::new();
617            let mut offset_clause = String::new();
618            // Collect (col, op, value) so placeholder numbers can be assigned
619            // in a single materialization pass after the parse loop.
620            let mut planned: Vec<(String, String, String)> = Vec::new();
621
622            for (key, val) in obj {
623                match key.as_str() {
624                    "$order" => {
625                        if let Some(ord) = val.as_object() {
626                            let mut parts = Vec::new();
627                            for (col, dir) in ord {
628                                validate(col)?;
629                                let d = match dir.as_str().unwrap_or("asc") {
630                                    "desc" | "DESC" => "DESC",
631                                    _ => "ASC",
632                                };
633                                parts.push(format!("{} {d}", quote_ident(col)));
634                            }
635                            if !parts.is_empty() {
636                                order_clause = format!(" ORDER BY {}", parts.join(", "));
637                            }
638                        }
639                    }
640                    "$limit" => {
641                        if let Some(n) = val.as_u64() {
642                            limit_clause = format!(" LIMIT {}", n);
643                        }
644                    }
645                    "$offset" => {
646                        if let Some(n) = val.as_u64() {
647                            offset_clause = format!(" OFFSET {}", n);
648                        }
649                    }
650                    field => {
651                        validate(field)?;
652                        match val {
653                            serde_json::Value::Object(ops) => {
654                                for (op, v) in ops {
655                                    match op.as_str() {
656                                        "$gt" => {
657                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
658                                        }
659                                        "$gte" => planned.push((
660                                            field.into(),
661                                            ">=".into(),
662                                            value_to_pg(v),
663                                        )),
664                                        "$lt" => {
665                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
666                                        }
667                                        "$lte" => planned.push((
668                                            field.into(),
669                                            "<=".into(),
670                                            value_to_pg(v),
671                                        )),
672                                        "$like" => planned.push((
673                                            field.into(),
674                                            "LIKE".into(),
675                                            value_to_pg(v),
676                                        )),
677                                        "$in" => {
678                                            if let Some(arr) = v.as_array() {
679                                                let placeholders: Vec<String> = (0..arr.len())
680                                                    .map(|i| format!("${}", planned.len() + 1 + i))
681                                                    .collect();
682                                                where_clauses.push(format!(
683                                                    "{} IN ({})",
684                                                    quote_ident(field),
685                                                    placeholders.join(", "),
686                                                ));
687                                                for x in arr {
688                                                    planned.push((
689                                                        format!("__inline_{}", planned.len()),
690                                                        "__INLINE__".into(),
691                                                        value_to_pg(x),
692                                                    ));
693                                                }
694                                            }
695                                        }
696                                        _ => {}
697                                    }
698                                }
699                            }
700                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
701                        }
702                    }
703                }
704            }
705
706            // Materialize planned -> SQL + params.
707            let mut params: Vec<String> = Vec::with_capacity(planned.len());
708            for (field, op, v) in &planned {
709                if op == "__INLINE__" {
710                    // Already emitted via the IN-clause path; just push the value.
711                    params.push(v.clone());
712                } else {
713                    let placeholder = format!("${}", params.len() + 1);
714                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
715                    params.push(v.clone());
716                }
717            }
718
719            let where_sql = if where_clauses.is_empty() {
720                String::new()
721            } else {
722                format!(" WHERE {}", where_clauses.join(" AND "))
723            };
724            let sql = format!(
725                "SELECT * FROM {}{}{}{}{}",
726                quote_ident(entity),
727                where_sql,
728                order_clause,
729                limit_clause,
730                offset_clause,
731            );
732
733            let pg_params: Vec<&(dyn postgres::types::ToSql + Sync)> = params
734                .iter()
735                .map(|s| s as &(dyn postgres::types::ToSql + Sync))
736                .collect();
737
738            let rows = self
739                .client
740                .query(sql.as_str(), &pg_params)
741                .map_err(pg_err)?;
742            Ok(rows.iter().map(row_to_json).collect())
743        }
744    }
745
746    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
747    pub enum TxOp<'a> {
748        Insert {
749            entity: &'a str,
750            data: &'a serde_json::Value,
751        },
752        Update {
753            entity: &'a str,
754            id: &'a str,
755            data: &'a serde_json::Value,
756        },
757        Delete {
758            entity: &'a str,
759            id: &'a str,
760        },
761    }
762
    /// Result of a single op inside a transaction.
    ///
    /// `transact` pushes one of these per executed `TxOp`, in the same order
    /// as the ops it was given.
    #[derive(Debug, Clone)]
    pub enum TxResult {
        /// An insert ran; carries the id generated for the new row.
        Inserted(String),
        /// An update ran; `true` when at least one row was affected.
        Updated(bool),
        /// A delete ran; `true` when at least one row was affected.
        Deleted(bool),
    }
770
771    impl LivePostgresAdapter {
772        /// Run `ops` inside a single Postgres transaction. Either all of them
773        /// commit together or none of them do — there is no partial state on
774        /// failure. The ROLLBACK happens implicitly when the `Transaction`
775        /// guard is dropped without `commit()` being called.
776        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
777            let mut tx = self.client.transaction().map_err(pg_err)?;
778            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
779
780            for op in ops {
781                match op {
782                    TxOp::Insert { entity, data } => {
783                        let (sql, values) = build_insert_sql(entity, data)?;
784                        let id = values[0].clone();
785                        let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
786                            .iter()
787                            .map(|v| v as &(dyn postgres::types::ToSql + Sync))
788                            .collect();
789                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
790                        results.push(TxResult::Inserted(id));
791                    }
792                    TxOp::Update { entity, id, data } => {
793                        let (sql, values) = build_update_sql(entity, id, data)?;
794                        let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
795                            .iter()
796                            .map(|v| v as &(dyn postgres::types::ToSql + Sync))
797                            .collect();
798                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
799                        results.push(TxResult::Updated(n > 0));
800                    }
801                    TxOp::Delete { entity, id } => {
802                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
803                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
804                        results.push(TxResult::Deleted(n > 0));
805                    }
806                }
807            }
808
809            tx.commit().map_err(pg_err)?;
810            Ok(results)
811        }
812    }
813
814    fn value_to_pg(v: &serde_json::Value) -> String {
815        match v {
816            serde_json::Value::String(s) => s.clone(),
817            serde_json::Value::Number(n) => n.to_string(),
818            serde_json::Value::Bool(b) => b.to_string(),
819            serde_json::Value::Null => String::new(),
820            other => other.to_string(),
821        }
822    }
823
824    fn row_to_json(row: &postgres::Row) -> serde_json::Value {
825        use postgres::types::Type;
826        let mut obj = serde_json::Map::new();
827        for (i, col) in row.columns().iter().enumerate() {
828            let name = col.name().to_string();
829
830            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
831            // and a panic in a query handler poisons the connection mutex,
832            // taking down all subsequent reads on this datastore. Anything
833            // that fails to decode becomes Null with a one-shot warning.
834            //
835            // Timestamps and the catch-all path explicitly DON'T request
836            // `String` — the postgres crate uses binary protocol by default
837            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
838            // ask for `Vec<u8>` and lossy-stringify, which works for all
839            // text-shaped columns in either protocol.
840            let value: serde_json::Value = match *col.type_() {
841                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
842                    .flatten()
843                    .map(serde_json::Value::Bool)
844                    .unwrap_or(serde_json::Value::Null),
845                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
846                    .flatten()
847                    .map(|v| serde_json::Value::Number(v.into()))
848                    .unwrap_or(serde_json::Value::Null),
849                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
850                    .flatten()
851                    .map(|v| serde_json::Value::Number(v.into()))
852                    .unwrap_or(serde_json::Value::Null),
853                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
854                    .flatten()
855                    .map(|v| serde_json::Value::Number(v.into()))
856                    .unwrap_or(serde_json::Value::Null),
857                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
858                    .flatten()
859                    .and_then(|v| serde_json::Number::from_f64(v as f64))
860                    .map(serde_json::Value::Number)
861                    .unwrap_or(serde_json::Value::Null),
862                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
863                    .flatten()
864                    .and_then(serde_json::Number::from_f64)
865                    .map(serde_json::Value::Number)
866                    .unwrap_or(serde_json::Value::Null),
867                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
868                    .flatten()
869                    .unwrap_or(serde_json::Value::Null),
870                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
871                    .flatten()
872                    .map(|b| serde_json::Value::String(b64(&b)))
873                    .unwrap_or(serde_json::Value::Null),
874                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
875                    try_get_or_null::<Option<String>>(row, i)
876                        .flatten()
877                        .map(serde_json::Value::String)
878                        .unwrap_or(serde_json::Value::Null)
879                }
880                _ => {
881                    // Last resort: ask Postgres to render anything else as
882                    // text via a stringifying decode through Vec<u8>. If even
883                    // that fails (rare — Postgres types not implementing the
884                    // text format), fall through to Null with a warning.
885                    match row.try_get::<_, Option<String>>(i) {
886                        Ok(Some(s)) => serde_json::Value::String(s),
887                        Ok(None) => serde_json::Value::Null,
888                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
889                            Ok(Some(bytes)) => serde_json::Value::String(
890                                String::from_utf8_lossy(&bytes).into_owned(),
891                            ),
892                            _ => serde_json::Value::Null,
893                        },
894                    }
895                }
896            };
897            obj.insert(name, value);
898        }
899        serde_json::Value::Object(obj)
900    }
901
902    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
903    where
904        T: postgres::types::FromSql<'a>,
905    {
906        match row.try_get::<_, T>(i) {
907            Ok(v) => Some(v),
908            Err(e) => {
909                tracing::warn!(
910                    "[postgres] decode failed for column {} ({}): {e}",
911                    i,
912                    row.columns()[i].name()
913                );
914                None
915            }
916        }
917    }
918
919    /// Minimal base64 encoder so we don't need another dependency just for
920    /// the BYTEA column edge case.
921    fn b64(bytes: &[u8]) -> String {
922        const TABLE: &[u8; 64] =
923            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
924        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
925        let chunks = bytes.chunks(3);
926        for chunk in chunks {
927            let b = [
928                chunk.first().copied().unwrap_or(0),
929                chunk.get(1).copied().unwrap_or(0),
930                chunk.get(2).copied().unwrap_or(0),
931            ];
932            out.push(TABLE[(b[0] >> 2) as usize] as char);
933            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
934            if chunk.len() > 1 {
935                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
936            } else {
937                out.push('=');
938            }
939            if chunk.len() > 2 {
940                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
941            } else {
942                out.push('=');
943            }
944        }
945        out
946    }
947
948    fn pg_err(e: postgres::Error) -> StorageError {
949        StorageError {
950            code: "PG_QUERY_FAILED".into(),
951            message: format!("Postgres query failed: {e}"),
952        }
953    }
954}
955
956// ---------------------------------------------------------------------------
957// Tests
958// ---------------------------------------------------------------------------
959
#[cfg(test)]
mod tests {
    use super::*;

    // -- Shared fixtures --

    /// Parse the todo-app example manifest shared by the planning tests.
    fn test_manifest() -> AppManifest {
        serde_json::from_str(include_str!(
            "../../../examples/todo-app/pylon.manifest.json"
        ))
        .unwrap()
    }

    /// Build a NOT NULL column snapshot.
    fn column(
        name: &'static str,
        column_type: &'static str,
        primary_key: bool,
    ) -> crate::ColumnSnapshot {
        crate::ColumnSnapshot {
            name: name.into(),
            column_type: column_type.into(),
            notnull: true,
            primary_key,
        }
    }

    /// Snapshot of a `User` table with the given columns and no indexes.
    fn user_table(columns: Vec<crate::ColumnSnapshot>) -> crate::TableSnapshot {
        crate::TableSnapshot {
            name: "User".into(),
            columns,
            indexes: vec![],
        }
    }

    /// Snapshot of a fully migrated `Todo` table, including its
    /// `Todo_by_author` index.
    fn todo_table() -> crate::TableSnapshot {
        crate::TableSnapshot {
            name: "Todo".into(),
            columns: vec![
                column("id", "TEXT", true),
                column("title", "TEXT", false),
                column("done", "BOOLEAN", false),
                column("authorId", "TEXT", false),
                column("createdAt", "TIMESTAMPTZ", false),
            ],
            indexes: vec![crate::IndexSnapshot {
                name: "Todo_by_author".into(),
                columns: vec!["authorId".into()],
                unique: false,
            }],
        }
    }

    /// Assert that `operations` creates the `User` and `Todo` entities and
    /// adds the `by_author` index on `Todo`.
    fn assert_creates_todo_app_schema(operations: &[SchemaOperation]) {
        assert!(operations.iter().any(|op| matches!(
            op,
            SchemaOperation::CreateEntity { name, .. } if name == "User"
        )));
        assert!(operations.iter().any(|op| matches!(
            op,
            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
        )));
        assert!(operations.iter().any(|op| matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_author"
        )));
    }

    // -- SQL generation tests --

    #[test]
    fn pg_type_mapping() {
        assert_eq!(pg_column_type("string"), "TEXT");
        assert_eq!(pg_column_type("int"), "INTEGER");
        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
        assert_eq!(pg_column_type("bool"), "BOOLEAN");
        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
        assert_eq!(pg_column_type("richtext"), "TEXT");
        assert_eq!(pg_column_type("id(User)"), "TEXT");
    }

    #[test]
    fn quote_ident_simple() {
        assert_eq!(quote_ident("User"), "\"User\"");
        assert_eq!(quote_ident("email"), "\"email\"");
    }

    #[test]
    fn quote_ident_escapes_embedded_double_quotes() {
        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
    }

    #[test]
    fn create_table_sql_basic() {
        let fields = vec![
            FieldSpec {
                name: "email".into(),
                field_type: "string".into(),
                optional: false,
                unique: true,
            },
            FieldSpec {
                name: "age".into(),
                field_type: "int".into(),
                optional: true,
                unique: false,
            },
        ];
        let sql = create_table_sql("User", &fields);
        assert_eq!(
            sql,
            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
        );
    }

    #[test]
    fn create_table_sql_escapes_identifiers() {
        let fields = vec![FieldSpec {
            name: "col\"x".into(),
            field_type: "string".into(),
            optional: false,
            unique: false,
        }];
        let sql = create_table_sql("my\"table", &fields);
        assert!(sql.contains("\"my\"\"table\""));
        assert!(sql.contains("\"col\"\"x\""));
    }

    #[test]
    fn create_index_sql_unique() {
        let sql = create_index_sql("User", "by_email", &["email".into()], true);
        assert_eq!(
            sql,
            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
        );
    }

    #[test]
    fn create_index_sql_non_unique() {
        let sql = create_index_sql("Todo", "by_author", &["authorId".into()], false);
        assert_eq!(
            sql,
            "CREATE INDEX IF NOT EXISTS \"Todo_by_author\" ON \"Todo\" (\"authorId\")"
        );
    }

    #[test]
    fn add_column_sql_basic() {
        let field = FieldSpec {
            name: "bio".into(),
            field_type: "string".into(),
            optional: true,
            unique: false,
        };
        let sql = add_column_sql("User", &field);
        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
    }

    #[test]
    fn plan_from_manifest() {
        let adapter = PostgresAdapter;
        let manifest = test_manifest();
        let plan = adapter.plan_schema(&manifest).unwrap();

        // Should have CreateEntity for User and Todo, plus AddIndex for by_author.
        assert_creates_todo_app_schema(&plan.operations);
    }

    #[test]
    fn plan_to_sql_produces_statements() {
        let adapter = PostgresAdapter;
        let manifest = test_manifest();
        let plan = adapter.plan_schema(&manifest).unwrap();
        let stmts = plan_to_sql(&plan).unwrap();

        assert_eq!(stmts.len(), 3); // 2 CREATE TABLE + 1 CREATE INDEX
        assert!(stmts[0].starts_with("CREATE TABLE"));
        assert!(stmts[1].starts_with("CREATE TABLE"));
        assert!(stmts[2].starts_with("CREATE INDEX"));
    }

    #[test]
    fn plan_to_sql_rejects_unsupported() {
        let plan = SchemaPlan {
            operations: vec![SchemaOperation::RemoveEntity {
                name: "User".into(),
            }],
        };
        let result = plan_to_sql(&plan);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
    }

    #[test]
    fn apply_not_implemented() {
        let adapter = PostgresAdapter;
        let plan = SchemaPlan {
            operations: vec![SchemaOperation::Noop],
        };
        let result = adapter.apply_schema(&plan);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
    }

    #[test]
    fn sql_uses_quoted_identifiers() {
        let fields = vec![FieldSpec {
            name: "createdAt".into(),
            field_type: "datetime".into(),
            optional: false,
            unique: false,
        }];
        let sql = create_table_sql("User", &fields);
        // Postgres identifiers should be quoted for case-sensitivity.
        assert!(sql.contains("\"User\""));
        assert!(sql.contains("\"createdAt\""));
        assert!(sql.contains("TIMESTAMPTZ"));
    }

    // -- Introspection SQL tests --

    #[test]
    fn introspect_sql_constants_are_valid() {
        // Sanity checks that the SQL strings exist and look reasonable.
        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
    }

    // -- Plan from snapshot tests --

    #[test]
    fn plan_from_empty_snapshot_creates_all() {
        let snapshot = crate::SchemaSnapshot { tables: vec![] };
        let manifest = test_manifest();
        let plan = plan_from_snapshot(&snapshot, &manifest);

        assert_creates_todo_app_schema(&plan.operations);
    }

    #[test]
    fn plan_from_full_snapshot_is_noop() {
        let snapshot = crate::SchemaSnapshot {
            tables: vec![
                user_table(vec![
                    column("id", "TEXT", true),
                    column("email", "TEXT", false),
                    column("displayName", "TEXT", false),
                    column("createdAt", "TIMESTAMPTZ", false),
                ]),
                todo_table(),
            ],
        };
        let manifest = test_manifest();
        let plan = plan_from_snapshot(&snapshot, &manifest);
        assert!(plan.is_empty());
    }

    #[test]
    fn plan_detects_missing_column_in_snapshot() {
        let snapshot = crate::SchemaSnapshot {
            tables: vec![
                // `User` is missing displayName and createdAt.
                user_table(vec![
                    column("id", "TEXT", true),
                    column("email", "TEXT", false),
                ]),
                todo_table(),
            ],
        };
        let manifest = test_manifest();
        let plan = plan_from_snapshot(&snapshot, &manifest);

        let add_fields: Vec<_> = plan
            .operations
            .iter()
            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
            .collect();
        assert_eq!(add_fields.len(), 2); // displayName + createdAt
    }

    // -- CRUD helper tests (no live database required) --

    #[test]
    fn json_value_to_string_handles_all_types() {
        assert_eq!(
            json_value_to_string(&serde_json::Value::String("hello".into())),
            "hello"
        );
        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
        assert_eq!(
            json_value_to_string(&serde_json::Value::Bool(false)),
            "false"
        );
        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
        // Arrays and objects get their JSON representation.
        assert_eq!(
            json_value_to_string(&serde_json::json!([1, 2, 3])),
            "[1,2,3]"
        );
        assert_eq!(
            json_value_to_string(&serde_json::json!({"a": 1})),
            "{\"a\":1}"
        );
    }

    #[test]
    fn generate_id_returns_hex_string() {
        let id = generate_id();
        assert!(!id.is_empty());
        // Must be valid hex characters.
        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn generate_id_is_unique_across_calls() {
        let id1 = generate_id();
        let id2 = generate_id();
        assert_ne!(id1, id2);
    }

    #[test]
    fn generate_id_is_lex_sortable() {
        // 1000 IDs back-to-back must come out in monotonically increasing
        // lexicographic order. This is what makes cursor pagination correct.
        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
        let sorted = {
            let mut s = ids.clone();
            s.sort();
            s
        };
        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
        // And every id must be the same width (otherwise lex comparison is
        // wrong at width boundaries).
        let len0 = ids[0].len();
        assert!(ids.iter().all(|id| id.len() == len0));
        ids.dedup();
        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
    }

    #[test]
    fn build_insert_sql_simple() {
        let data = serde_json::json!({
            "email": "alice@example.com",
            "displayName": "Alice"
        });
        let (sql, values) = build_insert_sql("User", &data).unwrap();

        assert!(sql.starts_with("INSERT INTO \"User\""));
        assert!(sql.contains("id"));
        assert!(sql.contains("$1"));
        assert!(sql.contains("$2"));
        assert!(sql.contains("$3"));
        // First value is the generated ID.
        assert!(!values[0].is_empty());
        assert_eq!(values.len(), 3); // id + 2 fields
    }

    #[test]
    fn build_insert_sql_quotes_column_names() {
        let data = serde_json::json!({"createdAt": "2026-01-01"});
        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
        assert!(sql.contains("\"createdAt\""));
        assert!(sql.contains("\"Todo\""));
    }

    #[test]
    fn build_insert_sql_rejects_non_object() {
        let data = serde_json::json!("not an object");
        let result = build_insert_sql("User", &data);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
    }

    #[test]
    fn build_update_sql_simple() {
        let data = serde_json::json!({
            "displayName": "Bob",
            "email": "bob@example.com"
        });
        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();

        assert!(sql.starts_with("UPDATE \"User\" SET"));
        assert!(sql.contains("WHERE id = $1"));
        assert!(sql.contains("$2"));
        assert!(sql.contains("$3"));
        assert_eq!(values[0], "abc123");
        assert_eq!(values.len(), 3); // id + 2 fields
    }

    #[test]
    fn build_update_sql_quotes_column_names() {
        let data = serde_json::json!({"displayName": "Carol"});
        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
        assert!(sql.contains("\"displayName\" = $2"));
    }

    #[test]
    fn build_update_sql_rejects_non_object() {
        let data = serde_json::json!(42);
        let result = build_update_sql("User", "id1", &data);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
    }

    #[test]
    fn build_update_sql_rejects_empty_object() {
        let data = serde_json::json!({});
        let err = build_update_sql("User", "id1", &data).unwrap_err();
        assert_eq!(err.code, "PG_INVALID_DATA");
        assert!(err.message.contains("at least one field"));
    }
}