Skip to main content

architect_sdk/sql/
builder.rs

1//! Builds parameterized INSERT, SELECT, UPDATE, DELETE from resolved entity.
2
3use crate::config::{IncludeDirection, PkType, ResolvedEntity};
4use crate::db::{type_category_from_cast, Dialect, TypeCategory};
5use crate::error::AppError;
6use crate::sql::rsql::{FilterNode, RsqlOp, SortSpec};
7use serde_json::Value;
8use std::collections::HashMap;
9
10/// Describes one include for single-query list: name, direction, related entity, our key column, their key column.
11pub struct IncludeSelect<'a> {
12    pub name: &'a str,
13    pub direction: IncludeDirection,
14    pub related: &'a ResolvedEntity,
15    pub our_key: &'a str,
16    pub their_key: &'a str,
17}
18
/// Wraps `s` in double quotes, doubling every embedded `"` so the result is a
/// single quoted SQL identifier. Identifiers are expected to come from config.
fn quoted(s: &str) -> String {
    let mut ident = String::with_capacity(s.len() + 2);
    ident.push('"');
    for ch in s.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            ident.push('"');
        }
        ident.push(ch);
    }
    ident.push('"');
    ident
}
23
24/// Full qualified table name.
25fn qualified_table(schema: &str, table: &str) -> String {
26    format!("{}.{}", quoted(schema), quoted(table))
27}
28
29pub struct QueryBuf {
30    pub sql: String,
31    pub params: Vec<Value>,
32}
33
34impl QueryBuf {
35    fn new() -> Self {
36        QueryBuf {
37            sql: String::new(),
38            params: Vec::new(),
39        }
40    }
41
42    fn push_param(&mut self, v: Value) -> u32 {
43        let n = self.params.len() as u32 + 1;
44        self.params.push(v);
45        n
46    }
47}
48
49/// SELECT list: each column as-is, except custom enum (schema.typename), numeric, time, and timetz
50/// as col::text so sqlx returns String.
51fn select_column_list(entity: &ResolvedEntity) -> String {
52    entity
53        .columns
54        .iter()
55        .map(|c| {
56            let q = quoted(&c.name);
57            let pg_type = c.pg_type.as_deref().unwrap_or("");
58            if pg_type.contains('.')
59                || pg_type == "numeric"
60                || pg_type == "time"
61                || pg_type == "timetz"
62            {
63                format!("{}::text", q)
64            } else {
65                q
66            }
67        })
68        .collect::<Vec<_>>()
69        .join(", ")
70}
71
72/// Resolve schema: override if present, else entity's schema.
73fn resolve_schema<'a>(entity: &'a ResolvedEntity, schema_override: Option<&'a str>) -> &'a str {
74    schema_override.unwrap_or(&entity.schema_name)
75}
76
77/// Postgres array columns: API accepts JSON `["a","b"]`; bind as array literal + `$n::varchar(255)[]` etc.
78pub fn coerce_json_value_for_pg_array(val: Value, pg_type: Option<&str>) -> Value {
79    if !pg_type.is_some_and(|t| t.ends_with("[]")) {
80        return val;
81    }
82    match val {
83        Value::Null => Value::Null,
84        Value::Array(items) => {
85            let mut out = String::from('{');
86            for (i, v) in items.iter().enumerate() {
87                if i > 0 {
88                    out.push(',');
89                }
90                match v {
91                    Value::Null => out.push_str("NULL"),
92                    other => {
93                        let elem = match other {
94                            Value::String(s) => s.clone(),
95                            Value::Number(n) => n.to_string(),
96                            Value::Bool(b) => b.to_string(),
97                            _ => serde_json::to_string(other).unwrap_or_else(|_| "{}".to_string()),
98                        };
99                        out.push('"');
100                        for ch in elem.chars() {
101                            if ch == '"' || ch == '\\' {
102                                out.push('\\');
103                            }
104                            out.push(ch);
105                        }
106                        out.push('"');
107                    }
108                }
109            }
110            out.push('}');
111            Value::String(out)
112        }
113        // For convenience, treat scalar JSON values as single-element arrays so that
114        // clients can send `"id"` instead of `["id"]` for uuid[] and other array columns.
115        other => coerce_json_value_for_pg_array(Value::Array(vec![other]), pg_type),
116    }
117}
118
119/// Placeholder for PK in WHERE (e.g. $1 or $1::uuid) so bound text is cast when column is UUID.
120fn pk_placeholder(entity: &ResolvedEntity, param_num: usize, dialect: &dyn Dialect) -> String {
121    let ph = dialect.placeholder(param_num);
122    match &entity.pk_type {
123        PkType::Uuid => {
124            if let Some(cast) = dialect.cast_name(&crate::db::CanonicalType::Uuid) {
125                dialect.cast_expr(&ph, &cast)
126            } else {
127                ph
128            }
129        }
130        _ => ph,
131    }
132}
133
134// ─── RSQL → SQL ───────────────────────────────────────────────────────────────
135
136fn op_valid_for_category(op: &RsqlOp, category: TypeCategory) -> bool {
137    match category {
138        TypeCategory::Text => matches!(
139            op,
140            RsqlOp::Eq
141                | RsqlOp::Neq
142                | RsqlOp::In
143                | RsqlOp::Out
144                | RsqlOp::Like
145                | RsqlOp::Ilike
146                | RsqlOp::Contains
147                | RsqlOp::Starts
148                | RsqlOp::Ends
149                | RsqlOp::Null(_)
150        ),
151        TypeCategory::Int | TypeCategory::Float => matches!(
152            op,
153            RsqlOp::Eq
154                | RsqlOp::Neq
155                | RsqlOp::Gt
156                | RsqlOp::Ge
157                | RsqlOp::Lt
158                | RsqlOp::Le
159                | RsqlOp::Between
160                | RsqlOp::In
161                | RsqlOp::Out
162                | RsqlOp::Null(_)
163        ),
164        TypeCategory::Bool => matches!(op, RsqlOp::Eq | RsqlOp::Neq | RsqlOp::Null(_)),
165        TypeCategory::Uuid => matches!(
166            op,
167            RsqlOp::Eq | RsqlOp::Neq | RsqlOp::In | RsqlOp::Out | RsqlOp::Null(_)
168        ),
169        TypeCategory::Date | TypeCategory::Timestamp | TypeCategory::Time => matches!(
170            op,
171            RsqlOp::Eq
172                | RsqlOp::Neq
173                | RsqlOp::Gt
174                | RsqlOp::Ge
175                | RsqlOp::Lt
176                | RsqlOp::Le
177                | RsqlOp::Between
178                | RsqlOp::In
179                | RsqlOp::Out
180                | RsqlOp::Null(_)
181        ),
182        // JSON, bytes, arrays, custom types: allow all operators.
183        TypeCategory::Json | TypeCategory::Bytes | TypeCategory::Other => true,
184    }
185}
186
187fn make_placeholder(n: usize, cast: Option<&str>, dialect: &dyn Dialect) -> String {
188    let ph = dialect.placeholder(n);
189    match cast {
190        Some(t) => dialect.cast_expr(&ph, t),
191        None => ph,
192    }
193}
194
195/// Build the SQL fragment for a single RSQL leaf condition.
196/// `qcol` is an already-quoted (and optionally qualified) column expression.
197/// `pg_type` drives operator validation and placeholder casting.
198/// `field_label` is used only in error messages (e.g. "bay" or "transport_unit.bay").
199fn build_leaf_sql(
200    qcol: &str,
201    pg_type: Option<&str>,
202    op: &RsqlOp,
203    values: &[String],
204    q: &mut QueryBuf,
205    field_label: &str,
206    dialect: &dyn Dialect,
207) -> Result<String, AppError> {
208    let category = type_category_from_cast(pg_type.unwrap_or("text"));
209    if !op_valid_for_category(op, category) {
210        return Err(AppError::Validation(format!(
211            "operator {} is not valid for {:?} field '{}' (type: {})",
212            op.display(),
213            category,
214            field_label,
215            pg_type.unwrap_or("text")
216        )));
217    }
218    let cast = if matches!(
219        op,
220        RsqlOp::Like | RsqlOp::Ilike | RsqlOp::Contains | RsqlOp::Starts | RsqlOp::Ends
221    ) {
222        None
223    } else {
224        pg_type
225    };
226    match op {
227        RsqlOp::Null(is_null) => Ok(if *is_null {
228            format!("{} IS NULL", qcol)
229        } else {
230            format!("{} IS NOT NULL", qcol)
231        }),
232        RsqlOp::Eq | RsqlOp::Neq | RsqlOp::Gt | RsqlOp::Ge | RsqlOp::Lt | RsqlOp::Le => {
233            let v = values.first().cloned().unwrap_or_default();
234            let n = q.push_param(Value::String(v));
235            let ph = make_placeholder(n as usize, cast, dialect);
236            let cmp = match op {
237                RsqlOp::Eq => "=",
238                RsqlOp::Neq => "!=",
239                RsqlOp::Gt => ">",
240                RsqlOp::Ge => ">=",
241                RsqlOp::Lt => "<",
242                RsqlOp::Le => "<=",
243                _ => unreachable!(),
244            };
245            Ok(format!("{} {} {}", qcol, cmp, ph))
246        }
247        RsqlOp::Like => {
248            let v = values.first().cloned().unwrap_or_default();
249            let n = q.push_param(Value::String(v));
250            Ok(format!("{} LIKE {}", qcol, dialect.placeholder(n as usize)))
251        }
252        RsqlOp::Ilike => {
253            let v = values.first().cloned().unwrap_or_default();
254            let n = q.push_param(Value::String(v));
255            Ok(format!(
256                "{} ILIKE {}",
257                qcol,
258                dialect.placeholder(n as usize)
259            ))
260        }
261        RsqlOp::Contains => {
262            let v = values.first().cloned().unwrap_or_default();
263            let n = q.push_param(Value::String(format!("%{}%", v)));
264            Ok(format!(
265                "{} ILIKE {}",
266                qcol,
267                dialect.placeholder(n as usize)
268            ))
269        }
270        RsqlOp::Starts => {
271            let v = values.first().cloned().unwrap_or_default();
272            let n = q.push_param(Value::String(format!("{}%", v)));
273            Ok(format!(
274                "{} ILIKE {}",
275                qcol,
276                dialect.placeholder(n as usize)
277            ))
278        }
279        RsqlOp::Ends => {
280            let v = values.first().cloned().unwrap_or_default();
281            let n = q.push_param(Value::String(format!("%{}", v)));
282            Ok(format!(
283                "{} ILIKE {}",
284                qcol,
285                dialect.placeholder(n as usize)
286            ))
287        }
288        RsqlOp::In => {
289            if values.is_empty() {
290                return Err(AppError::Validation(format!(
291                    "=in= requires at least one value for field '{}'",
292                    field_label
293                )));
294            }
295            let phs: Vec<String> = values
296                .iter()
297                .map(|v| {
298                    let n = q.push_param(Value::String(v.clone()));
299                    make_placeholder(n as usize, cast, dialect)
300                })
301                .collect();
302            Ok(format!("{} IN ({})", qcol, phs.join(", ")))
303        }
304        RsqlOp::Out => {
305            if values.is_empty() {
306                return Err(AppError::Validation(format!(
307                    "=out= requires at least one value for field '{}'",
308                    field_label
309                )));
310            }
311            let phs: Vec<String> = values
312                .iter()
313                .map(|v| {
314                    let n = q.push_param(Value::String(v.clone()));
315                    make_placeholder(n as usize, cast, dialect)
316                })
317                .collect();
318            Ok(format!("{} NOT IN ({})", qcol, phs.join(", ")))
319        }
320        RsqlOp::Between => {
321            if values.len() != 2 {
322                return Err(AppError::Validation(format!(
323                    "=between= requires exactly 2 values for field '{}', got {}",
324                    field_label,
325                    values.len()
326                )));
327            }
328            let n1 = q.push_param(Value::String(values[0].clone()));
329            let n2 = q.push_param(Value::String(values[1].clone()));
330            Ok(format!(
331                "{} BETWEEN {} AND {}",
332                qcol,
333                make_placeholder(n1 as usize, cast, dialect),
334                make_placeholder(n2 as usize, cast, dialect)
335            ))
336        }
337        #[allow(unreachable_patterns)]
338        RsqlOp::Null(_) => unreachable!(),
339    }
340}
341
342/// Convert a `FilterNode` tree into a SQL WHERE fragment (no leading `WHERE`).
343/// All values are pushed as parameters into `q`; identifiers come only from
344/// config (never from user input) so SQL injection is structurally impossible.
345///
346/// `col_qualifier` is an optional table alias prefix, e.g. `"main."` for aliased queries.
347///
348/// `filter_includes` supplies the related-entity metadata needed to generate
349/// EXISTS subqueries for dotted-field filters like `transport_unit.bay=contains=bay23`.
350pub fn rsql_to_sql(
351    node: &FilterNode,
352    entity: &ResolvedEntity,
353    q: &mut QueryBuf,
354    col_qualifier: Option<&str>,
355    filter_includes: &[IncludeSelect<'_>],
356    schema_override: Option<&str>,
357    dialect: &dyn Dialect,
358) -> Result<String, AppError> {
359    match node {
360        FilterNode::And(children) => {
361            let parts: Result<Vec<_>, _> = children
362                .iter()
363                .map(|c| {
364                    rsql_to_sql(
365                        c,
366                        entity,
367                        q,
368                        col_qualifier,
369                        filter_includes,
370                        schema_override,
371                        dialect,
372                    )
373                })
374                .collect();
375            Ok(format!("({})", parts?.join(" AND ")))
376        }
377        FilterNode::Or(children) => {
378            let parts: Result<Vec<_>, _> = children
379                .iter()
380                .map(|c| {
381                    rsql_to_sql(
382                        c,
383                        entity,
384                        q,
385                        col_qualifier,
386                        filter_includes,
387                        schema_override,
388                        dialect,
389                    )
390                })
391                .collect();
392            Ok(format!("({})", parts?.join(" OR ")))
393        }
394        FilterNode::Leaf { field, op, values } => {
395            // Dotted field (e.g. "transport_unit.bay"): generate EXISTS subquery
396            if let Some(dot_pos) = field.find('.') {
397                let include_name = &field[..dot_pos];
398                let sub_field = &field[dot_pos + 1..];
399
400                let inc = filter_includes
401                    .iter()
402                    .find(|i| i.name == include_name)
403                    .ok_or_else(|| AppError::Validation(format!(
404                        "filter on '{}': '{}' is not a known include — add it to the include= parameter or ensure the relationship is configured",
405                        field, include_name
406                    )))?;
407
408                let col_info = inc
409                    .related
410                    .columns
411                    .iter()
412                    .find(|c| c.name == sub_field)
413                    .ok_or_else(|| {
414                        AppError::Validation(format!(
415                            "unknown filter field '{}' on related entity '{}'",
416                            sub_field, include_name
417                        ))
418                    })?;
419
420                let rel_schema = schema_override.unwrap_or(inc.related.schema_name.as_str());
421                let rel_table = qualified_table(rel_schema, &inc.related.table_name);
422
423                // FK join condition: related.their_key = main.our_key
424                let join_cond = match col_qualifier {
425                    Some(pfx) => {
426                        format!("{} = {}{}", quoted(inc.their_key), pfx, quoted(inc.our_key))
427                    }
428                    None => format!("{} = {}", quoted(inc.their_key), quoted(inc.our_key)),
429                };
430
431                let field_cond = build_leaf_sql(
432                    &quoted(sub_field),
433                    col_info.pg_type.as_deref(),
434                    op,
435                    values,
436                    q,
437                    field,
438                    dialect,
439                )?;
440
441                return Ok(format!(
442                    "EXISTS (SELECT 1 FROM {} WHERE {} AND {})",
443                    rel_table, join_cond, field_cond
444                ));
445            }
446
447            // Plain field: look up in main entity
448            let col_info = entity
449                .columns
450                .iter()
451                .find(|c| c.name == *field)
452                .ok_or_else(|| AppError::Validation(format!("unknown filter field '{}'", field)))?;
453
454            let qcol = match col_qualifier {
455                Some(pfx) => format!("{}{}", pfx, quoted(field)),
456                None => quoted(field),
457            };
458
459            build_leaf_sql(
460                &qcol,
461                col_info.pg_type.as_deref(),
462                op,
463                values,
464                q,
465                field,
466                dialect,
467            )
468        }
469    }
470}
471
472/// Build ORDER BY clause from sort specs, falling back to pk ASC when empty.
473/// Unknown column names are silently skipped.
474fn build_order_by(
475    sort: &[SortSpec],
476    entity: &ResolvedEntity,
477    col_qualifier: Option<&str>,
478) -> String {
479    let pk = &entity.pk_columns[0];
480    let col_names: std::collections::HashSet<&str> =
481        entity.columns.iter().map(|c| c.name.as_str()).collect();
482
483    let parts: Vec<String> = sort
484        .iter()
485        .filter(|s| col_names.contains(s.field.as_str()))
486        .map(|s| {
487            let qcol = match col_qualifier {
488                Some(pfx) => format!("{}{}", pfx, quoted(&s.field)),
489                None => quoted(&s.field),
490            };
491            if s.desc {
492                format!("{} DESC", qcol)
493            } else {
494                format!("{} ASC", qcol)
495            }
496        })
497        .collect();
498
499    if parts.is_empty() {
500        match col_qualifier {
501            Some(pfx) => format!(" ORDER BY {}{}", pfx, quoted(pk)),
502            None => format!(" ORDER BY {}", quoted(pk)),
503        }
504    } else {
505        format!(" ORDER BY {}", parts.join(", "))
506    }
507}
508
509/// SELECT by primary key (single column PK only). Caller adds id as sole param.
510pub fn select_by_id(
511    entity: &ResolvedEntity,
512    schema_override: Option<&str>,
513    dialect: &dyn Dialect,
514) -> QueryBuf {
515    let mut q = QueryBuf::new();
516    let schema = resolve_schema(entity, schema_override);
517    let table = qualified_table(schema, &entity.table_name);
518    let pk = &entity.pk_columns[0];
519    let cols = select_column_list(entity);
520    let ph = pk_placeholder(entity, 1, dialect);
521    q.sql = format!(
522        "SELECT {} FROM {} WHERE {} = {}",
523        cols,
524        table,
525        quoted(pk),
526        ph
527    );
528    q
529}
530
531/// SELECT list with includes in a single query: main table aliased as "main", each include as a scalar subquery (json_agg for to_many, row_to_json for to_one).
532/// `includes` drives the scalar subqueries (response data); `filter_includes` is the superset used
533/// for EXISTS generation when the filter references dotted fields like `transport_unit.bay`.
534#[allow(clippy::too_many_arguments)]
535pub fn select_list_with_includes(
536    entity: &ResolvedEntity,
537    filter: Option<&FilterNode>,
538    sort: &[SortSpec],
539    limit: Option<u32>,
540    offset: Option<u32>,
541    includes: &[IncludeSelect<'_>],
542    filter_includes: &[IncludeSelect<'_>],
543    schema_override: Option<&str>,
544    dialect: &dyn Dialect,
545) -> Result<QueryBuf, AppError> {
546    let mut q = QueryBuf::new();
547    let schema = resolve_schema(entity, schema_override);
548    let table = qualified_table(schema, &entity.table_name);
549    const MAIN_ALIAS: &str = "main";
550    let main_qualifier = format!("{}.", MAIN_ALIAS);
551
552    let main_cols: Vec<String> = entity
553        .columns
554        .iter()
555        .map(|c| {
556            let q = quoted(&c.name);
557            let pg_type = c.pg_type.as_deref().unwrap_or("");
558            let expr = if pg_type.contains('.')
559                || pg_type == "numeric"
560                || pg_type == "time"
561                || pg_type == "timetz"
562            {
563                format!("{}.{}::text", MAIN_ALIAS, q)
564            } else {
565                format!("{}.{}", MAIN_ALIAS, q)
566            };
567            format!("{} AS {}", expr, q)
568        })
569        .collect();
570
571    let mut select_parts = main_cols;
572    for inc in includes {
573        let rel_schema = resolve_schema(inc.related, schema_override);
574        let rel_table = qualified_table(rel_schema, &inc.related.table_name);
575        let sub_from = format!(
576            "{} WHERE {} = {}.{}",
577            rel_table,
578            quoted(inc.their_key),
579            MAIN_ALIAS,
580            quoted(inc.our_key)
581        );
582        let rel_col_exprs: Vec<String> = inc
583            .related
584            .columns
585            .iter()
586            .map(|c| dialect.quote_ident(&c.name))
587            .collect();
588        let subquery = match inc.direction {
589            IncludeDirection::ToOne => dialect.to_one_subquery(&rel_col_exprs, &sub_from),
590            IncludeDirection::ToMany => dialect.to_many_subquery(&rel_col_exprs, &sub_from),
591        };
592        select_parts.push(format!("{} AS {}", subquery, quoted(inc.name)));
593    }
594
595    let where_clause = match filter {
596        Some(node) => {
597            let frag = rsql_to_sql(
598                node,
599                entity,
600                &mut q,
601                Some(&main_qualifier),
602                filter_includes,
603                schema_override,
604                dialect,
605            )?;
606            format!(" WHERE {}", frag)
607        }
608        None => String::new(),
609    };
610    let order_clause = build_order_by(sort, entity, Some(&main_qualifier));
611    let limit_clause = limit
612        .map(|n| format!(" LIMIT {}", n.min(1000)))
613        .unwrap_or_default();
614    let offset_clause = offset.map(|n| format!(" OFFSET {}", n)).unwrap_or_default();
615
616    q.sql = format!(
617        "SELECT {} FROM {} {}{}{}{}{}",
618        select_parts.join(", "),
619        table,
620        MAIN_ALIAS,
621        where_clause,
622        order_clause,
623        limit_clause,
624        offset_clause
625    );
626    Ok(q)
627}
628
629/// SELECT list with optional RSQL filter and sort specs.
630/// `filter_includes` is needed when the filter contains dotted-field conditions
631/// (e.g. `transport_unit.bay=contains=bay23`) that generate EXISTS subqueries.
632/// Pass an empty slice when there are no such filters.
633#[allow(clippy::too_many_arguments)]
634pub fn select_list(
635    entity: &ResolvedEntity,
636    filter: Option<&FilterNode>,
637    sort: &[SortSpec],
638    limit: Option<u32>,
639    offset: Option<u32>,
640    filter_includes: &[IncludeSelect<'_>],
641    schema_override: Option<&str>,
642    dialect: &dyn Dialect,
643) -> Result<QueryBuf, AppError> {
644    let mut q = QueryBuf::new();
645    let schema = resolve_schema(entity, schema_override);
646    let table = qualified_table(schema, &entity.table_name);
647
648    let where_clause = match filter {
649        Some(node) => {
650            let frag = rsql_to_sql(
651                node,
652                entity,
653                &mut q,
654                None,
655                filter_includes,
656                schema_override,
657                dialect,
658            )?;
659            format!(" WHERE {}", frag)
660        }
661        None => String::new(),
662    };
663    let order_clause = build_order_by(sort, entity, None);
664    let limit_clause = limit
665        .map(|n| format!(" LIMIT {}", n.min(1000)))
666        .unwrap_or_default();
667    let offset_clause = offset.map(|n| format!(" OFFSET {}", n)).unwrap_or_default();
668    let cols = select_column_list(entity);
669    q.sql = format!(
670        "SELECT {} FROM {}{}{}{}{}",
671        cols, table, where_clause, order_clause, limit_clause, offset_clause
672    );
673    Ok(q)
674}
675
676/// SELECT * FROM entity WHERE column IN ($1, $2, ...) ORDER BY pk. Used for batch-fetching related rows (to_many or to_one by key).
677pub fn select_by_column_in(
678    entity: &ResolvedEntity,
679    column_name: &str,
680    values: &[Value],
681    schema_override: Option<&str>,
682    dialect: &dyn Dialect,
683) -> QueryBuf {
684    let mut q = QueryBuf::new();
685    let schema = resolve_schema(entity, schema_override);
686    let table = qualified_table(schema, &entity.table_name);
687    let pk = &entity.pk_columns[0];
688    if values.is_empty() {
689        let cols = select_column_list(entity);
690        q.sql = format!("SELECT {} FROM {} WHERE 1 = 0", cols, table);
691        return q;
692    }
693    let placeholders: Vec<String> = values
694        .iter()
695        .map(|v| {
696            let n = q.push_param(v.clone());
697            entity
698                .columns
699                .iter()
700                .find(|c| c.name == column_name)
701                .and_then(|c| c.pg_type.as_deref())
702                .map(|t| dialect.cast_expr(&dialect.placeholder(n as usize), t))
703                .unwrap_or_else(|| dialect.placeholder(n as usize))
704        })
705        .collect();
706    let cols = select_column_list(entity);
707    q.sql = format!(
708        "SELECT {} FROM {} WHERE {} IN ({}) ORDER BY {}",
709        cols,
710        table,
711        quoted(column_name),
712        placeholders.join(", "),
713        quoted(pk)
714    );
715    q
716}
717
718/// INSERT: columns and placeholders from entity; values from body. Excludes PK if has_default.
719/// Omits columns with DB default when body does not provide a value (so DB uses default).
720/// Uses SQL cast (e.g. $n::timestamptz) for timestamp columns so string values bind correctly.
721/// When `rls_tenant_id` is Some, appends tenant_id column and value (for RLS strategy).
722pub fn insert(
723    entity: &ResolvedEntity,
724    body: &HashMap<String, Value>,
725    include_pk: bool,
726    schema_override: Option<&str>,
727    rls_tenant_id: Option<&str>,
728    caller_user_id: Option<&str>,
729    dialect: &dyn Dialect,
730) -> QueryBuf {
731    let mut q = QueryBuf::new();
732    let schema = resolve_schema(entity, schema_override);
733    let table = qualified_table(schema, &entity.table_name);
734    let mut cols = Vec::new();
735    let mut placeholders = Vec::new();
736    for c in &entity.columns {
737        let name = &c.name;
738        if c.pk_type.is_some() && !include_pk {
739            continue;
740        }
741        // archive_field may only be written via the dedicated archive endpoint, never via POST/create.
742        if entity.archive_field.as_deref().is_some_and(|af| name == af) {
743            continue;
744        }
745        // updated_by is only meaningful on updates, leave NULL on insert.
746        if name == "updated_by" {
747            continue;
748        }
749        let val = if name == "created_by" {
750            caller_user_id
751                .map(|uid| Value::String(uid.to_string()))
752                .or_else(|| body.get(name).cloned())
753        } else {
754            body.get(name).cloned()
755        };
756        if val.is_none() && c.has_default {
757            continue;
758        }
759        let val = val.unwrap_or(Value::Null);
760        let val = coerce_json_value_for_pg_array(val, c.pg_type.as_deref());
761        let param_num = q.push_param(val);
762        let ph = c
763            .pg_type
764            .as_deref()
765            .map(|t| dialect.cast_expr(&dialect.placeholder(param_num as usize), t))
766            .unwrap_or_else(|| dialect.placeholder(param_num as usize));
767        cols.push(quoted(name));
768        placeholders.push(ph);
769    }
770    if let Some(tid) = rls_tenant_id {
771        let param_num = q.push_param(Value::String(tid.to_string()));
772        cols.push(quoted("tenant_id"));
773        placeholders.push(dialect.placeholder(param_num as usize));
774    }
775    let col_list = select_column_list(entity);
776    let ret = dialect.returning_clause(&col_list);
777    let suffix = if ret.is_empty() {
778        String::new()
779    } else {
780        format!(" {}", ret)
781    };
782    q.sql = format!(
783        "INSERT INTO {} ({}) VALUES ({}){}",
784        table,
785        cols.join(", "),
786        placeholders.join(", "),
787        suffix
788    );
789    q
790}
791
792/// UPDATE by id: SET only columns present in body (and in entity columns).
793/// Uses SQL cast for timestamp columns so string values bind correctly.
794pub fn update(
795    entity: &ResolvedEntity,
796    id: &Value,
797    body: &HashMap<String, Value>,
798    schema_override: Option<&str>,
799    caller_user_id: Option<&str>,
800    dialect: &dyn Dialect,
801) -> QueryBuf {
802    let mut q = QueryBuf::new();
803    let schema = resolve_schema(entity, schema_override);
804    let table = qualified_table(schema, &entity.table_name);
805    let pk = &entity.pk_columns[0];
806    let col_by_name: std::collections::HashMap<_, _> = entity
807        .columns
808        .iter()
809        .map(|c| (c.name.as_str(), c))
810        .collect();
811    let mut sets = Vec::new();
812    for (k, v) in body {
813        if *k == *pk {
814            continue;
815        }
816        if k == "tenant_id" {
817            continue;
818        }
819        // archive_field may only be written via the dedicated archive endpoint, never via PATCH.
820        if entity.archive_field.as_deref().is_some_and(|af| k == af) {
821            continue;
822        }
823        let Some(c) = col_by_name.get(k.as_str()) else {
824            continue;
825        };
826        let v = coerce_json_value_for_pg_array(v.clone(), c.pg_type.as_deref());
827        let param_num = q.push_param(v);
828        let rhs = c
829            .pg_type
830            .as_deref()
831            .map(|t| dialect.cast_expr(&dialect.placeholder(param_num as usize), t))
832            .unwrap_or_else(|| dialect.placeholder(param_num as usize));
833        sets.push(format!("{} = {}", quoted(k), rhs));
834    }
835    sets.push(format!("{} = {}", quoted("updated_at"), dialect.now_fn()));
836    if let Some(uid) = caller_user_id {
837        if entity.columns.iter().any(|c| c.name == "updated_by") {
838            let param_num = q.push_param(Value::String(uid.to_string()));
839            sets.push(format!(
840                "{} = {}",
841                quoted("updated_by"),
842                dialect.placeholder(param_num as usize)
843            ));
844        }
845    }
846    if sets.is_empty() {
847        let cols = select_column_list(entity);
848        let ph = pk_placeholder(entity, 1, dialect);
849        q.sql = format!(
850            "SELECT {} FROM {} WHERE {} = {}",
851            cols,
852            table,
853            quoted(pk),
854            ph
855        );
856        q.params.push(id.clone());
857        return q;
858    }
859    let set_clause = sets.join(", ");
860    let id_param = q.params.len() + 1;
861    q.params.push(id.clone());
862    let ph = pk_placeholder(entity, id_param, dialect);
863    let col_list = select_column_list(entity);
864    let ret = dialect.returning_clause(&col_list);
865    let suffix = if ret.is_empty() {
866        String::new()
867    } else {
868        format!(" {}", ret)
869    };
870    q.sql = format!(
871        "UPDATE {} SET {} WHERE {} = {}{}",
872        table,
873        set_clause,
874        quoted(pk),
875        ph,
876        suffix
877    );
878    q
879}
880
881/// DELETE by id.
882pub fn delete(
883    entity: &ResolvedEntity,
884    schema_override: Option<&str>,
885    dialect: &dyn Dialect,
886) -> QueryBuf {
887    let mut q = QueryBuf::new();
888    let schema = resolve_schema(entity, schema_override);
889    let table = qualified_table(schema, &entity.table_name);
890    let pk = &entity.pk_columns[0];
891    let ph = pk_placeholder(entity, 1, dialect);
892    q.params.push(Value::Null);
893    let col_list = select_column_list(entity);
894    let ret = dialect.returning_clause(&col_list);
895    let suffix = if ret.is_empty() {
896        String::new()
897    } else {
898        format!(" {}", ret)
899    };
900    q.sql = format!(
901        "DELETE FROM {} WHERE {} = {}{}",
902        table,
903        quoted(pk),
904        ph,
905        suffix
906    );
907    q
908}
909
910/// UPDATE by id: clear archive_field (set to NULL) where it is currently NOT NULL.
911/// Returns the updated row or None (record not found or not archived).
912pub fn unarchive(
913    entity: &ResolvedEntity,
914    archive_field: &str,
915    schema_override: Option<&str>,
916    dialect: &dyn Dialect,
917) -> QueryBuf {
918    let mut q = QueryBuf::new();
919    let schema = resolve_schema(entity, schema_override);
920    let table = qualified_table(schema, &entity.table_name);
921    let pk = &entity.pk_columns[0];
922    let ph = pk_placeholder(entity, 1, dialect);
923    q.params.push(Value::Null); // placeholder; caller passes real id via execute_returning_one_with_params_exec
924    let col_list = select_column_list(entity);
925    let ret = dialect.returning_clause(&col_list);
926    let suffix = if ret.is_empty() {
927        String::new()
928    } else {
929        format!(" {}", ret)
930    };
931    q.sql = format!(
932        "UPDATE {} SET {} = NULL WHERE {} = {} AND {} IS NOT NULL{}",
933        table,
934        quoted(archive_field),
935        quoted(pk),
936        ph,
937        quoted(archive_field),
938        suffix
939    );
940    q
941}
942
943/// UPDATE by id: stamp archive_field with NOW() where it is currently NULL.
944/// Returns the updated row or None (record not found or already archived).
945pub fn archive(
946    entity: &ResolvedEntity,
947    archive_field: &str,
948    schema_override: Option<&str>,
949    dialect: &dyn Dialect,
950) -> QueryBuf {
951    let mut q = QueryBuf::new();
952    let schema = resolve_schema(entity, schema_override);
953    let table = qualified_table(schema, &entity.table_name);
954    let pk = &entity.pk_columns[0];
955    let ph = pk_placeholder(entity, 1, dialect);
956    q.params.push(Value::Null); // placeholder; caller passes real id via execute_returning_one_with_params_exec
957    let col_list = select_column_list(entity);
958    let ret = dialect.returning_clause(&col_list);
959    let suffix = if ret.is_empty() {
960        String::new()
961    } else {
962        format!(" {}", ret)
963    };
964    q.sql = format!(
965        "UPDATE {} SET {} = {} WHERE {} = {} AND {} IS NULL{}",
966        table,
967        quoted(archive_field),
968        dialect.now_fn(),
969        quoted(pk),
970        ph,
971        quoted(archive_field),
972        suffix
973    );
974    q
975}