Skip to main content

architect_sdk/sql/
builder.rs

1//! Builds parameterized INSERT, SELECT, UPDATE, DELETE from resolved entity.
2
3use crate::config::{IncludeDirection, PkType, ResolvedEntity};
4use crate::db::{type_category_from_cast, CanonicalType, Dialect, TypeCategory};
5use crate::error::AppError;
6use crate::extensible_fields::ExtensibleRegistry;
7use crate::sql::rsql::{FilterNode, RsqlOp, SortSpec};
8use serde_json::Value;
9use std::collections::HashMap;
10
11/// Describes one include for single-query list: name, direction, related entity, our key column, their key column.
12pub struct IncludeSelect<'a> {
13    pub name: &'a str,
14    pub direction: IncludeDirection,
15    pub related: &'a ResolvedEntity,
16    pub our_key: &'a str,
17    pub their_key: &'a str,
18}
19
/// Wraps `s` in double quotes as a SQL identifier, doubling every embedded `"`.
///
/// Intended for identifiers that come from configuration, not from request input.
fn quoted(s: &str) -> String {
    let mut ident = String::with_capacity(s.len() + 2);
    ident.push('"');
    for ch in s.chars() {
        if ch == '"' {
            // An embedded quote is escaped by doubling it.
            ident.push('"');
        }
        ident.push(ch);
    }
    ident.push('"');
    ident
}
24
25/// Full qualified table name.
26fn qualified_table(schema: &str, table: &str) -> String {
27    format!("{}.{}", quoted(schema), quoted(table))
28}
29
30pub struct QueryBuf {
31    pub sql: String,
32    pub params: Vec<Value>,
33}
34
35impl QueryBuf {
36    fn new() -> Self {
37        QueryBuf {
38            sql: String::new(),
39            params: Vec::new(),
40        }
41    }
42
43    fn push_param(&mut self, v: Value) -> u32 {
44        let n = self.params.len() as u32 + 1;
45        self.params.push(v);
46        n
47    }
48}
49
50/// SELECT list: each column as-is, except custom enum (schema.typename), numeric, time, and timetz
51/// as col::text so sqlx returns String.
52fn select_column_list(entity: &ResolvedEntity) -> String {
53    entity
54        .columns
55        .iter()
56        .map(|c| {
57            let q = quoted(&c.name);
58            let pg_type = c.pg_type.as_deref().unwrap_or("");
59            if pg_type.contains('.')
60                || pg_type == "numeric"
61                || pg_type == "time"
62                || pg_type == "timetz"
63            {
64                format!("{}::text", q)
65            } else {
66                q
67            }
68        })
69        .collect::<Vec<_>>()
70        .join(", ")
71}
72
73/// Resolve schema: override if present, else entity's schema.
74fn resolve_schema<'a>(entity: &'a ResolvedEntity, schema_override: Option<&'a str>) -> &'a str {
75    schema_override.unwrap_or(&entity.schema_name)
76}
77
78/// Postgres array columns: API accepts JSON `["a","b"]`; bind as array literal + `$n::varchar(255)[]` etc.
79pub fn coerce_json_value_for_pg_array(val: Value, pg_type: Option<&str>) -> Value {
80    if !pg_type.is_some_and(|t| t.ends_with("[]")) {
81        return val;
82    }
83    match val {
84        Value::Null => Value::Null,
85        Value::Array(items) => {
86            let mut out = String::from('{');
87            for (i, v) in items.iter().enumerate() {
88                if i > 0 {
89                    out.push(',');
90                }
91                match v {
92                    Value::Null => out.push_str("NULL"),
93                    other => {
94                        let elem = match other {
95                            Value::String(s) => s.clone(),
96                            Value::Number(n) => n.to_string(),
97                            Value::Bool(b) => b.to_string(),
98                            _ => serde_json::to_string(other).unwrap_or_else(|_| "{}".to_string()),
99                        };
100                        out.push('"');
101                        for ch in elem.chars() {
102                            if ch == '"' || ch == '\\' {
103                                out.push('\\');
104                            }
105                            out.push(ch);
106                        }
107                        out.push('"');
108                    }
109                }
110            }
111            out.push('}');
112            Value::String(out)
113        }
114        // multipart/form-data sends every field as a scalar string, so an array column
115        // arrives as a single comma-separated string (e.g. "id1, id2"). Split it into
116        // elements — trimming whitespace and dropping empties — so it binds as a real
117        // array. A string with no comma becomes a single-element array (clients can send
118        // `"id"` instead of `["id"]`). JSON clients send a proper `Value::Array` and hit
119        // the arm above, so their comma-containing values are never split.
120        Value::String(s) => {
121            let items: Vec<Value> = s
122                .split(',')
123                .map(|part| part.trim())
124                .filter(|part| !part.is_empty())
125                .map(|part| Value::String(part.to_string()))
126                .collect();
127            coerce_json_value_for_pg_array(Value::Array(items), pg_type)
128        }
129        // Other scalar JSON values (number, bool) → single-element array for convenience.
130        other => coerce_json_value_for_pg_array(Value::Array(vec![other]), pg_type),
131    }
132}
133
134/// Placeholder for PK in WHERE (e.g. $1, $1::uuid, $1::bigint) so the bound value — which
135/// always travels over the wire as TEXT — is cast to the column type. Without this, a numeric
136/// PK comparison fails with `operator does not exist: bigint = text`.
137fn pk_placeholder(entity: &ResolvedEntity, param_num: usize, dialect: &dyn Dialect) -> String {
138    let ph = dialect.placeholder(param_num);
139    let canonical = match &entity.pk_type {
140        PkType::Uuid => crate::db::CanonicalType::Uuid,
141        PkType::BigInt => crate::db::CanonicalType::BigInt,
142        PkType::Int => crate::db::CanonicalType::Int,
143        PkType::Text => return ph,
144    };
145    match dialect.cast_name(&canonical) {
146        Some(cast) => dialect.cast_expr(&ph, &cast),
147        None => ph,
148    }
149}
150
151// ─── RSQL → SQL ───────────────────────────────────────────────────────────────
152
153fn op_valid_for_category(op: &RsqlOp, category: TypeCategory) -> bool {
154    match category {
155        TypeCategory::Text => matches!(
156            op,
157            RsqlOp::Eq
158                | RsqlOp::Neq
159                | RsqlOp::In
160                | RsqlOp::Out
161                | RsqlOp::Like
162                | RsqlOp::Ilike
163                | RsqlOp::Contains
164                | RsqlOp::Starts
165                | RsqlOp::Ends
166                | RsqlOp::Null(_)
167        ),
168        TypeCategory::Int | TypeCategory::Float => matches!(
169            op,
170            RsqlOp::Eq
171                | RsqlOp::Neq
172                | RsqlOp::Gt
173                | RsqlOp::Ge
174                | RsqlOp::Lt
175                | RsqlOp::Le
176                | RsqlOp::Between
177                | RsqlOp::In
178                | RsqlOp::Out
179                | RsqlOp::Null(_)
180        ),
181        TypeCategory::Bool => matches!(op, RsqlOp::Eq | RsqlOp::Neq | RsqlOp::Null(_)),
182        TypeCategory::Uuid => matches!(
183            op,
184            RsqlOp::Eq | RsqlOp::Neq | RsqlOp::In | RsqlOp::Out | RsqlOp::Null(_)
185        ),
186        TypeCategory::Date | TypeCategory::Timestamp | TypeCategory::Time => matches!(
187            op,
188            RsqlOp::Eq
189                | RsqlOp::Neq
190                | RsqlOp::Gt
191                | RsqlOp::Ge
192                | RsqlOp::Lt
193                | RsqlOp::Le
194                | RsqlOp::Between
195                | RsqlOp::In
196                | RsqlOp::Out
197                | RsqlOp::Null(_)
198        ),
199        // JSON, bytes, arrays, custom types: allow all operators.
200        TypeCategory::Json | TypeCategory::Bytes | TypeCategory::Other => true,
201    }
202}
203
204/// Dialect-independent SQL type name for a canonical type, suitable for both RSQL
205/// operator-category classification (via `type_category_from_cast`) and Postgres placeholder
206/// casts. Returns `None` for text-like types (which need no cast). Used for extensible-field keys
207/// whose declared type comes from the KV registry rather than `ColumnInfo.pg_type`.
208fn canonical_cast_str(t: &CanonicalType) -> Option<&'static str> {
209    match t {
210        CanonicalType::SmallInt => Some("smallint"),
211        CanonicalType::Int | CanonicalType::Serial => Some("integer"),
212        CanonicalType::BigInt | CanonicalType::BigSerial => Some("bigint"),
213        CanonicalType::Real => Some("real"),
214        CanonicalType::Double => Some("double precision"),
215        CanonicalType::Decimal(_) => Some("numeric"),
216        CanonicalType::Boolean => Some("boolean"),
217        CanonicalType::Uuid => Some("uuid"),
218        CanonicalType::Json | CanonicalType::Jsonb => Some("jsonb"),
219        CanonicalType::Timestamp => Some("timestamptz"),
220        CanonicalType::TimestampNtz => Some("timestamp"),
221        CanonicalType::Date => Some("date"),
222        CanonicalType::Time => Some("time"),
223        CanonicalType::Timetz => Some("timetz"),
224        _ => None,
225    }
226}
227
228fn make_placeholder(n: usize, cast: Option<&str>, dialect: &dyn Dialect) -> String {
229    let ph = dialect.placeholder(n);
230    match cast {
231        Some(t) => dialect.cast_expr(&ph, t),
232        None => ph,
233    }
234}
235
236/// Build the SQL fragment for a single RSQL leaf condition.
237/// `qcol` is an already-quoted (and optionally qualified) column expression.
238/// `pg_type` drives operator validation and placeholder casting.
239/// `field_label` is used only in error messages (e.g. "bay" or "transport_unit.bay").
240fn build_leaf_sql(
241    qcol: &str,
242    pg_type: Option<&str>,
243    op: &RsqlOp,
244    values: &[String],
245    q: &mut QueryBuf,
246    field_label: &str,
247    dialect: &dyn Dialect,
248) -> Result<String, AppError> {
249    let category = type_category_from_cast(pg_type.unwrap_or("text"));
250    if !op_valid_for_category(op, category) {
251        return Err(AppError::Validation(format!(
252            "operator {} is not valid for {:?} field '{}' (type: {})",
253            op.display(),
254            category,
255            field_label,
256            pg_type.unwrap_or("text")
257        )));
258    }
259    let cast = if matches!(
260        op,
261        RsqlOp::Like | RsqlOp::Ilike | RsqlOp::Contains | RsqlOp::Starts | RsqlOp::Ends
262    ) {
263        None
264    } else {
265        pg_type
266    };
267    match op {
268        RsqlOp::Null(is_null) => Ok(if *is_null {
269            format!("{} IS NULL", qcol)
270        } else {
271            format!("{} IS NOT NULL", qcol)
272        }),
273        RsqlOp::Eq | RsqlOp::Neq | RsqlOp::Gt | RsqlOp::Ge | RsqlOp::Lt | RsqlOp::Le => {
274            let v = values.first().cloned().unwrap_or_default();
275            let n = q.push_param(Value::String(v));
276            let ph = make_placeholder(n as usize, cast, dialect);
277            let cmp = match op {
278                RsqlOp::Eq => "=",
279                RsqlOp::Neq => "!=",
280                RsqlOp::Gt => ">",
281                RsqlOp::Ge => ">=",
282                RsqlOp::Lt => "<",
283                RsqlOp::Le => "<=",
284                _ => unreachable!(),
285            };
286            Ok(format!("{} {} {}", qcol, cmp, ph))
287        }
288        RsqlOp::Like => {
289            let v = values.first().cloned().unwrap_or_default();
290            let n = q.push_param(Value::String(v));
291            Ok(format!("{} LIKE {}", qcol, dialect.placeholder(n as usize)))
292        }
293        RsqlOp::Ilike => {
294            let v = values.first().cloned().unwrap_or_default();
295            let n = q.push_param(Value::String(v));
296            let ph = dialect.placeholder(n as usize);
297            Ok(dialect.case_insensitive_like(qcol, &ph))
298        }
299        RsqlOp::Contains => {
300            let v = values.first().cloned().unwrap_or_default();
301            let n = q.push_param(Value::String(format!("%{}%", v)));
302            let ph = dialect.placeholder(n as usize);
303            Ok(dialect.case_insensitive_like(qcol, &ph))
304        }
305        RsqlOp::Starts => {
306            let v = values.first().cloned().unwrap_or_default();
307            let n = q.push_param(Value::String(format!("{}%", v)));
308            let ph = dialect.placeholder(n as usize);
309            Ok(dialect.case_insensitive_like(qcol, &ph))
310        }
311        RsqlOp::Ends => {
312            let v = values.first().cloned().unwrap_or_default();
313            let n = q.push_param(Value::String(format!("%{}", v)));
314            let ph = dialect.placeholder(n as usize);
315            Ok(dialect.case_insensitive_like(qcol, &ph))
316        }
317        RsqlOp::In => {
318            if values.is_empty() {
319                return Err(AppError::Validation(format!(
320                    "=in= requires at least one value for field '{}'",
321                    field_label
322                )));
323            }
324            let phs: Vec<String> = values
325                .iter()
326                .map(|v| {
327                    let n = q.push_param(Value::String(v.clone()));
328                    make_placeholder(n as usize, cast, dialect)
329                })
330                .collect();
331            Ok(format!("{} IN ({})", qcol, phs.join(", ")))
332        }
333        RsqlOp::Out => {
334            if values.is_empty() {
335                return Err(AppError::Validation(format!(
336                    "=out= requires at least one value for field '{}'",
337                    field_label
338                )));
339            }
340            let phs: Vec<String> = values
341                .iter()
342                .map(|v| {
343                    let n = q.push_param(Value::String(v.clone()));
344                    make_placeholder(n as usize, cast, dialect)
345                })
346                .collect();
347            Ok(format!("{} NOT IN ({})", qcol, phs.join(", ")))
348        }
349        RsqlOp::Between => {
350            if values.len() != 2 {
351                return Err(AppError::Validation(format!(
352                    "=between= requires exactly 2 values for field '{}', got {}",
353                    field_label,
354                    values.len()
355                )));
356            }
357            let n1 = q.push_param(Value::String(values[0].clone()));
358            let n2 = q.push_param(Value::String(values[1].clone()));
359            Ok(format!(
360                "{} BETWEEN {} AND {}",
361                qcol,
362                make_placeholder(n1 as usize, cast, dialect),
363                make_placeholder(n2 as usize, cast, dialect)
364            ))
365        }
366        #[allow(unreachable_patterns)]
367        RsqlOp::Null(_) => unreachable!(),
368    }
369}
370
371/// Convert a `FilterNode` tree into a SQL WHERE fragment (no leading `WHERE`).
372/// All values are pushed as parameters into `q`; identifiers come only from
373/// config (never from user input) so SQL injection is structurally impossible.
374///
375/// `col_qualifier` is an optional table alias prefix, e.g. `"main."` for aliased queries.
376///
377/// `filter_includes` supplies the related-entity metadata needed to generate
378/// EXISTS subqueries for dotted-field filters like `transport_unit.bay=contains=bay23`.
379#[allow(clippy::too_many_arguments)]
380pub fn rsql_to_sql(
381    node: &FilterNode,
382    entity: &ResolvedEntity,
383    q: &mut QueryBuf,
384    col_qualifier: Option<&str>,
385    filter_includes: &[IncludeSelect<'_>],
386    schema_override: Option<&str>,
387    dialect: &dyn Dialect,
388    registry: Option<&ExtensibleRegistry>,
389) -> Result<String, AppError> {
390    match node {
391        FilterNode::And(children) => {
392            let parts: Result<Vec<_>, _> = children
393                .iter()
394                .map(|c| {
395                    rsql_to_sql(
396                        c,
397                        entity,
398                        q,
399                        col_qualifier,
400                        filter_includes,
401                        schema_override,
402                        dialect,
403                        registry,
404                    )
405                })
406                .collect();
407            Ok(format!("({})", parts?.join(" AND ")))
408        }
409        FilterNode::Or(children) => {
410            let parts: Result<Vec<_>, _> = children
411                .iter()
412                .map(|c| {
413                    rsql_to_sql(
414                        c,
415                        entity,
416                        q,
417                        col_qualifier,
418                        filter_includes,
419                        schema_override,
420                        dialect,
421                        registry,
422                    )
423                })
424                .collect();
425            Ok(format!("({})", parts?.join(" OR ")))
426        }
427        FilterNode::Leaf { field, op, values } => {
428            // Dotted field: first check for a extensible-fields bag (`<extensible_col>.<key>`),
429            // then fall back to related-entity include semantics (`<include>.<field>`).
430            if let Some(dot_pos) = field.find('.') {
431                let head = &field[..dot_pos];
432                let key = &field[dot_pos + 1..];
433
434                if entity.extensible_columns.iter().any(|c| c == head) {
435                    let def = registry.and_then(|r| r.field(head, key)).ok_or_else(|| {
436                        AppError::Validation(format!(
437                            "unknown extensible field '{}' (not declared in the registry)",
438                            field
439                        ))
440                    })?;
441                    if !def.filterable {
442                        return Err(AppError::Validation(format!(
443                            "extensible field '{}' is not filterable",
444                            field
445                        )));
446                    }
447                    let canonical = def.canonical();
448                    // The canonical cast string drives both operator-category validation and the
449                    // Postgres placeholder cast. It is dialect-independent on purpose: MySQL/SQLite
450                    // `cast_name` is always None, which would otherwise misclassify a numeric
451                    // extensible field as text and reject `=gt=`.
452                    let cf_cast = canonical_cast_str(&canonical);
453                    let base_col = match col_qualifier {
454                        Some(pfx) => format!("{}{}", pfx, quoted(head)),
455                        None => quoted(head),
456                    };
457                    let json_expr = dialect.json_extract_typed(&base_col, key, &canonical);
458                    return build_leaf_sql(&json_expr, cf_cast, op, values, q, field, dialect);
459                }
460
461                let include_name = head;
462                let sub_field = key;
463
464                let inc = filter_includes
465                    .iter()
466                    .find(|i| i.name == include_name)
467                    .ok_or_else(|| AppError::Validation(format!(
468                        "filter on '{}': '{}' is not a known include — add it to the include= parameter or ensure the relationship is configured",
469                        field, include_name
470                    )))?;
471
472                let col_info = inc
473                    .related
474                    .columns
475                    .iter()
476                    .find(|c| c.name == sub_field)
477                    .ok_or_else(|| {
478                        AppError::Validation(format!(
479                            "unknown filter field '{}' on related entity '{}'",
480                            sub_field, include_name
481                        ))
482                    })?;
483
484                let rel_schema = schema_override.unwrap_or(inc.related.schema_name.as_str());
485                let rel_table = qualified_table(rel_schema, &inc.related.table_name);
486
487                // FK join condition: related.their_key = main.our_key
488                let join_cond = match col_qualifier {
489                    Some(pfx) => {
490                        format!("{} = {}{}", quoted(inc.their_key), pfx, quoted(inc.our_key))
491                    }
492                    None => format!("{} = {}", quoted(inc.their_key), quoted(inc.our_key)),
493                };
494
495                let field_cond = build_leaf_sql(
496                    &quoted(sub_field),
497                    col_info.pg_type.as_deref(),
498                    op,
499                    values,
500                    q,
501                    field,
502                    dialect,
503                )?;
504
505                return Ok(format!(
506                    "EXISTS (SELECT 1 FROM {} WHERE {} AND {})",
507                    rel_table, join_cond, field_cond
508                ));
509            }
510
511            // Plain field: look up in main entity
512            let col_info = entity
513                .columns
514                .iter()
515                .find(|c| c.name == *field)
516                .ok_or_else(|| AppError::Validation(format!("unknown filter field '{}'", field)))?;
517
518            let qcol = match col_qualifier {
519                Some(pfx) => format!("{}{}", pfx, quoted(field)),
520                None => quoted(field),
521            };
522
523            build_leaf_sql(
524                &qcol,
525                col_info.pg_type.as_deref(),
526                op,
527                values,
528                q,
529                field,
530                dialect,
531            )
532        }
533    }
534}
535
536/// Build ORDER BY clause from sort specs, falling back to pk ASC when empty.
537///
538/// A sort field may be a plain column, or a extensible-field key via the `<extensible_col>.<key>`
539/// syntax — resolved against the per-tenant `registry` and emitted as a typed JSON extraction.
540/// Unknown plain columns are silently skipped (back-compatible); a extensible-field sort that is
541/// unknown or not sortable is a hard error.
542fn build_order_by(
543    sort: &[SortSpec],
544    entity: &ResolvedEntity,
545    col_qualifier: Option<&str>,
546    dialect: &dyn Dialect,
547    registry: Option<&ExtensibleRegistry>,
548) -> Result<String, AppError> {
549    let pk = &entity.pk_columns[0];
550    let col_names: std::collections::HashSet<&str> =
551        entity.columns.iter().map(|c| c.name.as_str()).collect();
552
553    let dir = |desc: bool| if desc { "DESC" } else { "ASC" };
554    let qualify = |name: &str| match col_qualifier {
555        Some(pfx) => format!("{}{}", pfx, quoted(name)),
556        None => quoted(name),
557    };
558
559    let mut parts: Vec<String> = Vec::new();
560    for s in sort {
561        // Custom-field sort: `<extensible_col>.<key>`.
562        if let Some(dot_pos) = s.field.find('.') {
563            let head = &s.field[..dot_pos];
564            let key = &s.field[dot_pos + 1..];
565            if entity.extensible_columns.iter().any(|c| c == head) {
566                let def = registry.and_then(|r| r.field(head, key)).ok_or_else(|| {
567                    AppError::Validation(format!(
568                        "unknown extensible field '{}' in sort (not declared in the registry)",
569                        s.field
570                    ))
571                })?;
572                if !def.sortable {
573                    return Err(AppError::Validation(format!(
574                        "extensible field '{}' is not sortable",
575                        s.field
576                    )));
577                }
578                let canonical = def.canonical();
579                let json_expr = dialect.json_extract_typed(&qualify(head), key, &canonical);
580                parts.push(format!("{} {}", json_expr, dir(s.desc)));
581                continue;
582            }
583            // Not a extensible-fields column: fall through and let the plain-column filter drop it.
584        }
585        if col_names.contains(s.field.as_str()) {
586            parts.push(format!("{} {}", qualify(&s.field), dir(s.desc)));
587        }
588    }
589
590    if parts.is_empty() {
591        Ok(format!(" ORDER BY {}", qualify(pk)))
592    } else {
593        Ok(format!(" ORDER BY {}", parts.join(", ")))
594    }
595}
596
597/// SELECT by primary key (single column PK only). Caller adds id as sole param.
598pub fn select_by_id(
599    entity: &ResolvedEntity,
600    schema_override: Option<&str>,
601    dialect: &dyn Dialect,
602) -> QueryBuf {
603    let mut q = QueryBuf::new();
604    let schema = resolve_schema(entity, schema_override);
605    let table = qualified_table(schema, &entity.table_name);
606    let pk = &entity.pk_columns[0];
607    let cols = select_column_list(entity);
608    let ph = pk_placeholder(entity, 1, dialect);
609    q.sql = format!(
610        "SELECT {} FROM {} WHERE {} = {}",
611        cols,
612        table,
613        quoted(pk),
614        ph
615    );
616    q
617}
618
619/// SELECT list with includes in a single query: main table aliased as "main", each include as a scalar subquery (json_agg for to_many, row_to_json for to_one).
620/// `includes` drives the scalar subqueries (response data); `filter_includes` is the superset used
621/// for EXISTS generation when the filter references dotted fields like `transport_unit.bay`.
622#[allow(clippy::too_many_arguments)]
623pub fn select_list_with_includes(
624    entity: &ResolvedEntity,
625    filter: Option<&FilterNode>,
626    sort: &[SortSpec],
627    limit: Option<u32>,
628    offset: Option<u32>,
629    includes: &[IncludeSelect<'_>],
630    filter_includes: &[IncludeSelect<'_>],
631    schema_override: Option<&str>,
632    dialect: &dyn Dialect,
633    registry: Option<&ExtensibleRegistry>,
634) -> Result<QueryBuf, AppError> {
635    let mut q = QueryBuf::new();
636    let schema = resolve_schema(entity, schema_override);
637    let table = qualified_table(schema, &entity.table_name);
638    const MAIN_ALIAS: &str = "main";
639    let main_qualifier = format!("{}.", MAIN_ALIAS);
640
641    let main_cols: Vec<String> = entity
642        .columns
643        .iter()
644        .map(|c| {
645            let q = quoted(&c.name);
646            let pg_type = c.pg_type.as_deref().unwrap_or("");
647            let expr = if pg_type.contains('.')
648                || pg_type == "numeric"
649                || pg_type == "time"
650                || pg_type == "timetz"
651            {
652                format!("{}.{}::text", MAIN_ALIAS, q)
653            } else {
654                format!("{}.{}", MAIN_ALIAS, q)
655            };
656            format!("{} AS {}", expr, q)
657        })
658        .collect();
659
660    let mut select_parts = main_cols;
661    for inc in includes {
662        let rel_schema = resolve_schema(inc.related, schema_override);
663        let rel_table = qualified_table(rel_schema, &inc.related.table_name);
664        let sub_from = format!(
665            "{} WHERE {} = {}.{}",
666            rel_table,
667            quoted(inc.their_key),
668            MAIN_ALIAS,
669            quoted(inc.our_key)
670        );
671        let rel_col_exprs: Vec<String> = inc
672            .related
673            .columns
674            .iter()
675            .map(|c| dialect.quote_ident(&c.name))
676            .collect();
677        let subquery = match inc.direction {
678            IncludeDirection::ToOne => dialect.to_one_subquery(&rel_col_exprs, &sub_from),
679            IncludeDirection::ToMany => dialect.to_many_subquery(&rel_col_exprs, &sub_from),
680        };
681        select_parts.push(format!("{} AS {}", subquery, quoted(inc.name)));
682    }
683
684    let where_clause = match filter {
685        Some(node) => {
686            let frag = rsql_to_sql(
687                node,
688                entity,
689                &mut q,
690                Some(&main_qualifier),
691                filter_includes,
692                schema_override,
693                dialect,
694                registry,
695            )?;
696            format!(" WHERE {}", frag)
697        }
698        None => String::new(),
699    };
700    let order_clause = build_order_by(sort, entity, Some(&main_qualifier), dialect, registry)?;
701    let limit_clause = limit
702        .map(|n| format!(" LIMIT {}", n.min(1000)))
703        .unwrap_or_default();
704    let offset_clause = offset.map(|n| format!(" OFFSET {}", n)).unwrap_or_default();
705
706    q.sql = format!(
707        "SELECT {} FROM {} {}{}{}{}{}",
708        select_parts.join(", "),
709        table,
710        MAIN_ALIAS,
711        where_clause,
712        order_clause,
713        limit_clause,
714        offset_clause
715    );
716    Ok(q)
717}
718
719/// SELECT list with optional RSQL filter and sort specs.
720/// `filter_includes` is needed when the filter contains dotted-field conditions
721/// (e.g. `transport_unit.bay=contains=bay23`) that generate EXISTS subqueries.
722/// Pass an empty slice when there are no such filters.
723#[allow(clippy::too_many_arguments)]
724pub fn select_list(
725    entity: &ResolvedEntity,
726    filter: Option<&FilterNode>,
727    sort: &[SortSpec],
728    limit: Option<u32>,
729    offset: Option<u32>,
730    filter_includes: &[IncludeSelect<'_>],
731    schema_override: Option<&str>,
732    dialect: &dyn Dialect,
733    registry: Option<&ExtensibleRegistry>,
734) -> Result<QueryBuf, AppError> {
735    let mut q = QueryBuf::new();
736    let schema = resolve_schema(entity, schema_override);
737    let table = qualified_table(schema, &entity.table_name);
738
739    let where_clause = match filter {
740        Some(node) => {
741            let frag = rsql_to_sql(
742                node,
743                entity,
744                &mut q,
745                None,
746                filter_includes,
747                schema_override,
748                dialect,
749                registry,
750            )?;
751            format!(" WHERE {}", frag)
752        }
753        None => String::new(),
754    };
755    let order_clause = build_order_by(sort, entity, None, dialect, registry)?;
756    let limit_clause = limit
757        .map(|n| format!(" LIMIT {}", n.min(1000)))
758        .unwrap_or_default();
759    let offset_clause = offset.map(|n| format!(" OFFSET {}", n)).unwrap_or_default();
760    let cols = select_column_list(entity);
761    q.sql = format!(
762        "SELECT {} FROM {}{}{}{}{}",
763        cols, table, where_clause, order_clause, limit_clause, offset_clause
764    );
765    Ok(q)
766}
767
768/// SELECT * FROM entity WHERE column IN ($1, $2, ...) ORDER BY pk. Used for batch-fetching related rows (to_many or to_one by key).
769pub fn select_by_column_in(
770    entity: &ResolvedEntity,
771    column_name: &str,
772    values: &[Value],
773    schema_override: Option<&str>,
774    dialect: &dyn Dialect,
775) -> QueryBuf {
776    let mut q = QueryBuf::new();
777    let schema = resolve_schema(entity, schema_override);
778    let table = qualified_table(schema, &entity.table_name);
779    let pk = &entity.pk_columns[0];
780    if values.is_empty() {
781        let cols = select_column_list(entity);
782        q.sql = format!("SELECT {} FROM {} WHERE 1 = 0", cols, table);
783        return q;
784    }
785    let placeholders: Vec<String> = values
786        .iter()
787        .map(|v| {
788            let n = q.push_param(v.clone());
789            entity
790                .columns
791                .iter()
792                .find(|c| c.name == column_name)
793                .and_then(|c| c.pg_type.as_deref())
794                .map(|t| dialect.cast_expr(&dialect.placeholder(n as usize), t))
795                .unwrap_or_else(|| dialect.placeholder(n as usize))
796        })
797        .collect();
798    let cols = select_column_list(entity);
799    q.sql = format!(
800        "SELECT {} FROM {} WHERE {} IN ({}) ORDER BY {}",
801        cols,
802        table,
803        quoted(column_name),
804        placeholders.join(", "),
805        quoted(pk)
806    );
807    q
808}
809
810/// INSERT: columns and placeholders from entity; values from body. Excludes PK if has_default.
811/// Omits columns with DB default when body does not provide a value (so DB uses default).
812/// Uses SQL cast (e.g. $n::timestamptz) for timestamp columns so string values bind correctly.
813/// When `rls_tenant_id` is Some, appends tenant_id column and value (for RLS strategy).
814pub fn insert(
815    entity: &ResolvedEntity,
816    body: &HashMap<String, Value>,
817    include_pk: bool,
818    schema_override: Option<&str>,
819    rls_tenant_id: Option<&str>,
820    caller_user_id: Option<&str>,
821    dialect: &dyn Dialect,
822) -> QueryBuf {
823    let mut q = QueryBuf::new();
824    let schema = resolve_schema(entity, schema_override);
825    let table = qualified_table(schema, &entity.table_name);
826    let mut cols = Vec::new();
827    let mut placeholders = Vec::new();
828    for c in &entity.columns {
829        let name = &c.name;
830        if c.pk_type.is_some() && !include_pk {
831            continue;
832        }
833        // archive_field may only be written via the dedicated archive endpoint, never via POST/create.
834        if entity.archive_field.as_deref().is_some_and(|af| name == af) {
835            continue;
836        }
837        // updated_by is only meaningful on updates, leave NULL on insert.
838        if name == "updated_by" {
839            continue;
840        }
841        let val = if name == "created_by" {
842            caller_user_id
843                .map(|uid| Value::String(uid.to_string()))
844                .or_else(|| body.get(name).cloned())
845        } else {
846            body.get(name).cloned()
847        };
848        if val.is_none() && c.has_default {
849            continue;
850        }
851        let val = val.unwrap_or(Value::Null);
852        let val = coerce_json_value_for_pg_array(val, c.pg_type.as_deref());
853        let param_num = q.push_param(val);
854        let ph = c
855            .pg_type
856            .as_deref()
857            .map(|t| dialect.cast_expr(&dialect.placeholder(param_num as usize), t))
858            .unwrap_or_else(|| dialect.placeholder(param_num as usize));
859        cols.push(quoted(name));
860        placeholders.push(ph);
861    }
862    if let Some(tid) = rls_tenant_id {
863        let param_num = q.push_param(Value::String(tid.to_string()));
864        cols.push(quoted("tenant_id"));
865        placeholders.push(dialect.placeholder(param_num as usize));
866    }
867    let col_list = select_column_list(entity);
868    let ret = dialect.returning_clause(&col_list);
869    let suffix = if ret.is_empty() {
870        String::new()
871    } else {
872        format!(" {}", ret)
873    };
874    q.sql = format!(
875        "INSERT INTO {} ({}) VALUES ({}){}",
876        table,
877        cols.join(", "),
878        placeholders.join(", "),
879        suffix
880    );
881    q
882}
883
884/// UPDATE by id: SET only columns present in body (and in entity columns).
885/// Uses SQL cast for timestamp columns so string values bind correctly.
886pub fn update(
887    entity: &ResolvedEntity,
888    id: &Value,
889    body: &HashMap<String, Value>,
890    schema_override: Option<&str>,
891    caller_user_id: Option<&str>,
892    dialect: &dyn Dialect,
893) -> QueryBuf {
894    let mut q = QueryBuf::new();
895    let schema = resolve_schema(entity, schema_override);
896    let table = qualified_table(schema, &entity.table_name);
897    let pk = &entity.pk_columns[0];
898    let col_by_name: std::collections::HashMap<_, _> = entity
899        .columns
900        .iter()
901        .map(|c| (c.name.as_str(), c))
902        .collect();
903    let mut sets = Vec::new();
904    for (k, v) in body {
905        if *k == *pk {
906            continue;
907        }
908        if k == "tenant_id" {
909            continue;
910        }
911        // archive_field may only be written via the dedicated archive endpoint, never via PATCH.
912        if entity.archive_field.as_deref().is_some_and(|af| k == af) {
913            continue;
914        }
915        let Some(c) = col_by_name.get(k.as_str()) else {
916            continue;
917        };
918        let v = coerce_json_value_for_pg_array(v.clone(), c.pg_type.as_deref());
919        let param_num = q.push_param(v);
920        let rhs = c
921            .pg_type
922            .as_deref()
923            .map(|t| dialect.cast_expr(&dialect.placeholder(param_num as usize), t))
924            .unwrap_or_else(|| dialect.placeholder(param_num as usize));
925        sets.push(format!("{} = {}", quoted(k), rhs));
926    }
927    sets.push(format!("{} = {}", quoted("updated_at"), dialect.now_fn()));
928    if let Some(uid) = caller_user_id {
929        if entity.columns.iter().any(|c| c.name == "updated_by") {
930            let param_num = q.push_param(Value::String(uid.to_string()));
931            sets.push(format!(
932                "{} = {}",
933                quoted("updated_by"),
934                dialect.placeholder(param_num as usize)
935            ));
936        }
937    }
938    if sets.is_empty() {
939        let cols = select_column_list(entity);
940        let ph = pk_placeholder(entity, 1, dialect);
941        q.sql = format!(
942            "SELECT {} FROM {} WHERE {} = {}",
943            cols,
944            table,
945            quoted(pk),
946            ph
947        );
948        q.params.push(id.clone());
949        return q;
950    }
951    let set_clause = sets.join(", ");
952    let id_param = q.params.len() + 1;
953    q.params.push(id.clone());
954    let ph = pk_placeholder(entity, id_param, dialect);
955    let col_list = select_column_list(entity);
956    let ret = dialect.returning_clause(&col_list);
957    let suffix = if ret.is_empty() {
958        String::new()
959    } else {
960        format!(" {}", ret)
961    };
962    q.sql = format!(
963        "UPDATE {} SET {} WHERE {} = {}{}",
964        table,
965        set_clause,
966        quoted(pk),
967        ph,
968        suffix
969    );
970    q
971}
972
973/// DELETE by id.
974pub fn delete(
975    entity: &ResolvedEntity,
976    schema_override: Option<&str>,
977    dialect: &dyn Dialect,
978) -> QueryBuf {
979    let mut q = QueryBuf::new();
980    let schema = resolve_schema(entity, schema_override);
981    let table = qualified_table(schema, &entity.table_name);
982    let pk = &entity.pk_columns[0];
983    let ph = pk_placeholder(entity, 1, dialect);
984    q.params.push(Value::Null);
985    let col_list = select_column_list(entity);
986    let ret = dialect.returning_clause(&col_list);
987    let suffix = if ret.is_empty() {
988        String::new()
989    } else {
990        format!(" {}", ret)
991    };
992    q.sql = format!(
993        "DELETE FROM {} WHERE {} = {}{}",
994        table,
995        quoted(pk),
996        ph,
997        suffix
998    );
999    q
1000}
1001
1002/// UPDATE by id: clear archive_field (set to NULL) where it is currently NOT NULL.
1003/// Returns the updated row or None (record not found or not archived).
1004pub fn unarchive(
1005    entity: &ResolvedEntity,
1006    archive_field: &str,
1007    schema_override: Option<&str>,
1008    dialect: &dyn Dialect,
1009) -> QueryBuf {
1010    let mut q = QueryBuf::new();
1011    let schema = resolve_schema(entity, schema_override);
1012    let table = qualified_table(schema, &entity.table_name);
1013    let pk = &entity.pk_columns[0];
1014    let ph = pk_placeholder(entity, 1, dialect);
1015    q.params.push(Value::Null); // placeholder; caller passes real id via execute_returning_one_with_params_exec
1016    let col_list = select_column_list(entity);
1017    let ret = dialect.returning_clause(&col_list);
1018    let suffix = if ret.is_empty() {
1019        String::new()
1020    } else {
1021        format!(" {}", ret)
1022    };
1023    q.sql = format!(
1024        "UPDATE {} SET {} = NULL WHERE {} = {} AND {} IS NOT NULL{}",
1025        table,
1026        quoted(archive_field),
1027        quoted(pk),
1028        ph,
1029        quoted(archive_field),
1030        suffix
1031    );
1032    q
1033}
1034
1035// ─── Row Versioning Builders ──────────────────────────────────────────────────
1036
1037/// INSERT INTO {table}_history: copy the current row from the main table before an update/delete.
1038/// Uses a single INSERT ... SELECT so the snapshot is atomic and never goes through the app layer.
1039/// Binds: $1 = operation text ("update" | "delete"), $2 = pk value.
1040pub fn insert_history_snapshot(
1041    entity: &ResolvedEntity,
1042    operation: &str,
1043    schema_override: Option<&str>,
1044    dialect: &dyn Dialect,
1045) -> QueryBuf {
1046    let mut q = QueryBuf::new();
1047    let schema = resolve_schema(entity, schema_override);
1048    let main_table = qualified_table(schema, &entity.table_name);
1049    let history_table = qualified_table(schema, &format!("{}_history", entity.table_name));
1050    let pk = &entity.pk_columns[0];
1051
1052    // $1 = operation, $2 = pk id
1053    let op_ph = dialect.placeholder(1);
1054    let pk_ph = pk_placeholder(entity, 2, dialect);
1055
1056    let col_names: Vec<String> = entity.columns.iter().map(|c| quoted(&c.name)).collect();
1057    let col_list = col_names.join(", ");
1058
1059    q.sql = format!(
1060        "INSERT INTO {history} (\
1061            \"_version\", \"_operation\", \"_recorded_at\", \"_valid_from\", \"_valid_to\", {cols}\
1062        ) \
1063        SELECT \
1064            COALESCE(\"_version\", 1), {op_ph}, {now}, \"updated_at\", {now}, {cols} \
1065        FROM {main} \
1066        WHERE {pk_q} = {pk_ph}",
1067        history = history_table,
1068        cols = col_list,
1069        op_ph = op_ph,
1070        now = dialect.now_fn(),
1071        main = main_table,
1072        pk_q = quoted(pk),
1073        pk_ph = pk_ph,
1074    );
1075    q.params.push(Value::String(operation.to_string()));
1076    q.params.push(Value::Null); // placeholder; caller replaces with real id
1077    q
1078}
1079
1080/// SELECT all history rows for a given pk, ordered newest first.
1081/// Binds: $1 = pk value.
1082pub fn select_history_list(
1083    entity: &ResolvedEntity,
1084    schema_override: Option<&str>,
1085    dialect: &dyn Dialect,
1086) -> QueryBuf {
1087    let mut q = QueryBuf::new();
1088    let schema = resolve_schema(entity, schema_override);
1089    let history_table = qualified_table(schema, &format!("{}_history", entity.table_name));
1090    let pk = &entity.pk_columns[0];
1091    let pk_ph = pk_placeholder(entity, 1, dialect);
1092    q.sql = format!(
1093        "SELECT * FROM {} WHERE {} = {} ORDER BY {} DESC",
1094        history_table,
1095        quoted(pk),
1096        pk_ph,
1097        quoted("_version")
1098    );
1099    q.params.push(Value::Null); // placeholder; caller passes real id
1100    q
1101}
1102
1103/// SELECT a specific version from history for a given pk.
1104/// Binds: $1 = pk value, $2 = version (bigint).
1105pub fn select_history_by_version(
1106    entity: &ResolvedEntity,
1107    schema_override: Option<&str>,
1108    dialect: &dyn Dialect,
1109) -> QueryBuf {
1110    let mut q = QueryBuf::new();
1111    let schema = resolve_schema(entity, schema_override);
1112    let history_table = qualified_table(schema, &format!("{}_history", entity.table_name));
1113    let pk = &entity.pk_columns[0];
1114    let pk_ph = pk_placeholder(entity, 1, dialect);
1115    let v_ph = dialect.placeholder(2);
1116    q.sql = format!(
1117        "SELECT * FROM {} WHERE {} = {} AND {} = {}",
1118        history_table,
1119        quoted(pk),
1120        pk_ph,
1121        quoted("_version"),
1122        v_ph
1123    );
1124    q.params.push(Value::Null); // placeholder for pk
1125    q.params.push(Value::Null); // placeholder for version
1126    q
1127}
1128
1129/// DELETE old history rows beyond keep_versions for a given pk.
1130/// Binds: $1 = pk value, $2 = keep_versions (bigint).
1131pub fn prune_history(
1132    entity: &ResolvedEntity,
1133    schema_override: Option<&str>,
1134    dialect: &dyn Dialect,
1135) -> QueryBuf {
1136    let mut q = QueryBuf::new();
1137    let schema = resolve_schema(entity, schema_override);
1138    let history_table = qualified_table(schema, &format!("{}_history", entity.table_name));
1139    let pk = &entity.pk_columns[0];
1140    let pk_ph = pk_placeholder(entity, 1, dialect);
1141    let keep_ph = dialect.placeholder(2);
1142    q.sql = format!(
1143        "DELETE FROM {tbl} WHERE {pk_q} = {pk_ph} \
1144         AND \"_history_id\" NOT IN (\
1145             SELECT \"_history_id\" FROM {tbl} WHERE {pk_q} = {pk_ph} \
1146             ORDER BY \"_version\" DESC LIMIT {keep_ph}\
1147         )",
1148        tbl = history_table,
1149        pk_q = quoted(pk),
1150        pk_ph = pk_ph,
1151        keep_ph = keep_ph,
1152    );
1153    q.params.push(Value::Null); // pk placeholder
1154    q.params.push(Value::Null); // keep_versions placeholder
1155    q
1156}
1157
// ─── Builder unit tests (history, array coercion, PK casts, extensible-field RSQL) ─────
1159
// Unit tests for the SQL builders in this file: row-versioning (history) queries, array-value
// coercion, primary-key placeholder casts, and RSQL filter/sort on extensible fields.
#[cfg(test)]
mod versioning_tests {
    use super::*;
    use crate::config::resolved::{ColumnInfo, PkType, ResolvedEntity};
    use std::collections::{HashMap, HashSet};

    /// Minimal Postgres-flavoured `Dialect` stub for tests that do not need real casts:
    /// `$n` placeholders, `RETURNING` clauses, `NOW()`, and a `cast_expr` that returns the
    /// expression unchanged (so no `::type` suffix ever appears in SQL built with it).
    struct PgDialect;
    impl crate::db::Dialect for PgDialect {
        fn name(&self) -> &'static str {
            "postgres"
        }
        fn placeholder(&self, n: usize) -> String {
            format!("${}", n)
        }
        fn quote_ident(&self, s: &str) -> String {
            format!("\"{}\"", s)
        }
        fn ddl_type(&self, _: &crate::db::CanonicalType) -> String {
            "TEXT".into()
        }
        fn cast_name(&self, _: &crate::db::CanonicalType) -> Option<String> {
            None
        }
        fn type_category(&self, _: &crate::db::CanonicalType) -> crate::db::TypeCategory {
            crate::db::TypeCategory::Text
        }
        fn type_support(&self, _: &crate::db::CanonicalType) -> crate::db::TypeSupport {
            crate::db::TypeSupport::Native("text")
        }
        // Casts are deliberately dropped: the target type argument is ignored.
        fn cast_expr(&self, expr: &str, _: &str) -> String {
            expr.to_string()
        }
        fn now_fn(&self) -> &'static str {
            "NOW()"
        }
        fn sys_timestamp_type(&self) -> &'static str {
            "TIMESTAMPTZ"
        }
        fn audit_timestamp_type(&self) -> &'static str {
            "TIMESTAMPTZ"
        }
        fn sys_bigserial_type(&self) -> &'static str {
            "BIGSERIAL"
        }
        fn sys_bytes_type(&self) -> &'static str {
            "BYTEA"
        }
        fn sys_json_type(&self) -> &'static str {
            "JSONB"
        }
        fn uuid_default_expr(&self) -> &'static str {
            "gen_random_uuid()"
        }
        fn returning_clause(&self, cols: &str) -> String {
            format!("RETURNING {}", cols)
        }
        fn upsert_conflict(&self, _: &[&str], _: &str) -> String {
            String::new()
        }
        fn to_one_subquery(&self, _col_exprs: &[String], from_clause: &str) -> String {
            format!("(SELECT row_to_json(t) FROM ({}) t)", from_clause)
        }
        fn to_many_subquery(&self, _col_exprs: &[String], from_clause: &str) -> String {
            format!("(SELECT json_agg(t) FROM ({}) t)", from_clause)
        }
        fn supports_schemas(&self) -> bool {
            true
        }
        fn supports_rls(&self) -> bool {
            true
        }
        fn supports_named_enum_types(&self) -> bool {
            true
        }
        fn supports_index_include(&self) -> bool {
            true
        }
        fn set_tenant_session_sql(&self, _: &str) -> Option<String> {
            None
        }
        // Single quotes in the key are doubled so the key is a valid SQL string literal.
        fn json_extract_text(&self, col: &str, key: &str) -> String {
            format!("({} ->> '{}')", col, key.replace('\'', "''"))
        }
        // The stub ignores the requested type and falls back to the text extraction.
        fn json_extract_typed(
            &self,
            col: &str,
            key: &str,
            _t: &crate::db::CanonicalType,
        ) -> String {
            self.json_extract_text(col, key)
        }
        fn case_insensitive_like(&self, col: &str, placeholder: &str) -> String {
            format!("{} ILIKE {}", col, placeholder)
        }
    }

    /// Test fixture: entity `myschema.users` with a uuid primary key `id`, a nullable `name`
    /// column without a `pg_type`, and a `timestamptz` `updated_at` column. No archive field,
    /// includes, versioning, or extensible columns are configured.
    fn make_entity() -> ResolvedEntity {
        ResolvedEntity {
            table_id: "t1".into(),
            schema_name: "myschema".into(),
            table_name: "users".into(),
            path_segment: "users".into(),
            pk_columns: vec!["id".into()],
            pk_type: PkType::Uuid,
            columns: vec![
                ColumnInfo {
                    name: "id".into(),
                    pk_type: Some(PkType::Uuid),
                    nullable: false,
                    has_default: true,
                    pg_type: Some("uuid".into()),
                    is_asset: false,
                    asset_is_array: false,
                    asset_config: None,
                },
                ColumnInfo {
                    name: "name".into(),
                    pk_type: None,
                    nullable: true,
                    has_default: false,
                    pg_type: None,
                    is_asset: false,
                    asset_is_array: false,
                    asset_config: None,
                },
                ColumnInfo {
                    name: "updated_at".into(),
                    pk_type: None,
                    nullable: false,
                    has_default: true,
                    pg_type: Some("timestamptz".into()),
                    is_asset: false,
                    asset_is_array: false,
                    asset_config: None,
                },
            ],
            operations: vec![],
            sensitive_columns: HashSet::new(),
            includes: vec![],
            validation: HashMap::new(),
            events: vec![],
            archive_field: None,
            package_id: String::new(),
            audit_log: false,
            parent_ref_column: None,
            versioning: None,
            mcp: None,
            extensible_columns: vec![],
        }
    }

    // The snapshot targets the history table, carries the system columns and the entity
    // columns, and binds the operation name as the first parameter.
    #[test]
    fn insert_history_snapshot_inserts_into_history_table() {
        let entity = make_entity();
        let d = PgDialect;
        let q = insert_history_snapshot(&entity, "update", None, &d);
        assert!(q.sql.contains("INSERT INTO"));
        assert!(q.sql.contains("_history"));
        assert!(q.sql.contains("_version"));
        assert!(q.sql.contains("_operation"));
        assert!(q.sql.contains("\"name\""));
        assert_eq!(q.params[0], Value::String("update".into()));
    }

    // The snapshot is an INSERT ... SELECT, so row values are read by the database itself.
    #[test]
    fn insert_history_snapshot_uses_select_not_application_values() {
        let entity = make_entity();
        let d = PgDialect;
        let q = insert_history_snapshot(&entity, "delete", None, &d);
        assert!(q.sql.contains("SELECT"));
        assert!(q.sql.contains("FROM"));
    }

    #[test]
    fn select_history_list_orders_by_version_desc() {
        let entity = make_entity();
        let d = PgDialect;
        let q = select_history_list(&entity, None, &d);
        assert!(q.sql.contains("ORDER BY"));
        assert!(q.sql.contains("_version"));
        assert!(q.sql.contains("DESC"));
        assert_eq!(q.params.len(), 1);
    }

    #[test]
    fn select_history_by_version_has_two_params() {
        let entity = make_entity();
        let d = PgDialect;
        let q = select_history_by_version(&entity, None, &d);
        assert!(q.sql.contains("$1"));
        assert!(q.sql.contains("$2"));
        assert_eq!(q.params.len(), 2);
    }

    #[test]
    fn prune_history_contains_limit() {
        let entity = make_entity();
        let d = PgDialect;
        let q = prune_history(&entity, None, &d);
        assert!(q.sql.to_uppercase().contains("LIMIT"));
        assert!(q.sql.contains("$2"));
    }

    // Without an override, the history table lives in the entity's own schema.
    #[test]
    fn history_table_uses_entity_schema() {
        let entity = make_entity();
        let d = PgDialect;
        let q = select_history_list(&entity, None, &d);
        assert!(q.sql.contains("\"myschema\""));
        assert!(q.sql.contains("\"users_history\""));
    }

    // A schema override replaces the entity schema entirely.
    #[test]
    fn schema_override_is_respected() {
        let entity = make_entity();
        let d = PgDialect;
        let q = select_history_list(&entity, Some("tenant1"), &d);
        assert!(q.sql.contains("\"tenant1\""));
        assert!(!q.sql.contains("\"myschema\""));
    }

    #[test]
    fn coerce_array_splits_comma_separated_string() {
        // multipart sends a single field as one comma-separated string.
        let v =
            coerce_json_value_for_pg_array(Value::String("id1, id2".to_string()), Some("uuid[]"));
        assert_eq!(v, Value::String("{\"id1\",\"id2\"}".to_string()));
    }

    #[test]
    fn coerce_array_single_string_is_one_element() {
        let v = coerce_json_value_for_pg_array(Value::String("id1".to_string()), Some("uuid[]"));
        assert_eq!(v, Value::String("{\"id1\"}".to_string()));
    }

    // Blank segments (including a trailing comma) do not become array elements.
    #[test]
    fn coerce_array_drops_empty_segments() {
        let v = coerce_json_value_for_pg_array(
            Value::String("id1, , id2,".to_string()),
            Some("text[]"),
        );
        assert_eq!(v, Value::String("{\"id1\",\"id2\"}".to_string()));
    }

    #[test]
    fn coerce_array_json_array_is_not_split() {
        // JSON clients send a real array; a comma inside an element is preserved.
        let v = coerce_json_value_for_pg_array(
            Value::Array(vec![Value::String("a,b".to_string())]),
            Some("text[]"),
        );
        assert_eq!(v, Value::String("{\"a,b\"}".to_string()));
    }

    // A non-array column type leaves the value untouched.
    #[test]
    fn coerce_array_noop_for_non_array_column() {
        let v = coerce_json_value_for_pg_array(Value::String("id1, id2".to_string()), Some("uuid"));
        assert_eq!(v, Value::String("id1, id2".to_string()));
    }

    /// Fixture variant with a different entity-level `pk_type`. Only `ResolvedEntity::pk_type`
    /// is overridden; the `id` column's own `pk_type`/`pg_type` stay as in `make_entity`.
    #[cfg(feature = "postgres")]
    fn entity_with_pk(pk_type: PkType) -> ResolvedEntity {
        let mut e = make_entity();
        e.pk_type = pk_type;
        e
    }

    // The tests below use the real `PostgresDialect` (feature-gated) because the stub dialect
    // above drops all casts.
    #[cfg(feature = "postgres")]
    #[test]
    fn select_by_id_casts_uuid_pk() {
        let d = crate::db::PostgresDialect;
        let q = select_by_id(&entity_with_pk(PkType::Uuid), None, &d);
        assert!(q.sql.contains("\"id\" = $1::uuid"), "got: {}", q.sql);
    }

    #[cfg(feature = "postgres")]
    #[test]
    fn select_by_id_casts_bigint_pk() {
        // Auto-number (BIGSERIAL) PKs resolve to PkType::BigInt; bound values arrive as TEXT,
        // so the placeholder must be cast or Postgres errors with `bigint = text`.
        let d = crate::db::PostgresDialect;
        let q = select_by_id(&entity_with_pk(PkType::BigInt), None, &d);
        assert!(q.sql.contains("\"id\" = $1::bigint"), "got: {}", q.sql);
    }

    #[cfg(feature = "postgres")]
    #[test]
    fn select_by_id_casts_int_pk() {
        let d = crate::db::PostgresDialect;
        let q = select_by_id(&entity_with_pk(PkType::Int), None, &d);
        assert!(q.sql.contains("\"id\" = $1::integer"), "got: {}", q.sql);
    }

    #[cfg(feature = "postgres")]
    #[test]
    fn select_by_id_leaves_text_pk_uncast() {
        let d = crate::db::PostgresDialect;
        let q = select_by_id(&entity_with_pk(PkType::Text), None, &d);
        assert!(q.sql.contains("\"id\" = $1"), "got: {}", q.sql);
        assert!(
            !q.sql.contains("$1::"),
            "text PK should not be cast: {}",
            q.sql
        );
    }

    /// Fixture variant that declares `attributes` as an extensible column.
    fn entity_with_bag() -> ResolvedEntity {
        let mut e = make_entity();
        e.extensible_columns = vec!["attributes".into()];
        e
    }

    /// Registry for the `attributes` bag: `warrantyMonths` (int), `energyRating` (text), and
    /// `notes` (text, explicitly neither filterable nor sortable).
    fn ext_registry() -> ExtensibleRegistry {
        ExtensibleRegistry::from_value(serde_json::json!({
            "attributes": [
                {"key": "warrantyMonths", "type": "int"},
                {"key": "energyRating", "type": "text"},
                {"key": "notes", "type": "text", "filterable": false, "sortable": false}
            ]
        }))
        .unwrap()
    }

    // An int-typed extensible field is extracted from the bag, cast to integer on both sides
    // of the comparison, and usable in ORDER BY; only the filter value is bound.
    #[cfg(feature = "postgres")]
    #[test]
    fn rsql_filters_and_sorts_on_extensible_field() {
        use crate::sql::rsql::{parse_rsql, parse_sort};
        let d = crate::db::PostgresDialect;
        let e = entity_with_bag();
        let reg = ext_registry();
        let filter = parse_rsql("attributes.warrantyMonths=ge=12").unwrap();
        let sort = parse_sort("-attributes.warrantyMonths");
        let q = select_list(
            &e,
            Some(&filter),
            &sort,
            Some(10),
            Some(0),
            &[],
            None,
            &d,
            Some(&reg),
        )
        .unwrap();
        assert!(
            q.sql
                .contains("(\"attributes\" ->> 'warrantyMonths')::integer >= $1::integer"),
            "got: {}",
            q.sql
        );
        assert!(
            q.sql
                .contains("ORDER BY (\"attributes\" ->> 'warrantyMonths')::integer DESC"),
            "got: {}",
            q.sql
        );
        assert_eq!(q.params.len(), 1);
    }

    #[test]
    fn rsql_text_extensible_field_uses_case_insensitive_like() {
        let d = PgDialect;
        let e = entity_with_bag();
        let reg = ext_registry();
        let filter = crate::sql::rsql::parse_rsql("attributes.energyRating=contains=plus").unwrap();
        let q = select_list(
            &e,
            Some(&filter),
            &[],
            None,
            None,
            &[],
            None,
            &d,
            Some(&reg),
        )
        .unwrap();
        assert!(
            q.sql
                .contains("(\"attributes\" ->> 'energyRating') ILIKE $1"),
            "got: {}",
            q.sql
        );
    }

    // `bogus` is not registered under `attributes`, so the filter must fail.
    #[test]
    fn rsql_unknown_extensible_field_is_rejected() {
        let d = PgDialect;
        let e = entity_with_bag();
        let reg = ext_registry();
        let filter = crate::sql::rsql::parse_rsql("attributes.bogus==1").unwrap();
        let r = select_list(
            &e,
            Some(&filter),
            &[],
            None,
            None,
            &[],
            None,
            &d,
            Some(&reg),
        );
        assert!(r.is_err());
    }

    // `notes` is registered with `filterable: false`.
    #[test]
    fn rsql_non_filterable_extensible_field_is_rejected() {
        let d = PgDialect;
        let e = entity_with_bag();
        let reg = ext_registry();
        let filter = crate::sql::rsql::parse_rsql("attributes.notes==hi").unwrap();
        let r = select_list(
            &e,
            Some(&filter),
            &[],
            None,
            None,
            &[],
            None,
            &d,
            Some(&reg),
        );
        assert!(r.is_err());
    }

    // `notes` is registered with `sortable: false`.
    #[test]
    fn sort_on_non_sortable_extensible_field_is_rejected() {
        let d = PgDialect;
        let e = entity_with_bag();
        let reg = ext_registry();
        let sort = crate::sql::rsql::parse_sort("attributes.notes");
        let r = select_list(&e, None, &sort, None, None, &[], None, &d, Some(&reg));
        assert!(r.is_err());
    }
}
1595
1596/// UPDATE by id: stamp archive_field with NOW() where it is currently NULL.
1597/// Returns the updated row or None (record not found or already archived).
1598pub fn archive(
1599    entity: &ResolvedEntity,
1600    archive_field: &str,
1601    schema_override: Option<&str>,
1602    dialect: &dyn Dialect,
1603) -> QueryBuf {
1604    let mut q = QueryBuf::new();
1605    let schema = resolve_schema(entity, schema_override);
1606    let table = qualified_table(schema, &entity.table_name);
1607    let pk = &entity.pk_columns[0];
1608    let ph = pk_placeholder(entity, 1, dialect);
1609    q.params.push(Value::Null); // placeholder; caller passes real id via execute_returning_one_with_params_exec
1610    let col_list = select_column_list(entity);
1611    let ret = dialect.returning_clause(&col_list);
1612    let suffix = if ret.is_empty() {
1613        String::new()
1614    } else {
1615        format!(" {}", ret)
1616    };
1617    q.sql = format!(
1618        "UPDATE {} SET {} = {} WHERE {} = {} AND {} IS NULL{}",
1619        table,
1620        quoted(archive_field),
1621        dialect.now_fn(),
1622        quoted(pk),
1623        ph,
1624        quoted(archive_field),
1625        suffix
1626    );
1627    q
1628}