Skip to main content

umbral_core/orm/
dynamic.rs

1//! Runtime-typed QuerySet — the ORM's answer to "I know my model at
2//! request time, not compile time."
3//!
4//! `Manager<T>` is parameterised by a `T: Model` so the typed columns
5//! (`post::TITLE`, `post::ID`) carry their `SqlType` and Rust type at
6//! the type level. That's wrong for the admin: it walks the registry
7//! at request time, so the model is a `ModelMeta` value, the column
8//! name is a `String`, and the result row is a `HashMap<String,
9//! String>` (templates can't see typed structs anyway).
10//!
11//! `DynQuerySet` is the parallel surface. It accepts string column
12//! names against a `ModelMeta`, validates them at chain time (unknown
13//! names are silently dropped so a stale `search_fields` config can't
14//! crash a request), and renders to the same `sea_query` machinery
15//! the typed path uses. Decoding goes through `SqlType` dispatch —
16//! [`decode_to_string`] is the new pub helper that mirrors the
17//! admin's private `column_to_string`.
18//!
19//! ## Scope of this first pass
20//!
21//! v0 ships the surface the admin's list / changelist / rows-fragment
22//! handlers need today: `search`, `filter_eq_string`, `order_by_col`,
//! `limit`, `offset`, `count`, `fetch_as_strings`. INSERT / UPDATE
//! plus a typed `DynValue` enum land as call sites migrate; `delete`
//! (hard and soft) has already landed. Postgres dispatch is wired
//! too: terminals such as `count` and `delete` match on `DbPool` and
//! build with `PostgresQueryBuilder` instead of panicking.
28
29use std::collections::HashMap;
30
31use sea_query::{
32    Alias, Asterisk, Condition, Expr, Func, Order, PostgresQueryBuilder, Query, SqliteQueryBuilder,
33    Value as SeaValue,
34};
35use sea_query_binder::SqlxBinder;
36use sqlx::Row;
37
38use crate::db::{DbPool, pool_for_dispatched};
39use crate::migrate::{Column, ModelMeta};
40use crate::orm::SqlType;
41use crate::orm::write::{WriteError, json_to_sea_value, null_for};
42
43/// Resolve the pool for a dynamic (late-bound) query on `meta`, routing
44/// through the `DatabaseRouter` exactly like the typed path.
45fn resolve_pool_dyn(meta: &crate::migrate::ModelMeta, op: crate::db::RouteOp) -> crate::db::DbPool {
46    let ctx = crate::db::route_context();
47    let r = crate::db::router::router();
48    let alias = match op {
49        crate::db::RouteOp::Read => r.db_for_read(meta, &ctx),
50        crate::db::RouteOp::Write => r.db_for_write(meta, &ctx),
51    };
52    pool_for_dispatched(alias.as_str()).clone()
53}
54
55/// Errors a runtime-typed query can produce.
56///
57/// Carries the structured [`WriteError`] when the failure originates
58/// in the umbral write-validator (form-coercion failures, required-
59/// field misses, future per-field validation), and bare
60/// [`sqlx::Error`] otherwise — DB-driver failures, constraint
61/// violations the validator can't pre-detect, connection drops.
62///
63/// gaps2 #12: prior to this change `DynError` was a bare alias for
64/// `sqlx::Error`, so every `WriteError` that flowed through the
65/// `DynQuerySet` form path was flattened to
66/// `sqlx::Error::Protocol("umbral::orm::write: <message>")` and the
67/// per-field map (`field_errors()` / `non_field_errors()`) was lost
68/// before the admin handler could render it. The enum preserves the
69/// structure all the way to the response surface; the admin's
70/// per-field rendering work (gaps2 #12 part 2) and the `Form<T>`
71/// extractor (gaps2 #19) both consume it directly.
72///
73/// The two-arm shape composes with `?` ergonomically because both
74/// `sqlx::Error` and `WriteError` lift via `From` — handlers can
75/// keep their existing `?` chains and reach for `match` only at the
76/// boundary where the per-field map is rendered.
77#[derive(Debug)]
78pub enum DynError {
79    /// Structured umbral-validator failure (per-field errors,
80    /// validator rules, FK / unique violations the validator
81    /// pre-detected). The carried [`WriteError`] keeps its
82    /// `field_errors()` / `non_field_errors()` accessors.
83    Write(WriteError),
84    /// Bare sqlx failure (driver-level error, connection drop,
85    /// constraint violation the validator didn't catch). Surface
86    /// the message via [`sqlx::Error`]'s own `Display`.
87    Sqlx(sqlx::Error),
88}
89
90impl std::fmt::Display for DynError {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        match self {
93            Self::Write(e) => write!(f, "{e}"),
94            Self::Sqlx(e) => write!(f, "{e}"),
95        }
96    }
97}
98
99impl std::error::Error for DynError {
100    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
101        match self {
102            Self::Write(e) => Some(e),
103            Self::Sqlx(e) => Some(e),
104        }
105    }
106}
107
108impl From<sqlx::Error> for DynError {
109    fn from(e: sqlx::Error) -> Self {
110        Self::Sqlx(e)
111    }
112}
113
114impl From<WriteError> for DynError {
115    fn from(e: WriteError) -> Self {
116        Self::Write(e)
117    }
118}
119
120/// A runtime-typed, lazy SQL query against one `ModelMeta`.
121///
122/// Built by [`DynQuerySet::for_meta`]; chain `.search(...)` /
123/// `.filter_eq_string(...)` / `.order_by_col(...)` / `.limit(...)` /
124/// `.offset(...)` to refine; finish with `.count()` or
125/// `.fetch_as_strings()`.
126pub struct DynQuerySet<'a> {
127    meta: &'a ModelMeta,
128    /// Accumulated WHERE clauses, ANDed together at terminal time.
129    /// Stored as `Condition` (not pushed into a `SelectStatement`
130    /// directly) so `count()` and `fetch_as_strings()` can reuse the
131    /// same predicate set against different SELECT projections.
132    where_clauses: Vec<Condition>,
133    order: Vec<(String, bool)>,
134    limit: Option<u64>,
135    offset: Option<u64>,
136    select_cols: Vec<String>,
137    with_deleted: bool,
138    only_deleted: bool,
139    hard_delete: bool,
140    /// FK column names to expand via a batched `IN (...)` lookup
141    /// after the main query — same one-hop semantics as the typed
142    /// `QuerySet::select_related`. Each entry must be a single-hop
143    /// FK column on `meta` (validated when added). When non-empty,
144    /// `fetch_as_json` / `first_as_json` swap the FK integer values
145    /// in the response for the full related-row JSON object.
146    /// Drives the REST plugin's `?include=fk1,fk2` query param.
147    select_related: Vec<String>,
148}
149
150impl<'a> DynQuerySet<'a> {
151    /// Start a `SELECT` against the model's table. The column list
152    /// defaults to every field in declaration order; restrict it with
153    /// `.select_cols(...)` before fetching when you only want a subset.
154    pub fn for_meta(meta: &'a ModelMeta) -> Self {
155        let select_cols = meta.fields.iter().map(|c| c.name.clone()).collect();
156        Self {
157            meta,
158            where_clauses: Vec::new(),
159            order: Vec::new(),
160            limit: None,
161            offset: None,
162            select_cols,
163            with_deleted: false,
164            only_deleted: false,
165            hard_delete: false,
166            select_related: Vec::new(),
167        }
168    }
169
170    /// Include soft-deleted rows for models tagged with
171    /// `#[umbral(soft_delete)]`.
172    pub fn with_deleted(mut self) -> Self {
173        self.with_deleted = true;
174        self
175    }
176
177    /// Restrict a soft-delete model to only rows whose `deleted_at` is
178    /// populated.
179    pub fn only_deleted(mut self) -> Self {
180        self.only_deleted = true;
181        self
182    }
183
184    /// Force a real `DELETE` for a soft-delete model.
185    pub fn hard_delete(mut self) -> Self {
186        self.hard_delete = true;
187        self
188    }
189
190    fn effective_where_clauses(&self) -> Vec<Condition> {
191        let mut clauses = self.where_clauses.clone();
192        if self.meta.soft_delete {
193            if self.only_deleted {
194                clauses
195                    .push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_not_null()));
196            } else if !self.with_deleted {
197                clauses.push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_null()));
198            }
199        }
200        clauses
201    }
202
203    fn live_where_clauses(&self) -> Vec<Condition> {
204        let mut clauses = self.where_clauses.clone();
205        if self.meta.soft_delete {
206            clauses.push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_null()));
207        }
208        clauses
209    }
210
211    /// Restrict the SELECT list to the supplied column names. Names
212    /// that don't exist on the model are silently dropped so a stale
213    /// `list_display` config can't crash a request.
214    pub fn select_cols(mut self, cols: &[String]) -> Self {
215        let valid: Vec<String> = cols
216            .iter()
217            .filter(|n| self.meta.fields.iter().any(|c| &c.name == *n))
218            .cloned()
219            .collect();
220        if !valid.is_empty() {
221            self.select_cols = valid;
222        }
223        self
224    }
225
226    /// Expand the named FK columns via a batched `IN (...)` lookup
227    /// after the main query — mirrors the typed
228    /// `QuerySet::select_related` shape (single-hop and `__`-chained
229    /// alike). After this call, every FK field along the chain in
230    /// the response JSON renders as the full related-row object
231    /// instead of the raw integer id. Query budget is
232    /// `1 + len(hops)` per chain regardless of how many parent rows
233    /// came back (no N+1) — gap2 #18.
234    ///
235    /// Names may use either `.` (URL-natural) or `__` (lookup
236    /// style) as the hop separator; both normalize to the
237    /// same canonical chain internally. Mixed separators in one
238    /// token (e.g. `author.profile__org`) are accepted too.
239    ///
240    /// Names that don't exist on the model OR aren't FK columns at
241    /// any hop are silently dropped — the REST plugin's `?include=`
242    /// handler does its own up-front validation with a 400 on
243    /// unknown names, so stale dynamic includes (e.g. an internal
244    /// call site that hardcoded a name that was later renamed) just
245    /// skip without crashing the request.
246    ///
247    /// ```ignore
248    /// DynQuerySet::for_meta(&meta)
249    ///     .select_related_dyn(&["user".into(), "author.profile".into()])
250    ///     .fetch_as_json().await
251    /// ```
252    pub fn select_related_dyn(mut self, fields: &[String]) -> Self {
253        for name in fields {
254            let canonical = normalize_sr_token(name);
255            if validate_sr_chain(self.meta, &canonical).is_none() {
256                continue;
257            }
258            if !self.select_related.iter().any(|n| n == &canonical) {
259                self.select_related.push(canonical);
260            }
261        }
262        self
263    }
264
265    /// Read-side accessor for the resolved select_related list.
266    /// Used by tests + the REST handler's debug-logging path.
267    #[doc(hidden)]
268    pub fn select_related_fields(&self) -> &[String] {
269        &self.select_related
270    }
271
272    /// Add `WHERE (<predicate1> OR <predicate2> OR ...)` for a free-text
273    /// term against the model's searchable columns. Per-column predicate
274    /// depends on the column's [`SqlType`]:
275    ///
276    /// | SqlType | Predicate |
277    /// |---|---|
278    /// | `Text` | `UPPER(col) LIKE '%TERM%'` — case-insensitive substring |
279    /// | `SmallInt` / `Integer` / `BigInt` / `ForeignKey` | `col = term` when `term.parse::<i64>().is_ok()` |
280    /// | `Real` / `Double` | `col = term` when `term.parse::<f64>().is_ok()` |
281    /// | `Boolean` | `col = term` when `term` parses as `true` / `false` |
282    /// | everything else (Date, Time, Uuid, Json, Bytes, Array, …) | skipped |
283    ///
284    /// `fields` controls which columns participate:
285    ///
286    /// - **Non-empty:** restricted to the named columns. Names that
287    ///   don't exist on the model are silently dropped.
288    /// - **Empty:** every column on the model is a candidate; the
289    ///   per-type table above decides which actually contribute. This
290    ///   is the "no `search_fields` configured" default behaviour.
291    ///
292    /// Empty `term` (after trimming) is always a no-op. If the column
293    /// selection results in zero predicates (e.g. `term = "abc"` and
294    /// the only candidate columns are numeric), nothing is appended.
295    pub fn search(mut self, fields: &[String], term: &str) -> Self {
296        let term = term.trim();
297        if term.is_empty() {
298            return self;
299        }
300
301        let restricted = !fields.is_empty();
302        let as_int = term.parse::<i64>().ok();
303        let as_float = term.parse::<f64>().ok();
304        let as_bool = match term.to_ascii_lowercase().as_str() {
305            "true" => Some(true),
306            "false" => Some(false),
307            _ => None,
308        };
309        // Escape LIKE wildcards in the user's term so `%`/`_` are matched
310        // literally, not as wildcards (ORM-1). Paired with `.escape('\\')`.
311        let like_pat = format!("%{}%", crate::orm::escape_like_literal(term)).to_uppercase();
312
313        let mut cond = Condition::any();
314        let mut added = 0;
315        for col in &self.meta.fields {
316            if restricted && !fields.iter().any(|f| f == &col.name) {
317                continue;
318            }
319            let predicate: Option<sea_query::SimpleExpr> = match col.ty {
320                SqlType::Text => Some(
321                    Expr::expr(Func::upper(Expr::col(Alias::new(&col.name))))
322                        .like(sea_query::LikeExpr::new(like_pat.clone()).escape('\\')),
323                ),
324                SqlType::SmallInt | SqlType::Integer | SqlType::BigInt | SqlType::ForeignKey => {
325                    as_int.map(|n| Expr::col(Alias::new(&col.name)).eq(n))
326                }
327                SqlType::Real | SqlType::Double => {
328                    as_float.map(|n| Expr::col(Alias::new(&col.name)).eq(n))
329                }
330                SqlType::Boolean => as_bool.map(|b| Expr::col(Alias::new(&col.name)).eq(b)),
331                _ => None,
332            };
333            if let Some(p) = predicate {
334                cond = cond.add(p);
335                added += 1;
336            }
337        }
338        if added > 0 {
339            self.where_clauses.push(cond);
340        }
341        self
342    }
343
344    /// Splice an externally-built `sea_query::Condition` into the
345    /// accumulated WHERE clauses. Used by callers that need lookups
346    /// the typed builder methods don't cover (e.g. umbral-rest's
347    /// query-string filter parser produces a `Condition` per
348    /// `field__lookup=value` triple and feeds it in here).
349    pub fn filter_condition(mut self, cond: sea_query::Condition) -> Self {
350        self.where_clauses.push(cond);
351        self
352    }
353
354    /// Add `WHERE <col> IN (?, ?, ...)` for an i64 column (PK / FK).
355    /// Empty `vals` is a no-op; unknown columns are silently dropped.
356    pub fn filter_in_i64(mut self, col: &str, vals: &[i64]) -> Self {
357        if vals.is_empty() || !self.meta.fields.iter().any(|c| c.name == col) {
358            return self;
359        }
360        let cond = Condition::all().add(Expr::col(Alias::new(col)).is_in(vals.iter().copied()));
361        self.where_clauses.push(cond);
362        self
363    }
364
365    /// Filter the parent set down to rows that have an M2M link to at
366    /// least one of `child_ids` through the named M2M field. Emits:
367    ///
368    /// ```sql
369    /// WHERE <pk> IN (
370    ///     SELECT parent_id FROM <parent_table>_<field_name>
371    ///     WHERE child_id IN (?, ?, ...)
372    /// )
373    /// ```
374    ///
375    /// The junction table name follows the framework's
376    /// `{parent_table}_{field_name}` convention (same as
377    /// `set_junction_dynamic` and the migration emitter use). Returns
378    /// `self` unchanged when:
379    ///   - `child_ids` is empty,
380    ///   - no M2M relation with that `field_name` exists on the model,
381    ///   - the parent model has no PK column,
382    ///   - every value in `child_ids` fails to parse as `i64`
383    ///     (M2M PKs are i64 at v1 across the framework).
384    ///
385    /// Use case: admin filter for "products with tag 1 OR tag 2 OR
386    /// tag 3" — call once with all three child ids; the IN subquery
387    /// is one round-trip regardless of selection count.
388    pub fn filter_m2m_contains_any(mut self, field_name: &str, child_ids: &[String]) -> Self {
389        if child_ids.is_empty() {
390            return self;
391        }
392        let Some(rel) = self
393            .meta
394            .m2m_relations
395            .iter()
396            .find(|r| r.field_name == field_name)
397        else {
398            return self;
399        };
400        let Some(pk_col) = self.meta.pk_column() else {
401            return self;
402        };
403        // PK lift Pass B: bind child ids per the M2M target's PK
404        // type, not always i64. Pre-fix, `permissions_permission`
405        // (whose PK is the `codename` String column) couldn't be
406        // filtered via this method because every string id parsed
407        // as `i64::Err` and got dropped. The junction table's
408        // `child_id` column type matches the target's PK type at
409        // DDL emission, so binding correctly here keeps SQLite +
410        // Postgres affinity happy.
411        // PK lift Pass E: cached lookup. Previously cloned the full
412        // model registry per `filter_m2m_contains_any` call.
413        let target_pk_ty = crate::migrate::pk_meta_for_table(&rel.target_table)
414            .map(|(_, ty)| ty)
415            .unwrap_or(SqlType::BigInt);
416        let junction_table = format!("{}_{}", self.meta.table, rel.field_name);
417        let child_id_expr = Expr::col(Alias::new("child_id"));
418        let in_clause: sea_query::SimpleExpr = match target_pk_ty {
419            SqlType::Text | SqlType::Uuid => {
420                // String / UUID PK: bind raw strings. Empty / all-
421                // whitespace tokens drop out (no realistic PK is
422                // blank); everything else goes in verbatim.
423                let bound: Vec<String> = child_ids
424                    .iter()
425                    .filter_map(|s| {
426                        let s = s.trim();
427                        if s.is_empty() {
428                            None
429                        } else {
430                            Some(s.to_string())
431                        }
432                    })
433                    .collect();
434                if bound.is_empty() {
435                    return self;
436                }
437                child_id_expr.is_in(bound)
438            }
439            _ => {
440                // Integer-PK target (default): parse to i64. Same
441                // behaviour as pre-fix; this arm matches the
442                // pre-existing semantics for every shipped model.
443                let parsed: Vec<i64> = child_ids.iter().filter_map(|s| s.parse().ok()).collect();
444                if parsed.is_empty() {
445                    return self;
446                }
447                child_id_expr.is_in(parsed)
448            }
449        };
450        let subq = Query::select()
451            .column(Alias::new("parent_id"))
452            .from(crate::db::router::schema_qualified_table(&junction_table))
453            .and_where(in_clause)
454            .to_owned();
455        let cond =
456            Condition::all().add(Expr::col(Alias::new(pk_col.name.clone())).in_subquery(subq));
457        self.where_clauses.push(cond);
458        self
459    }
460
461    /// Add `WHERE <col> IN (?, ?, ...)` for any column. Each value is
462    /// parsed against the column's [`SqlType`] (same coercion as
463    /// [`Self::filter_eq_string`]) so SQLite's affinity rules see the
464    /// right operand type. Values that fail to parse are dropped from
465    /// the IN list. Empty `vals` (or all-unparseable) is a no-op;
466    /// unknown columns are silently dropped.
467    ///
468    /// Single-value calls degenerate to `<col> = ?` via sea-query's
469    /// `is_in` lowering — callers can use this for both the "one
470    /// selection" and "multi-selection" filter paths and get the
471    /// natural SQL in each case.
472    pub fn filter_in_strings(mut self, col: &str, vals: &[String]) -> Self {
473        let Some(meta_col) = self.meta.fields.iter().find(|c| c.name == col) else {
474            return self;
475        };
476        if vals.is_empty() {
477            return self;
478        }
479        let expr = Expr::col(Alias::new(col));
480        // Coerce each string value to the column's native type so the
481        // bind kind matches and SQLite's STRICT mode (and Postgres's
482        // type system) accepts the parameter. `fk_effective_type` resolves
483        // a ForeignKey to its target's PK type, so an FK to a String/Uuid
484        // target binds the raw string (via the `_` arm) instead of being
485        // parsed as i64 and dropped.
486        let cond = match crate::migrate::fk_effective_type(meta_col) {
487            SqlType::SmallInt | SqlType::Integer => {
488                let parsed: Vec<i32> = vals.iter().filter_map(|s| s.parse().ok()).collect();
489                if parsed.is_empty() {
490                    return self;
491                }
492                Condition::all().add(expr.is_in(parsed))
493            }
494            SqlType::BigInt | SqlType::ForeignKey => {
495                let parsed: Vec<i64> = vals.iter().filter_map(|s| s.parse().ok()).collect();
496                if parsed.is_empty() {
497                    return self;
498                }
499                Condition::all().add(expr.is_in(parsed))
500            }
501            SqlType::Real | SqlType::Double => {
502                let parsed: Vec<f64> = vals.iter().filter_map(|s| s.parse().ok()).collect();
503                if parsed.is_empty() {
504                    return self;
505                }
506                Condition::all().add(expr.is_in(parsed))
507            }
508            SqlType::Boolean => {
509                let parsed: Vec<bool> = vals
510                    .iter()
511                    .map(|s| matches!(s.as_str(), "true" | "on" | "1"))
512                    .collect();
513                Condition::all().add(expr.is_in(parsed))
514            }
515            // UUIDs are stored as BLOB in SQLite (sqlx Encode<Sqlite> for Uuid
516            // uses .as_bytes()). Binding them as strings would miss every row.
517            // Parse each submitted string into a Uuid and pass the typed vec so
518            // sea-query-binder emits blob binds that match the stored values.
519            SqlType::Uuid => {
520                let parsed: Vec<uuid::Uuid> = vals
521                    .iter()
522                    .filter_map(|s| uuid::Uuid::parse_str(s).ok())
523                    .collect();
524                if parsed.is_empty() {
525                    return self;
526                }
527                Condition::all().add(expr.is_in(parsed))
528            }
529            _ => Condition::all().add(expr.is_in(vals.iter().map(|s| s.to_string()))),
530        };
531        self.where_clauses.push(cond);
532        self
533    }
534
535    /// Add `WHERE <col> = <value>` where the value is parsed against
536    /// the column's `SqlType` so SQLite's affinity rules see the right
537    /// operand type.
538    pub fn filter_eq_string(mut self, col: &str, value: &str) -> Self {
539        let Some(meta_col) = self.meta.fields.iter().find(|c| c.name == col) else {
540            return self;
541        };
542        let expr = Expr::col(Alias::new(col));
543        // FK-to-non-i64-target columns resolve to their target PK type, so
544        // a String/Uuid FK matches the `_` arm and binds the raw string.
545        let predicate = match crate::migrate::fk_effective_type(meta_col) {
546            SqlType::SmallInt | SqlType::Integer => value.parse::<i32>().ok().map(|v| expr.eq(v)),
547            SqlType::BigInt | SqlType::ForeignKey => value.parse::<i64>().ok().map(|v| expr.eq(v)),
548            SqlType::Real | SqlType::Double => value.parse::<f64>().ok().map(|v| expr.eq(v)),
549            SqlType::Boolean => {
550                let v = matches!(value, "true" | "on" | "1");
551                Some(expr.eq(v))
552            }
553            // UUIDs stored as BLOB in SQLite — parse the string into a typed
554            // Uuid so sea-query-binder emits a blob bind that matches the row.
555            SqlType::Uuid => uuid::Uuid::parse_str(value).ok().map(|u| expr.eq(u)),
556            _ => Some(expr.eq(value.to_string())),
557        };
558        if let Some(p) = predicate {
559            self.where_clauses.push(Condition::all().add(p));
560        }
561        self
562    }
563
564    /// Add `ORDER BY <col> ASC|DESC`. Unknown columns are silently
565    /// dropped. Multiple calls append (sea-query semantics).
566    pub fn order_by_col(mut self, col: &str, descending: bool) -> Self {
567        if self.meta.fields.iter().any(|c| c.name == col) {
568            self.order.push((col.to_string(), descending));
569        }
570        self
571    }
572
573    /// Set `LIMIT`.
574    pub fn limit(mut self, n: u64) -> Self {
575        self.limit = Some(n);
576        self
577    }
578
579    /// Set `OFFSET`.
580    pub fn offset(mut self, n: u64) -> Self {
581        self.offset = Some(n);
582        self
583    }
584
585    /// Terminal: `SELECT COUNT(*)` with the accumulated WHERE
586    /// clauses. ORDER BY / LIMIT / OFFSET are dropped (irrelevant
587    /// to a count).
588    pub async fn count(self) -> Result<i64, DynError> {
589        let mut q = Query::select();
590        q.from(crate::db::router::schema_qualified_table(&self.meta.table));
591        q.expr(Func::count(Expr::col(Asterisk)));
592        let where_clauses = self.effective_where_clauses();
593        for cond in &where_clauses {
594            q.cond_where(cond.clone());
595        }
596
597        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
598            DbPool::Sqlite(pool) => {
599                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
600                let row = sqlx::query_with(&sql, values).fetch_one(&pool).await?;
601                Ok(row.try_get::<i64, _>(0)?)
602            }
603            DbPool::Postgres(pool) => {
604                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
605                let row = sqlx::query_with(&sql, values).fetch_one(&pool).await?;
606                Ok(row.try_get::<i64, _>(0)?)
607            }
608        }
609    }
610
611    /// Terminal: `SELECT DISTINCT <col>` with the accumulated WHERE.
612    /// Returns each value as a string (via [`decode_to_string`]). LIMIT
613    /// is honoured; ORDER BY isn't (DISTINCT ordering is whatever the
614    /// underlying scan yields). Unknown column → empty result.
615    pub async fn fetch_distinct_strings(self, col: &str) -> Result<Vec<String>, DynError> {
616        let Some(col_meta) = self.meta.fields.iter().find(|c| c.name == col) else {
617            return Ok(Vec::new());
618        };
619        let mut q = Query::select();
620        q.distinct();
621        q.from(crate::db::router::schema_qualified_table(&self.meta.table));
622        q.column(Alias::new(col));
623        let where_clauses = self.effective_where_clauses();
624        for cond in &where_clauses {
625            q.cond_where(cond.clone());
626        }
627        if let Some(n) = self.limit {
628            q.limit(n);
629        }
630
631        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
632            DbPool::Sqlite(pool) => {
633                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
634                let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
635                let mut out = Vec::with_capacity(rows.len());
636                for row in rows {
637                    out.push(decode_to_string(&row, col_meta)?);
638                }
639                Ok(out)
640            }
641            DbPool::Postgres(pool) => {
642                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
643                let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
644                let mut out = Vec::with_capacity(rows.len());
645                for row in rows {
646                    out.push(decode_pg_to_string(&row, col_meta)?);
647                }
648                Ok(out)
649            }
650        }
651    }
652
653    /// Terminal: `DELETE FROM <table>` with the accumulated WHERE.
654    /// Returns the number of rows affected.
655    ///
656    /// gaps #77: pre-collects the affected PKs (one extra SELECT per
657    /// call) before the DELETE so `bulk_post_delete:<table>` can fire
658    /// with the actual row ids. Subscribers that need to invalidate
659    /// caches / write audit-log rows / sync a search index get the
660    /// list of PKs that just left the table, not just a row count.
661    pub async fn delete(self) -> Result<u64, DynError> {
662        if self.meta.soft_delete && !self.hard_delete {
663            return self.soft_delete_update().await;
664        }
665        let where_clauses = self.effective_where_clauses();
666        // Pre-collect the affected PKs only when the model has a PK
667        // column (every Model does in practice; the guard handles
668        // the hypothetical PK-less ModelMeta).
669        let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
670            Some(pk_col) => collect_parent_pks(&self.meta, pk_col, &where_clauses)
671                .await
672                .unwrap_or_default(),
673            None => Vec::new(),
674        };
675
676        let mut q = Query::delete();
677        q.from_table(crate::db::router::schema_qualified_table(&self.meta.table));
678        for cond in &where_clauses {
679            q.cond_where(cond.clone());
680        }
681
682        let rows_affected = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
683            DbPool::Sqlite(pool) => {
684                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
685                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
686                res.rows_affected()
687            }
688            DbPool::Postgres(pool) => {
689                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
690                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
691                res.rows_affected()
692            }
693        };
694
695        // gaps #77: emit `bulk_post_delete:<table>` with the PKs we
696        // captured pre-DELETE. Fires even when zero rows matched —
697        // matches the typed bulk-delete convention (subscribers that
698        // want to skip empty events filter in their handler).
699        crate::signals::emit_bulk_post_delete_by_table(&self.meta.table, parent_pks).await;
700        Ok(rows_affected)
701    }
702
703    async fn soft_delete_update(self) -> Result<u64, DynError> {
704        let where_clauses = self.live_where_clauses();
705        let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
706            Some(pk_col) => collect_parent_pks(self.meta, pk_col, &where_clauses)
707                .await
708                .unwrap_or_default(),
709            None => Vec::new(),
710        };
711
712        let mut q = Query::update();
713        q.table(crate::db::router::schema_qualified_table(&self.meta.table));
714        q.value(
715            Alias::new("deleted_at"),
716            sea_query::Value::ChronoDateTimeUtc(Some(Box::new(chrono::Utc::now()))),
717        );
718        for cond in &where_clauses {
719            q.cond_where(cond.clone());
720        }
721
722        let rows_affected = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
723            DbPool::Sqlite(pool) => {
724                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
725                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
726                res.rows_affected()
727            }
728            DbPool::Postgres(pool) => {
729                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
730                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
731                res.rows_affected()
732            }
733        };
734
735        crate::signals::emit_bulk_post_delete_by_table(&self.meta.table, parent_pks).await;
736        Ok(rows_affected)
737    }
738
739    /// Terminal: undo a soft-delete — `UPDATE <table> SET deleted_at =
740    /// NULL` for the rows matching the accumulated WHERE that are
741    /// currently soft-deleted (`deleted_at IS NOT NULL`). Returns the
742    /// number of rows restored. A no-op (0 rows) on a model that isn't
743    /// tagged `soft_delete`, since there is no `deleted_at` column to
744    /// clear — the caller should gate on `meta.soft_delete` first.
745    ///
746    /// This is the inverse of [`Self::delete`] on a soft-delete model:
747    /// `delete()` stamps `deleted_at = now()`, `restore()` clears it.
748    /// The admin's "Restore selected" trash action drives this.
749    pub async fn restore(self) -> Result<u64, DynError> {
750        if !self.meta.soft_delete {
751            return Ok(0);
752        }
753        // Restrict to the rows the caller selected AND that are
754        // actually trashed — restoring a live row is a no-op but
755        // narrowing here keeps the affected-count honest.
756        let mut where_clauses = self.where_clauses.clone();
757        where_clauses
758            .push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_not_null()));
759
760        let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
761            Some(pk_col) => collect_parent_pks(self.meta, pk_col, &where_clauses)
762                .await
763                .unwrap_or_default(),
764            None => Vec::new(),
765        };
766
767        let mut q = Query::update();
768        q.table(crate::db::router::schema_qualified_table(&self.meta.table));
769        q.value(
770            Alias::new("deleted_at"),
771            sea_query::Value::ChronoDateTimeUtc(None),
772        );
773        for cond in &where_clauses {
774            q.cond_where(cond.clone());
775        }
776
777        let rows_affected = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
778            DbPool::Sqlite(pool) => {
779                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
780                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
781                res.rows_affected()
782            }
783            DbPool::Postgres(pool) => {
784                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
785                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
786                res.rows_affected()
787            }
788        };
789
790        // Restoring a row is a "save" from the data model's POV — the
791        // row re-enters the live set — so emit the bulk-post-save
792        // signal, mirroring how soft-delete emits bulk-post-delete.
793        crate::signals::emit_bulk_post_save_by_table(&self.meta.table, parent_pks, false).await;
794        Ok(rows_affected)
795    }
796
797    /// Terminal: `UPDATE <table> SET <col> = <value>` with the
798    /// accumulated WHERE. The value is parsed against the column's
799    /// `SqlType` so SQLite affinity sees the right operand. Returns
800    /// the number of rows affected. Unknown column → 0 rows.
801    pub async fn update_one(self, col: &str, value: &str) -> Result<u64, DynError> {
802        let Some(col_meta) = self.meta.fields.iter().find(|c| c.name == col) else {
803            return Ok(0);
804        };
805        let sea_value = match form_str_to_sea_value(col_meta, value) {
806            Ok(v) => v,
807            // gaps2 #12: per-field validator failure (see `update_form`).
808            Err(e) => {
809                return Err(DynError::Write(WriteError::Validator {
810                    field: col_meta.name.clone(),
811                    message: e.to_string(),
812                }));
813            }
814        };
815
816        let mut q = Query::update();
817        q.table(crate::db::router::schema_qualified_table(&self.meta.table));
818        q.value(Alias::new(col), sea_value);
819        let where_clauses = self.effective_where_clauses();
820        for cond in &where_clauses {
821            q.cond_where(cond.clone());
822        }
823
824        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
825            DbPool::Sqlite(pool) => {
826                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
827                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
828                Ok(res.rows_affected())
829            }
830            DbPool::Postgres(pool) => {
831                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
832                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
833                Ok(res.rows_affected())
834            }
835        }
836    }
837
838    /// Terminal: `UPDATE <table> SET <col1> = ?, <col2> = ?, ...` with
839    /// the accumulated WHERE. Each form value is parsed against its
840    /// column's `SqlType`. The primary key column is silently dropped
841    /// from the form (it's the filter, not a target). `skip` lists
842    /// columns the caller wants excluded (e.g. readonly fields the
843    /// admin already enforced). Returns rows affected.
844    pub async fn update_form(
845        self,
846        form: &HashMap<String, String>,
847        skip: &[String],
848    ) -> Result<u64, DynError> {
849        let Some(q) = self.build_update_form_query(form, skip)? else {
850            return Ok(0);
851        };
852
853        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
854            DbPool::Sqlite(pool) => {
855                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
856                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
857                Ok(res.rows_affected())
858            }
859            DbPool::Postgres(pool) => {
860                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
861                let res = sqlx::query_with(&sql, values).execute(&pool).await?;
862                Ok(res.rows_affected())
863            }
864        }
865    }
866
867    /// Build the `UPDATE` statement (SET clauses + accumulated WHERE)
868    /// for [`Self::update_form`] / [`Self::update_form_in_tx`]. Returns
869    /// `None` when no column would be written (the callers translate
870    /// that into a `0` return). Holds all per-field validation —
871    /// PK/skip exclusion, `auto_now` refresh, and the structured
872    /// [`WriteError::Validator`] — so the pool and transaction paths
873    /// build provably the same statement.
874    fn build_update_form_query(
875        &self,
876        form: &HashMap<String, String>,
877        skip: &[String],
878    ) -> Result<Option<sea_query::UpdateStatement>, DynError> {
879        let mut q = Query::update();
880        q.table(crate::db::router::schema_qualified_table(&self.meta.table));
881        let mut any = false;
882        for col in &self.meta.fields {
883            if col.primary_key || skip.iter().any(|s| s == &col.name) {
884                continue;
885            }
886            // `auto_now` columns refresh on every update — push
887            // `Utc::now()` regardless of whether the form carried
888            // the column. `auto_now_add` stays frozen on update
889            // (fired once at INSERT time); it falls through to the
890            // standard "form omitted → skip" path below. Mirrors
891            // `update_json` (line ~1047) so form + JSON write paths
892            // honor the annotation identically.
893            if col.auto_now {
894                q.value(
895                    Alias::new(&col.name),
896                    crate::orm::write::now_for_column(col.ty),
897                );
898                any = true;
899                continue;
900            }
901            let Some(raw) = form.get(&col.name) else {
902                continue;
903            };
904            let sea_value = match form_str_to_sea_value(col, raw) {
905                Ok(v) => v,
906                // gaps2 #12: emit a structured per-field validator
907                // failure so the admin / Form<T> consumer can render
908                // it under the offending input. The pre-fix path
909                // flattened to `sqlx::Error::Protocol(...)` and the
910                // per-field hint was lost.
911                Err(e) => {
912                    return Err(DynError::Write(WriteError::Validator {
913                        field: col.name.clone(),
914                        message: e.to_string(),
915                    }));
916                }
917            };
918            q.value(Alias::new(&col.name), sea_value);
919            any = true;
920        }
921        if !any {
922            return Ok(None);
923        }
924        let where_clauses = self.effective_where_clauses();
925        for cond in &where_clauses {
926            q.cond_where(cond.clone());
927        }
928        Ok(Some(q))
929    }
930
931    /// Transaction-aware sibling of [`Self::update_form`]. Builds and
932    /// executes the identical `UPDATE` (same per-field validation,
933    /// `skip` / PK exclusion, `auto_now` refresh, [`WriteError::Validator`]
934    /// shape, and accumulated WHERE) but runs it on the caller-supplied
935    /// `tx`. The caller owns `commit` / `rollback`, so the update is
936    /// uncommitted until they say so — used by the admin to save a
937    /// parent edit and its inline child changes atomically.
938    pub async fn update_form_in_tx(
939        self,
940        tx: &mut crate::db::Transaction,
941        form: &HashMap<String, String>,
942        skip: &[String],
943    ) -> Result<u64, DynError> {
944        let Some(q) = self.build_update_form_query(form, skip)? else {
945            return Ok(0);
946        };
947
948        match tx.backend_name() {
949            "sqlite" => {
950                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
951                let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
952                let res = sqlx::query_with(&sql, values).execute(&mut **inner).await?;
953                Ok(res.rows_affected())
954            }
955            _ => {
956                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
957                let inner = tx.as_pg_mut().expect("postgres backend_name");
958                let res = sqlx::query_with(&sql, values).execute(&mut **inner).await?;
959                Ok(res.rows_affected())
960            }
961        }
962    }
963
964    /// Terminal: `INSERT INTO <table> (...) VALUES (...)` from a form
965    /// map. Auto-increment integer PKs are omitted when the form value
966    /// is missing or empty (SQLite hands out the next id). Form keys
967    /// that don't match a column are ignored. `skip` lets the caller
968    /// drop fields the admin pre-filtered. Returns `last_insert_rowid`.
969    pub async fn insert_form(
970        self,
971        form: &HashMap<String, String>,
972        skip: &[String],
973    ) -> Result<i64, DynError> {
974        let Some(mut q) = self.build_insert_form_query(form, skip)? else {
975            return Ok(0);
976        };
977
978        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
979            DbPool::Sqlite(pool) => {
980                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
981                let res = sqlx::query_with(&sql, vals).execute(&pool).await?;
982                Ok(res.last_insert_rowid())
983            }
984            DbPool::Postgres(pool) => {
985                // Postgres doesn't have last_insert_rowid; we ask for
986                // RETURNING the PK and read it back. Falls back to 0
987                // when the model has no integer PK (e.g. UUID PKs) —
988                // the caller's flow needs to skip relying on the
989                // return value in that case.
990                let pk_name = self
991                    .meta
992                    .fields
993                    .iter()
994                    .find(|c| c.primary_key)
995                    .map(|c| c.name.clone());
996                if let Some(pk) = pk_name {
997                    q.returning_col(Alias::new(&pk));
998                    let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
999                    let row = sqlx::query_with(&sql, vals).fetch_one(&pool).await?;
1000                    Ok(row.try_get::<i64, _>(pk.as_str()).unwrap_or(0))
1001                } else {
1002                    let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1003                    let _ = sqlx::query_with(&sql, vals).execute(&pool).await?;
1004                    Ok(0)
1005                }
1006            }
1007        }
1008    }
1009
1010    /// Build the `INSERT` statement for [`Self::insert_form`] /
1011    /// [`Self::insert_form_in_tx`]. Returns `None` when no column
1012    /// survives the `skip` / auto-increment-PK filtering (the callers
1013    /// translate that into a `0` return). All per-field validation —
1014    /// auto-now/auto-now-add stamping, the auto-increment PK omission,
1015    /// and the structured [`WriteError::Validator`] on a bad value —
1016    /// lives here so the pool and transaction paths build provably the
1017    /// same statement.
1018    fn build_insert_form_query(
1019        &self,
1020        form: &HashMap<String, String>,
1021        skip: &[String],
1022    ) -> Result<Option<sea_query::InsertStatement>, DynError> {
1023        let mut cols: Vec<&str> = Vec::new();
1024        let mut values: Vec<SeaValue> = Vec::new();
1025        for col in &self.meta.fields {
1026            if skip.iter().any(|s| s == &col.name) {
1027                continue;
1028            }
1029            // Auto-increment PK: omit when the form supplies no value
1030            // or an empty one; the backend hands out the next id.
1031            if col.primary_key
1032                && matches!(
1033                    col.ty,
1034                    SqlType::Integer | SqlType::BigInt | SqlType::SmallInt
1035                )
1036                && form.get(&col.name).is_none_or(|v| v.is_empty())
1037            {
1038                continue;
1039            }
1040            // `auto_now_add` / `auto_now` columns: when the form
1041            // omits the field (the post-fix admin shape — these
1042            // columns are hidden from create + edit forms), fill
1043            // with `Utc::now()` here. Mirrors the same handling on
1044            // `insert_json` (line ~836) so the form path and the
1045            // JSON path stay consistent — both honor the annotation
1046            // without the body / form having to carry the value.
1047            if (col.auto_now_add || col.auto_now)
1048                && form.get(&col.name).is_none_or(|v| v.is_empty())
1049            {
1050                cols.push(&col.name);
1051                values.push(crate::orm::write::now_for_column(col.ty));
1052                continue;
1053            }
1054            let raw = form.get(&col.name).map(|s| s.as_str()).unwrap_or("");
1055            let sea_value = match form_str_to_sea_value(col, raw) {
1056                Ok(v) => v,
1057                // gaps2 #12: structured per-field validator failure
1058                // (see the matching site in `update_form`).
1059                Err(e) => {
1060                    return Err(DynError::Write(WriteError::Validator {
1061                        field: col.name.clone(),
1062                        message: e.to_string(),
1063                    }));
1064                }
1065            };
1066            cols.push(&col.name);
1067            values.push(sea_value);
1068        }
1069        if cols.is_empty() {
1070            return Ok(None);
1071        }
1072
1073        let mut q = Query::insert();
1074        q.into_table(crate::db::router::schema_qualified_table(&self.meta.table));
1075        q.columns(cols.iter().map(|c| Alias::new(*c)).collect::<Vec<_>>());
1076        let exprs: Vec<sea_query::SimpleExpr> = values.into_iter().map(Into::into).collect();
1077        q.values_panic(exprs);
1078        Ok(Some(q))
1079    }
1080
1081    /// Transaction-aware sibling of [`Self::insert_form`]. Builds and
1082    /// executes the identical `INSERT` (same `form_str_to_sea_value`
1083    /// per-field validation, same `skip` / auto-increment-PK / auto-now
1084    /// handling, same [`WriteError::Validator`] shape, same returned-PK
1085    /// semantics) but runs it on the caller-supplied `tx` instead of a
1086    /// fresh pool connection. The caller owns `commit` / `rollback`, so
1087    /// the insert is uncommitted until they say so — this is what lets
1088    /// the admin save a parent row and its inline children atomically.
1089    pub async fn insert_form_in_tx(
1090        self,
1091        tx: &mut crate::db::Transaction,
1092        form: &HashMap<String, String>,
1093        skip: &[String],
1094    ) -> Result<i64, DynError> {
1095        let Some(mut q) = self.build_insert_form_query(form, skip)? else {
1096            return Ok(0);
1097        };
1098
1099        match tx.backend_name() {
1100            "sqlite" => {
1101                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
1102                let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1103                let res = sqlx::query_with(&sql, vals).execute(&mut **inner).await?;
1104                Ok(res.last_insert_rowid())
1105            }
1106            _ => {
1107                // Postgres has no last_insert_rowid; RETURNING the PK
1108                // mirrors the pool path exactly, including the `0`
1109                // fallback for a non-integer PK.
1110                let pk_name = self
1111                    .meta
1112                    .fields
1113                    .iter()
1114                    .find(|c| c.primary_key)
1115                    .map(|c| c.name.clone());
1116                let inner = tx.as_pg_mut().expect("postgres backend_name");
1117                if let Some(pk) = pk_name {
1118                    q.returning_col(Alias::new(&pk));
1119                    let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1120                    let row = sqlx::query_with(&sql, vals).fetch_one(&mut **inner).await?;
1121                    Ok(row.try_get::<i64, _>(pk.as_str()).unwrap_or(0))
1122                } else {
1123                    let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1124                    let _ = sqlx::query_with(&sql, vals).execute(&mut **inner).await?;
1125                    Ok(0)
1126                }
1127            }
1128        }
1129    }
1130
1131    /// Terminal: fetch every row, decoding each cell to its string
1132    /// form via [`decode_to_string`]. Returns one `HashMap` per row,
1133    /// keyed by column name, holding only the columns named in
1134    /// `select_cols` (defaults to all).
1135    pub async fn fetch_as_strings(self) -> Result<Vec<HashMap<String, String>>, DynError> {
1136        let mut q = Query::select();
1137        q.from(crate::db::router::schema_qualified_table(&self.meta.table));
1138        for c in &self.select_cols {
1139            q.column(Alias::new(c));
1140        }
1141        let where_clauses = self.effective_where_clauses();
1142        for cond in &where_clauses {
1143            q.cond_where(cond.clone());
1144        }
1145        for (col, descending) in &self.order {
1146            q.order_by(
1147                Alias::new(col),
1148                if *descending { Order::Desc } else { Order::Asc },
1149            );
1150        }
1151        if let Some(n) = self.limit {
1152            q.limit(n);
1153        }
1154        if let Some(n) = self.offset {
1155            q.offset(n);
1156        }
1157
1158        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
1159            DbPool::Sqlite(pool) => {
1160                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1161                let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
1162                let mut out: Vec<HashMap<String, String>> = Vec::with_capacity(rows.len());
1163                for row in rows {
1164                    let mut entry = HashMap::new();
1165                    for col_name in &self.select_cols {
1166                        if let Some(col_meta) =
1167                            self.meta.fields.iter().find(|c| &c.name == col_name)
1168                        {
1169                            let v = decode_to_string(&row, col_meta)?;
1170                            entry.insert(col_name.clone(), v);
1171                        }
1172                    }
1173                    out.push(entry);
1174                }
1175                Ok(out)
1176            }
1177            DbPool::Postgres(pool) => {
1178                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1179                let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
1180                let mut out: Vec<HashMap<String, String>> = Vec::with_capacity(rows.len());
1181                for row in rows {
1182                    let mut entry = HashMap::new();
1183                    for col_name in &self.select_cols {
1184                        if let Some(col_meta) =
1185                            self.meta.fields.iter().find(|c| &c.name == col_name)
1186                        {
1187                            let v = decode_pg_to_string(&row, col_meta)?;
1188                            entry.insert(col_name.clone(), v);
1189                        }
1190                    }
1191                    out.push(entry);
1192                }
1193                Ok(out)
1194            }
1195        }
1196    }
1197
1198    /// Terminal: fetch every row, decoding each cell to a
1199    /// `serde_json::Value` that preserves JSON shape (numbers stay
1200    /// numbers, booleans stay booleans, JSON columns nest verbatim).
1201    /// The right shape for HTTP API responses. Returns one
1202    /// `serde_json::Map` per row, keyed by column name.
1203    pub async fn fetch_as_json(
1204        self,
1205    ) -> Result<Vec<serde_json::Map<String, serde_json::Value>>, DynError> {
1206        let mut q = Query::select();
1207        q.from(crate::db::router::schema_qualified_table(&self.meta.table));
1208        for c in &self.select_cols {
1209            q.column(Alias::new(c));
1210        }
1211        let where_clauses = self.effective_where_clauses();
1212        for cond in &where_clauses {
1213            q.cond_where(cond.clone());
1214        }
1215        for (col, descending) in &self.order {
1216            q.order_by(
1217                Alias::new(col),
1218                if *descending { Order::Desc } else { Order::Asc },
1219            );
1220        }
1221        if let Some(n) = self.limit {
1222            q.limit(n);
1223        }
1224        if let Some(n) = self.offset {
1225            q.offset(n);
1226        }
1227
1228        let pk_name = self
1229            .meta
1230            .pk_column()
1231            .map(|c| c.name.clone())
1232            .unwrap_or_default();
1233        let selected_cols: Vec<(&String, &Column)> = self
1234            .select_cols
1235            .iter()
1236            .filter_map(|col_name| {
1237                self.meta
1238                    .fields
1239                    .iter()
1240                    .find(|c| &c.name == col_name)
1241                    .map(|col| (col_name, col))
1242            })
1243            .collect();
1244        let mut out: Vec<serde_json::Map<String, serde_json::Value>> =
1245            match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
1246                DbPool::Sqlite(pool) => {
1247                    let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1248                    let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
1249                    let mut out: Vec<serde_json::Map<String, serde_json::Value>> =
1250                        Vec::with_capacity(rows.len());
1251                    for row in rows {
1252                        let mut entry = serde_json::Map::new();
1253                        for (col_name, col_meta) in &selected_cols {
1254                            entry.insert((*col_name).clone(), decode_to_json(&row, col_meta)?);
1255                        }
1256                        out.push(entry);
1257                    }
1258                    out
1259                }
1260                DbPool::Postgres(pool) => {
1261                    let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1262                    let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
1263                    let mut out: Vec<serde_json::Map<String, serde_json::Value>> =
1264                        Vec::with_capacity(rows.len());
1265                    for row in rows {
1266                        let mut entry = serde_json::Map::new();
1267                        for (col_name, col_meta) in &selected_cols {
1268                            entry.insert((*col_name).clone(), decode_pg_to_json(&row, col_meta)?);
1269                        }
1270                        out.push(entry);
1271                    }
1272                    out
1273                }
1274            };
1275
1276        // M2M echo via one batched IN per relation across every
1277        // parent row in `out`. Replaces the per-row, per-relation
1278        // SELECT that ran inside the row loop above (gap2 #16) —
1279        // query budget drops from `1 + N*M` to `1 + count(M2M
1280        // relations)` regardless of how many parent rows came back.
1281        // Each row picks up its `<relation>: [child_id, ...]`
1282        // array via PK→children grouping, with an empty array
1283        // for parents that have no junction rows (preserves the
1284        // per-row helper's "always echo the key" contract).
1285        if !self.meta.m2m_relations.is_empty() && !out.is_empty() {
1286            hydrate_m2m_batched(&self.meta, &pk_name, &mut out).await?;
1287        }
1288
1289        // FK expansion via select_related — one batched
1290        // `IN (...)` per requested FK after the main query, then
1291        // splice the resolved row's JSON in where the integer id
1292        // was. No N+1: each FK costs one round-trip regardless of
1293        // how many parent rows came back. Reuses the same
1294        // `fetch_related_as_json` helper that powers the typed
1295        // `QuerySet::select_related` path so SQLite + Postgres
1296        // dispatch stays in one place.
1297        if !self.select_related.is_empty() && !out.is_empty() {
1298            hydrate_select_related_into(&self.meta, &self.select_related, &mut out).await?;
1299        }
1300        Ok(out)
1301    }
1302
1303    /// Terminal: fetch the first row (LIMIT 1) as a JSON object.
1304    /// Returns `None` when the filter matches zero rows.
1305    pub async fn first_as_json(
1306        mut self,
1307    ) -> Result<Option<serde_json::Map<String, serde_json::Value>>, DynError> {
1308        self.limit = Some(1);
1309        let mut rows = self.fetch_as_json().await?;
1310        Ok(rows.pop())
1311    }
1312
1313    /// Transaction-aware single-row read: `SELECT <cols> ... LIMIT 1` for
1314    /// the accumulated WHERE, run on the open `tx`. Decodes every model
1315    /// column into a JSON map. Used by REST bulk update to read a row back
1316    /// on the same (uncommitted) transaction so the response reflects the
1317    /// in-flight write. Returns `None` when the filter matches no row.
1318    ///
1319    /// Unlike [`Self::fetch_as_json`] this does NOT hydrate M2M arrays or
1320    /// `select_related` — it's the column-level read the bulk write path
1321    /// needs, matching what the single-object PATCH read-back returns.
1322    pub async fn fetch_one_json_in_tx(
1323        self,
1324        tx: &mut crate::db::Transaction,
1325    ) -> Result<Option<serde_json::Map<String, serde_json::Value>>, DynError> {
1326        let mut q = Query::select();
1327        q.from(crate::db::router::schema_qualified_table(&self.meta.table));
1328        for c in &self.meta.fields {
1329            q.column(Alias::new(&c.name));
1330        }
1331        let where_clauses = self.effective_where_clauses();
1332        for cond in &where_clauses {
1333            q.cond_where(cond.clone());
1334        }
1335        q.limit(1);
1336
1337        let out = match tx.backend_name() {
1338            "sqlite" => {
1339                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1340                let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1341                let row = sqlx::query_with(&sql, values)
1342                    .fetch_optional(&mut **inner)
1343                    .await?;
1344                match row {
1345                    Some(row) => {
1346                        let mut entry = serde_json::Map::new();
1347                        for col in &self.meta.fields {
1348                            entry.insert(col.name.clone(), decode_to_json(&row, col)?);
1349                        }
1350                        Some(entry)
1351                    }
1352                    None => None,
1353                }
1354            }
1355            _ => {
1356                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1357                let inner = tx.as_pg_mut().expect("postgres backend_name");
1358                let row = sqlx::query_with(&sql, values)
1359                    .fetch_optional(&mut **inner)
1360                    .await?;
1361                match row {
1362                    Some(row) => {
1363                        let mut entry = serde_json::Map::new();
1364                        for col in &self.meta.fields {
1365                            entry.insert(col.name.clone(), decode_pg_to_json(&row, col)?);
1366                        }
1367                        Some(entry)
1368                    }
1369                    None => None,
1370                }
1371            }
1372        };
1373        Ok(out)
1374    }
1375
1376    /// Terminal: INSERT one row from a JSON map. Auto-increment integer
1377    /// PKs are omitted when missing or null (the backend assigns).
1378    /// Returns the newly-inserted row as JSON (via RETURNING * on
1379    /// Postgres; via last_insert_rowid → SELECT * on SQLite). The
1380    /// per-column JSON-to-SeaValue coercion goes through the existing
1381    /// `json_to_sea_value` so timestamp / uuid / json paths are the
1382    /// same as the typed Manager::create path.
1383    pub async fn insert_json(
1384        self,
1385        body: &serde_json::Map<String, serde_json::Value>,
1386    ) -> Result<serde_json::Map<String, serde_json::Value>, crate::orm::write::WriteError> {
1387        use crate::orm::write::WriteError;
1388
1389        // Phase -1 — normalise the body (strip `noform`, derive
1390        // `slug_from`). Shared with the tx path.
1391        let body_owned: serde_json::Map<String, serde_json::Value>;
1392        let body: &serde_json::Map<String, serde_json::Value> =
1393            match normalise_insert_body(self.meta, body) {
1394                Some(owned) => {
1395                    body_owned = owned;
1396                    &body_owned
1397                }
1398                None => body,
1399            };
1400
1401        // Phase 0 — pre-DB validation against the ambient pool.
1402        let validation_errors = crate::orm::validation::validate_on_create(self.meta, body).await;
1403        if !validation_errors.is_empty() {
1404            return Err(WriteError::Multiple {
1405                errors: validation_errors,
1406            });
1407        }
1408
1409        // Phase 1 — build the INSERT + read back the PK shape.
1410        // Shared with the tx path.
1411        let InsertPlan {
1412            mut q,
1413            pk_name,
1414            pk_ty,
1415        } = build_insert_plan(self.meta, body)?;
1416
1417        // gaps #77: fire `pre_save:<table>` for the dynamic-write
1418        // path so REST endpoints and admin form submits surface in
1419        // signal subscribers (audit logs, cache invalidation, search
1420        // index sync). Payload mirrors the typed `Manager::create`
1421        // shape — `{ "instance": <body JSON>, "created": true }`.
1422        crate::signals::emit_pre_save_by_table(
1423            &self.meta.table,
1424            serde_json::Value::Object(body.clone()),
1425            true,
1426        )
1427        .await;
1428
1429        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
1430            DbPool::Sqlite(pool) => {
1431                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
1432                let res = sqlx::query_with(&sql, vals)
1433                    .execute(&pool)
1434                    .await
1435                    .map_err(|e| classify_or_sqlx(e, body))?;
1436                // Re-fetch by PK so the caller sees the row as the DB
1437                // stored it (defaults, autoincrement, server-side
1438                // coercion).
1439                let pk_pred = match pk_ty {
1440                    SqlType::Integer | SqlType::BigInt | SqlType::SmallInt => {
1441                        Expr::col(Alias::new(&pk_name)).eq(res.last_insert_rowid())
1442                    }
1443                    _ => {
1444                        // Client-supplied non-integer PK: pull it back
1445                        // from the body.
1446                        let supplied = body
1447                            .get(&pk_name)
1448                            .cloned()
1449                            .unwrap_or(serde_json::Value::Null);
1450                        let sea_value = crate::orm::write::json_to_sea_value(
1451                            pk_ty, &supplied, false, &pk_name, None,
1452                        )?;
1453                        Expr::col(Alias::new(&pk_name)).eq(sea_value)
1454                    }
1455                };
1456                let mut sel = Query::select();
1457                sel.from(crate::db::router::schema_qualified_table(&self.meta.table));
1458                for c in &self.meta.fields {
1459                    sel.column(Alias::new(&c.name));
1460                }
1461                sel.cond_where(Condition::all().add(pk_pred));
1462                let (sel_sql, sel_vals) = sel.build_sqlx(SqliteQueryBuilder);
1463                let row = sqlx::query_with(&sel_sql, sel_vals)
1464                    .fetch_one(&pool)
1465                    .await?;
1466                let mut out = serde_json::Map::new();
1467                for col in &self.meta.fields {
1468                    out.insert(col.name.clone(), decode_to_json(&row, col)?);
1469                }
1470                // Phase 2 — write junction rows for every M2M
1471                // relation the body carried. Validation has
1472                // already confirmed the array shape + element
1473                // existence; we just have to mirror the ids into
1474                // the auto-generated `<table>_<field>` table.
1475                let pk_value = out.get(&pk_name).cloned();
1476                write_m2m_junctions(&self.meta, pk_value.as_ref(), body).await?;
1477                // Phase 3 — hydrate M2M arrays back into the
1478                // response so the caller sees `tags: [1, 2]`
1479                // instead of an empty echo.
1480                hydrate_m2m_into(&self.meta, pk_value.as_ref(), &mut out).await?;
1481                // gaps #77: post_save with the fully-hydrated row.
1482                crate::signals::emit_post_save_by_table(
1483                    &self.meta.table,
1484                    serde_json::Value::Object(out.clone()),
1485                    true,
1486                )
1487                .await;
1488                Ok(out)
1489            }
1490            DbPool::Postgres(pool) => {
1491                // `RETURNING *` fetches every column of the newly-inserted
1492                // row in one round trip. sea-query's chained
1493                // `returning_col` calls don't accumulate, so we use the
1494                // explicit "all columns" variant.
1495                q.returning_all();
1496                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1497                let row = sqlx::query_with(&sql, vals)
1498                    .fetch_one(&pool)
1499                    .await
1500                    .map_err(|e| classify_or_sqlx(e, body))?;
1501                let mut out = serde_json::Map::new();
1502                for col in &self.meta.fields {
1503                    out.insert(col.name.clone(), decode_pg_to_json(&row, col)?);
1504                }
1505                let pk_value = out.get(&pk_name).cloned();
1506                write_m2m_junctions(&self.meta, pk_value.as_ref(), body).await?;
1507                hydrate_m2m_into(&self.meta, pk_value.as_ref(), &mut out).await?;
1508                // gaps #77: post_save on the Postgres branch.
1509                crate::signals::emit_post_save_by_table(
1510                    &self.meta.table,
1511                    serde_json::Value::Object(out.clone()),
1512                    true,
1513                )
1514                .await;
1515                Ok(out)
1516            }
1517        }
1518    }
1519
1520    /// Terminal: INSERT one row from a JSON map ON the passed
1521    /// transaction. The transactional sibling of [`Self::insert_json`]:
1522    /// the INSERT, the PK re-fetch, the M2M junction writes, the M2M
1523    /// read-back, AND the FK-existence validation all execute on `tx`
1524    /// rather than the ambient pool — so a caller can insert a parent
1525    /// and its children on one transaction and have the whole set
1526    /// commit (or roll back) atomically (`planning/orm_fixes.md` #2).
1527    ///
1528    /// Validation runs against the open transaction
1529    /// ([`crate::orm::validation::validate_on_create_in_tx`]) so a
1530    /// child whose FK targets a parent inserted earlier on the same
1531    /// (uncommitted) `tx` resolves. This is what makes a true-atomic
1532    /// nested create possible without the old compensating-delete
1533    /// dance.
1534    ///
1535    /// **Signals.** Unlike the auto-commit path, this does NOT fire
1536    /// `pre_save` / `post_save`. The row isn't durable until the
1537    /// caller commits `tx`, and a subscriber (audit log, cache
1538    /// invalidation, search index) firing before commit could observe
1539    /// — or react to — a write that then rolls back. The caller owns
1540    /// the commit, so the caller owns whatever post-commit signalling
1541    /// it wants. (The typed `Manager::create_in_tx` path makes the
1542    /// same choice for the same reason.)
1543    pub async fn insert_json_in_tx(
1544        self,
1545        body: &serde_json::Map<String, serde_json::Value>,
1546        tx: &mut crate::db::Transaction,
1547    ) -> Result<serde_json::Map<String, serde_json::Value>, crate::orm::write::WriteError> {
1548        use crate::orm::write::WriteError;
1549
1550        // Phase -1 — normalise (shared with the pool path).
1551        let body_owned: serde_json::Map<String, serde_json::Value>;
1552        let body: &serde_json::Map<String, serde_json::Value> =
1553            match normalise_insert_body(self.meta, body) {
1554                Some(owned) => {
1555                    body_owned = owned;
1556                    &body_owned
1557                }
1558                None => body,
1559            };
1560
1561        // Phase 0 — validation reads through the transaction so an FK
1562        // at an uncommitted parent resolves.
1563        let validation_errors =
1564            crate::orm::validation::validate_on_create_in_tx(self.meta, body, tx).await;
1565        if !validation_errors.is_empty() {
1566            return Err(WriteError::Multiple {
1567                errors: validation_errors,
1568            });
1569        }
1570
1571        // Phase 1 — build the INSERT (shared with the pool path).
1572        let InsertPlan {
1573            mut q,
1574            pk_name,
1575            pk_ty,
1576        } = build_insert_plan(self.meta, body)?;
1577
1578        match tx.backend_name() {
1579            "sqlite" => {
1580                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
1581                let res = {
1582                    let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1583                    sqlx::query_with(&sql, vals)
1584                        .execute(&mut **inner)
1585                        .await
1586                        .map_err(|e| classify_or_sqlx(e, body))?
1587                };
1588                // Re-fetch by PK on the same tx so the caller sees the
1589                // row the DB stored (defaults, autoincrement).
1590                let pk_pred = match pk_ty {
1591                    SqlType::Integer | SqlType::BigInt | SqlType::SmallInt => {
1592                        Expr::col(Alias::new(&pk_name)).eq(res.last_insert_rowid())
1593                    }
1594                    _ => {
1595                        let supplied = body
1596                            .get(&pk_name)
1597                            .cloned()
1598                            .unwrap_or(serde_json::Value::Null);
1599                        let sea_value = crate::orm::write::json_to_sea_value(
1600                            pk_ty, &supplied, false, &pk_name, None,
1601                        )?;
1602                        Expr::col(Alias::new(&pk_name)).eq(sea_value)
1603                    }
1604                };
1605                let mut sel = Query::select();
1606                sel.from(crate::db::router::schema_qualified_table(&self.meta.table));
1607                for c in &self.meta.fields {
1608                    sel.column(Alias::new(&c.name));
1609                }
1610                sel.cond_where(Condition::all().add(pk_pred));
1611                let (sel_sql, sel_vals) = sel.build_sqlx(SqliteQueryBuilder);
1612                let mut out = serde_json::Map::new();
1613                {
1614                    let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1615                    let row = sqlx::query_with(&sel_sql, sel_vals)
1616                        .fetch_one(&mut **inner)
1617                        .await?;
1618                    for col in &self.meta.fields {
1619                        out.insert(col.name.clone(), decode_to_json(&row, col)?);
1620                    }
1621                }
1622                // Phase 2/3 — junction writes + read-back on the tx.
1623                let pk_value = out.get(&pk_name).cloned();
1624                write_m2m_junctions_in_tx(self.meta, pk_value.as_ref(), body, tx).await?;
1625                hydrate_m2m_into_tx(self.meta, pk_value.as_ref(), &mut out, tx).await?;
1626                Ok(out)
1627            }
1628            _ => {
1629                q.returning_all();
1630                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1631                let mut out = serde_json::Map::new();
1632                {
1633                    let inner = tx.as_pg_mut().expect("postgres backend_name");
1634                    let row = sqlx::query_with(&sql, vals)
1635                        .fetch_one(&mut **inner)
1636                        .await
1637                        .map_err(|e| classify_or_sqlx(e, body))?;
1638                    for col in &self.meta.fields {
1639                        out.insert(col.name.clone(), decode_pg_to_json(&row, col)?);
1640                    }
1641                }
1642                let pk_value = out.get(&pk_name).cloned();
1643                write_m2m_junctions_in_tx(self.meta, pk_value.as_ref(), body, tx).await?;
1644                hydrate_m2m_into_tx(self.meta, pk_value.as_ref(), &mut out, tx).await?;
1645                Ok(out)
1646            }
1647        }
1648    }
1649
1650    /// Transaction-aware sibling of [`Self::update_json`]. PATCH semantics —
1651    /// update only the columns present in `body` for the rows matched by the
1652    /// accumulated WHERE — but every statement runs on the open `tx` so a
1653    /// batch of updates commits or rolls back as a unit. M2M arrays in the
1654    /// body are mirrored into junction tables on the same tx. Returns the
1655    /// number of rows touched.
1656    ///
1657    /// Used by REST bulk update (one tx for the whole array). Mirrors the
1658    /// pool path's validation + `noform`/`slug_from`/`auto_now` handling;
1659    /// the only difference is the execution target.
1660    pub async fn update_json_in_tx(
1661        self,
1662        body: &serde_json::Map<String, serde_json::Value>,
1663        tx: &mut crate::db::Transaction,
1664    ) -> Result<u64, crate::orm::write::WriteError> {
1665        use crate::orm::write::WriteError;
1666
1667        // Phase -1 — strip `noform` columns + derive `slug_from` (mirrors
1668        // the pool path).
1669        let needs_owned = self
1670            .meta
1671            .fields
1672            .iter()
1673            .any(|c| c.noform || c.slug_from.is_some());
1674        let mut body_owned: serde_json::Map<String, serde_json::Value>;
1675        let body: &serde_json::Map<String, serde_json::Value> = if needs_owned {
1676            body_owned = body.clone();
1677            for col in &self.meta.fields {
1678                if col.noform {
1679                    body_owned.remove(&col.name);
1680                }
1681            }
1682            crate::orm::write::apply_slug_from(&self.meta.fields, &mut body_owned, true);
1683            &body_owned
1684        } else {
1685            body
1686        };
1687
1688        // Phase 0 — pre-DB validation, same shape as `update_json`. FK
1689        // existence reads through the open tx so an FK at an uncommitted
1690        // sibling row in the same batch resolves.
1691        let validation_errors =
1692            crate::orm::validation::validate_on_update_in_tx(self.meta, body, tx).await;
1693        if !validation_errors.is_empty() {
1694            return Err(WriteError::Multiple {
1695                errors: validation_errors,
1696            });
1697        }
1698
1699        let mut q = Query::update();
1700        q.table(crate::db::router::schema_qualified_table(&self.meta.table));
1701        let mut any = false;
1702        for col in &self.meta.fields {
1703            if col.primary_key {
1704                continue;
1705            }
1706            let Some(json) = body.get(&col.name) else {
1707                if col.auto_now {
1708                    let now_value = crate::orm::write::now_for_column(col.ty);
1709                    q.value(Alias::new(&col.name), now_value);
1710                    any = true;
1711                }
1712                continue;
1713            };
1714            validate_numeric_bounds(col, json)?;
1715            if let (Some(fmt), Some(s)) = (col.text_format.as_deref(), json.as_str()) {
1716                if let Err(e) = crate::orm::validators::validate_text_format(fmt, s) {
1717                    return Err(WriteError::Validator {
1718                        field: col.name.clone(),
1719                        message: e.to_string(),
1720                    });
1721                }
1722            }
1723            let sea_value = crate::orm::write::json_to_sea_value(
1724                col.ty,
1725                json,
1726                col.nullable,
1727                &col.name,
1728                fk_target_pk_sql_type(col),
1729            )?;
1730            q.value(Alias::new(&col.name), sea_value);
1731            any = true;
1732        }
1733        let touches_m2m = self
1734            .meta
1735            .m2m_relations
1736            .iter()
1737            .any(|r| body.contains_key(&r.field_name));
1738        if !any && !touches_m2m {
1739            return Ok(0);
1740        }
1741        let where_clauses = self.effective_where_clauses();
1742        for cond in &where_clauses {
1743            q.cond_where(cond.clone());
1744        }
1745
1746        // The PKs the WHERE matches — needed for the M2M mirror below. We
1747        // read them on the same tx so the bulk update sees its own
1748        // uncommitted siblings.
1749        let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
1750            Some(pk_col) => {
1751                collect_parent_pks_in_tx(self.meta, pk_col, &where_clauses, tx).await?
1752            }
1753            None => Vec::new(),
1754        };
1755
1756        if any {
1757            match tx.backend_name() {
1758                "sqlite" => {
1759                    let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1760                    let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1761                    sqlx::query_with(&sql, values)
1762                        .execute(&mut **inner)
1763                        .await
1764                        .map_err(|e| classify_or_sqlx(e, body))?;
1765                }
1766                _ => {
1767                    let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1768                    let inner = tx.as_pg_mut().expect("postgres backend_name");
1769                    sqlx::query_with(&sql, values)
1770                        .execute(&mut **inner)
1771                        .await
1772                        .map_err(|e| classify_or_sqlx(e, body))?;
1773                }
1774            }
1775        }
1776        for pk in &parent_pks {
1777            write_m2m_junctions_in_tx(self.meta, Some(pk), body, tx).await?;
1778        }
1779        Ok(parent_pks.len().max(if any { 1 } else { 0 }) as u64)
1780    }
1781
1782    /// Transaction-aware sibling of [`Self::delete`]. Deletes (or
1783    /// soft-deletes, for a `soft_delete` model) the rows matched by the
1784    /// accumulated WHERE on the open `tx`, so a batch of deletes commits or
1785    /// rolls back as a unit. Returns the number of rows affected.
1786    ///
1787    /// Soft-delete models stamp `deleted_at = now()` (consistent with the
1788    /// pool path / gaps #35) unless [`Self::hard_delete`] was set.
1789    pub async fn delete_in_tx(self, tx: &mut crate::db::Transaction) -> Result<u64, DynError> {
1790        let soft = self.meta.soft_delete && !self.hard_delete;
1791        let where_clauses = if soft {
1792            self.live_where_clauses()
1793        } else {
1794            self.effective_where_clauses()
1795        };
1796
1797        // Build the SQL for the active backend. Soft-delete is an UPDATE
1798        // stamping `deleted_at`; a hard delete is a DELETE. Each statement
1799        // type lowers to `(sql, values)` so the execute arm is uniform.
1800        let table = crate::db::router::schema_qualified_table(&self.meta.table);
1801        let build = |is_sqlite: bool| {
1802            if soft {
1803                let mut u = Query::update();
1804                u.table(table.clone());
1805                u.value(
1806                    Alias::new("deleted_at"),
1807                    sea_query::Value::ChronoDateTimeUtc(Some(Box::new(chrono::Utc::now()))),
1808                );
1809                for cond in &where_clauses {
1810                    u.cond_where(cond.clone());
1811                }
1812                if is_sqlite {
1813                    u.build_sqlx(SqliteQueryBuilder)
1814                } else {
1815                    u.build_sqlx(PostgresQueryBuilder)
1816                }
1817            } else {
1818                let mut d = Query::delete();
1819                d.from_table(table.clone());
1820                for cond in &where_clauses {
1821                    d.cond_where(cond.clone());
1822                }
1823                if is_sqlite {
1824                    d.build_sqlx(SqliteQueryBuilder)
1825                } else {
1826                    d.build_sqlx(PostgresQueryBuilder)
1827                }
1828            }
1829        };
1830
1831        let rows_affected = match tx.backend_name() {
1832            "sqlite" => {
1833                let (sql, values) = build(true);
1834                let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1835                sqlx::query_with(&sql, values)
1836                    .execute(&mut **inner)
1837                    .await?
1838                    .rows_affected()
1839            }
1840            _ => {
1841                let (sql, values) = build(false);
1842                let inner = tx.as_pg_mut().expect("postgres backend_name");
1843                sqlx::query_with(&sql, values)
1844                    .execute(&mut **inner)
1845                    .await?
1846                    .rows_affected()
1847            }
1848        };
1849        Ok(rows_affected)
1850    }
1851
1852    /// Terminal: PATCH semantics — update only the columns present
1853    /// in `body`. The accumulated WHERE clauses narrow the target
1854    /// row(s). Returns the number of rows affected.
1855    pub async fn update_json(
1856        self,
1857        body: &serde_json::Map<String, serde_json::Value>,
1858    ) -> Result<u64, crate::orm::write::WriteError> {
1859        use crate::orm::write::WriteError;
1860
1861        // Phase -1 — strip `noform` columns (server-managed
1862        // fields the client must not overwrite).
1863        //
1864        // Gap 109: also auto-derive `slug_from` columns when the
1865        // source field is part of the update body (see
1866        // `apply_slug_from`'s update guard for why).
1867        let needs_owned = self
1868            .meta
1869            .fields
1870            .iter()
1871            .any(|c| c.noform || c.slug_from.is_some());
1872        let mut body_owned: serde_json::Map<String, serde_json::Value>;
1873        let body: &serde_json::Map<String, serde_json::Value> = if needs_owned {
1874            body_owned = body.clone();
1875            for col in &self.meta.fields {
1876                if col.noform {
1877                    body_owned.remove(&col.name);
1878                }
1879            }
1880            crate::orm::write::apply_slug_from(&self.meta.fields, &mut body_owned, true);
1881            &body_owned
1882        } else {
1883            body
1884        };
1885
1886        // Phase 0 — pre-DB validation. Update-shape: required-
1887        // field check only complains about EXPLICIT blanks
1888        // (preserving the partial-update contract); FK existence
1889        // + choices + M2M shape apply to whatever the body
1890        // carries.
1891        let validation_errors = crate::orm::validation::validate_on_update(&self.meta, body).await;
1892        if !validation_errors.is_empty() {
1893            return Err(WriteError::Multiple {
1894                errors: validation_errors,
1895            });
1896        }
1897
1898        let mut q = Query::update();
1899        q.table(crate::db::router::schema_qualified_table(&self.meta.table));
1900        let mut any = false;
1901        for col in &self.meta.fields {
1902            if col.primary_key {
1903                continue;
1904            }
1905            let Some(json) = body.get(&col.name) else {
1906                // BUG-5 fix: `auto_now` columns refresh to
1907                // `Utc::now()` on every update, even if the body
1908                // doesn't mention them. `auto_now_add` columns
1909                // stay frozen (they fired on create only).
1910                if col.auto_now {
1911                    let now_value = crate::orm::write::now_for_column(col.ty);
1912                    q.value(Alias::new(&col.name), now_value);
1913                    any = true;
1914                }
1915                continue;
1916            };
1917            validate_numeric_bounds(col, json)?;
1918            // BUG-11/12/13: same wrapper-type pre-validation as
1919            // insert_json.
1920            if let (Some(fmt), Some(s)) = (col.text_format.as_deref(), json.as_str()) {
1921                if let Err(e) = crate::orm::validators::validate_text_format(fmt, s) {
1922                    return Err(WriteError::Validator {
1923                        field: col.name.clone(),
1924                        message: e.to_string(),
1925                    });
1926                }
1927            }
1928            let sea_value = crate::orm::write::json_to_sea_value(
1929                col.ty,
1930                json,
1931                col.nullable,
1932                &col.name,
1933                fk_target_pk_sql_type(col),
1934            )?;
1935            q.value(Alias::new(&col.name), sea_value);
1936            any = true;
1937        }
1938        // Detect whether the body wants to touch any M2M
1939        // relations. If so, we'll write junctions *after* the
1940        // UPDATE — and we'll need to know the matched parent
1941        // PKs even when no regular columns are being changed.
1942        let touches_m2m = self
1943            .meta
1944            .m2m_relations
1945            .iter()
1946            .any(|r| body.contains_key(&r.field_name));
1947        if !any && !touches_m2m {
1948            return Ok(0);
1949        }
1950        let where_clauses = self.effective_where_clauses();
1951        for cond in &where_clauses {
1952            q.cond_where(cond.clone());
1953        }
1954        // Find every parent_id matched by the filter so we can
1955        // mirror the M2M arrays into each one's junction AND fire
1956        // `bulk_post_save:<table>` with the affected ids (gaps #77).
1957        // Done BEFORE the UPDATE so:
1958        //   - a no-op (`any = false`, `touches_m2m = true`) still
1959        //     gets the M2M write, and
1960        //   - the signal payload carries the exact PK set the WHERE
1961        //     matched, even when the UPDATE itself is a no-op
1962        //     (matches the typed `bulk_post_save` semantics: the
1963        //     subscriber learns "these rows were targeted" rather
1964        //     than guessing from `rows_affected`).
1965        let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
1966            Some(pk_col) => collect_parent_pks(&self.meta, pk_col, &self.where_clauses).await?,
1967            None => Vec::new(),
1968        };
1969
1970        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
1971            DbPool::Sqlite(pool) => {
1972                if any {
1973                    let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1974                    sqlx::query_with(&sql, values)
1975                        .execute(&pool)
1976                        .await
1977                        .map_err(|e| classify_or_sqlx(e, body))?;
1978                }
1979                for pk in &parent_pks {
1980                    write_m2m_junctions(&self.meta, Some(pk), body).await?;
1981                }
1982                // gaps #77: `bulk_post_save:<table>` fires after the
1983                // UPDATE on the dynamic path. `created = false` because
1984                // this is UPDATE (matches the typed bulk-save convention
1985                // from gap #38). `ids` is whatever the WHERE matched —
1986                // collect_parent_pks already ran above.
1987                crate::signals::emit_bulk_post_save_by_table(
1988                    &self.meta.table,
1989                    parent_pks.clone(),
1990                    false,
1991                )
1992                .await;
1993                Ok(parent_pks.len().max(if any { 1 } else { 0 }) as u64)
1994            }
1995            DbPool::Postgres(pool) => {
1996                if any {
1997                    let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1998                    sqlx::query_with(&sql, values)
1999                        .execute(&pool)
2000                        .await
2001                        .map_err(|e| classify_or_sqlx(e, body))?;
2002                }
2003                for pk in &parent_pks {
2004                    write_m2m_junctions(&self.meta, Some(pk), body).await?;
2005                }
2006                crate::signals::emit_bulk_post_save_by_table(
2007                    &self.meta.table,
2008                    parent_pks.clone(),
2009                    false,
2010                )
2011                .await;
2012                Ok(parent_pks.len().max(if any { 1 } else { 0 }) as u64)
2013            }
2014        }
2015    }
2016}
2017
2018/// Decode one SQLite cell to its template-friendly string form.
2019///
2020/// Public so admin-like crates can decode rows they fetched outside
2021/// `DynQuerySet` (typed row paths, ad-hoc joins). The dispatch mirrors
2022/// `bind_form_value`'s parse step in reverse.
2023pub fn decode_to_string(
2024    row: &sqlx::sqlite::SqliteRow,
2025    col: &Column,
2026) -> Result<String, sqlx::Error> {
2027    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2028    use serde_json::Value;
2029    use uuid::Uuid;
2030
2031    let name = col.name.as_str();
2032    if col.nullable {
2033        return Ok(match col.ty {
2034            SqlType::SmallInt | SqlType::Integer => row
2035                .try_get::<Option<i32>, _>(name)?
2036                .map_or(String::new(), |v| v.to_string()),
2037            SqlType::BigInt => row
2038                .try_get::<Option<i64>, _>(name)?
2039                .map_or(String::new(), |v| v.to_string()),
2040            SqlType::Real => row
2041                .try_get::<Option<f32>, _>(name)?
2042                .map_or(String::new(), |v| v.to_string()),
2043            SqlType::Double => row
2044                .try_get::<Option<f64>, _>(name)?
2045                .map_or(String::new(), |v| v.to_string()),
2046            SqlType::Boolean => row
2047                .try_get::<Option<bool>, _>(name)?
2048                .map_or(String::new(), |v| {
2049                    if v { "true" } else { "false" }.to_string()
2050                }),
2051            SqlType::Text => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
2052            SqlType::Date => row
2053                .try_get::<Option<NaiveDate>, _>(name)?
2054                .map_or(String::new(), |v| v.to_string()),
2055            SqlType::Time => row
2056                .try_get::<Option<NaiveTime>, _>(name)?
2057                .map_or(String::new(), |v| v.to_string()),
2058            SqlType::Timestamptz => row
2059                .try_get::<Option<DateTime<Utc>>, _>(name)?
2060                .map_or(String::new(), |v| v.to_rfc3339()),
2061            SqlType::Uuid => row
2062                .try_get::<Option<Uuid>, _>(name)?
2063                .map_or(String::new(), |v| v.to_string()),
2064            SqlType::Json => row
2065                .try_get::<Option<Value>, _>(name)?
2066                .map_or(String::new(), |v| v.to_string()),
2067            SqlType::Array(_) => panic_array_unsupported(&col.name),
2068            SqlType::Inet
2069            | SqlType::Cidr
2070            | SqlType::MacAddr
2071            | SqlType::Xml
2072            | SqlType::Ltree
2073            | SqlType::Bit
2074            | SqlType::FullText => panic_pg_only_unsupported(&col.name),
2075            // PK lift (review #3): FK columns to a String/Uuid-PK target
2076            // store TEXT/UUID, not BIGINT — decode by the target PK type so
2077            // the admin display path doesn't fail on a non-i64 FK.
2078            SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2079                Some(SqlType::Text) => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
2080                Some(SqlType::Uuid) => row
2081                    .try_get::<Option<Uuid>, _>(name)?
2082                    .map_or(String::new(), |v| v.to_string()),
2083                _ => row
2084                    .try_get::<Option<i64>, _>(name)?
2085                    .map_or(String::new(), |v| v.to_string()),
2086            },
2087            SqlType::Bytes => row
2088                .try_get::<Option<Vec<u8>>, _>(name)?
2089                .map_or(String::new(), |b| hex_encode(&b)),
2090            SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2091        });
2092    }
2093    Ok(match col.ty {
2094        SqlType::SmallInt | SqlType::Integer => row.try_get::<i32, _>(name)?.to_string(),
2095        SqlType::BigInt => row.try_get::<i64, _>(name)?.to_string(),
2096        SqlType::Real => row.try_get::<f32, _>(name)?.to_string(),
2097        SqlType::Double => row.try_get::<f64, _>(name)?.to_string(),
2098        SqlType::Boolean => if row.try_get::<bool, _>(name)? {
2099            "true"
2100        } else {
2101            "false"
2102        }
2103        .to_string(),
2104        SqlType::Text => row.try_get::<String, _>(name)?,
2105        SqlType::Date => row.try_get::<NaiveDate, _>(name)?.to_string(),
2106        SqlType::Time => row.try_get::<NaiveTime, _>(name)?.to_string(),
2107        SqlType::Timestamptz => row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339(),
2108        SqlType::Uuid => row.try_get::<Uuid, _>(name)?.to_string(),
2109        SqlType::Json => row.try_get::<Value, _>(name)?.to_string(),
2110        SqlType::Array(_) => panic_array_unsupported(&col.name),
2111        SqlType::Inet
2112        | SqlType::Cidr
2113        | SqlType::MacAddr
2114        | SqlType::Xml
2115        | SqlType::Ltree
2116        | SqlType::Bit
2117        | SqlType::FullText => panic_pg_only_unsupported(&col.name),
2118        SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2119            Some(SqlType::Text) => row.try_get::<String, _>(name)?,
2120            Some(SqlType::Uuid) => row.try_get::<Uuid, _>(name)?.to_string(),
2121            _ => row.try_get::<i64, _>(name)?.to_string(),
2122        },
2123        SqlType::Bytes => hex_encode(&row.try_get::<Vec<u8>, _>(name)?),
2124        SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2125    })
2126}
2127
2128/// Decode one Postgres cell to its template-friendly string form.
2129///
2130/// Sibling of [`decode_to_string`] for the Postgres backend. Same
2131/// dispatch table on `SqlType`; the only difference is the executor
2132/// type (`PgRow` instead of `SqliteRow`) and a handful of types that
2133/// Postgres binds differently — `i32` for SmallInt instead of SQLite's
2134/// affinity-coerced `i32`, native bool, native chrono / uuid /
2135/// serde_json::Value. Array / Inet / Cidr / MacAddr / FullText all
2136/// live on Postgres natively but are decoded as their JSON string
2137/// shape here (the admin templates only need a printable form).
2138pub fn decode_pg_to_string(
2139    row: &sqlx::postgres::PgRow,
2140    col: &Column,
2141) -> Result<String, sqlx::Error> {
2142    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2143    use serde_json::Value;
2144    use uuid::Uuid;
2145
2146    let name = col.name.as_str();
2147    if col.nullable {
2148        return Ok(match col.ty {
2149            SqlType::SmallInt => row
2150                .try_get::<Option<i16>, _>(name)?
2151                .map_or(String::new(), |v| v.to_string()),
2152            SqlType::Integer => row
2153                .try_get::<Option<i32>, _>(name)?
2154                .map_or(String::new(), |v| v.to_string()),
2155            SqlType::BigInt => row
2156                .try_get::<Option<i64>, _>(name)?
2157                .map_or(String::new(), |v| v.to_string()),
2158            SqlType::Real => row
2159                .try_get::<Option<f32>, _>(name)?
2160                .map_or(String::new(), |v| v.to_string()),
2161            SqlType::Double => row
2162                .try_get::<Option<f64>, _>(name)?
2163                .map_or(String::new(), |v| v.to_string()),
2164            SqlType::Boolean => row
2165                .try_get::<Option<bool>, _>(name)?
2166                .map_or(String::new(), |v| {
2167                    if v { "true" } else { "false" }.to_string()
2168                }),
2169            SqlType::Text => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
2170            SqlType::Date => row
2171                .try_get::<Option<NaiveDate>, _>(name)?
2172                .map_or(String::new(), |v| v.to_string()),
2173            SqlType::Time => row
2174                .try_get::<Option<NaiveTime>, _>(name)?
2175                .map_or(String::new(), |v| v.to_string()),
2176            SqlType::Timestamptz => row
2177                .try_get::<Option<DateTime<Utc>>, _>(name)?
2178                .map_or(String::new(), |v| v.to_rfc3339()),
2179            SqlType::Uuid => row
2180                .try_get::<Option<Uuid>, _>(name)?
2181                .map_or(String::new(), |v| v.to_string()),
2182            SqlType::Json => row
2183                .try_get::<Option<Value>, _>(name)?
2184                .map_or(String::new(), |v| v.to_string()),
2185            // Array / network / FullText decode as their printable forms.
2186            // Pg drivers hand back typed Vec / IpNetwork / etc.; we lift
2187            // through a best-effort string decode for now since the admin
2188            // only needs a glance. Decode failures fall through to empty
2189            // string (the admin still renders something useful).
2190            SqlType::Array(_)
2191            | SqlType::Inet
2192            | SqlType::Cidr
2193            | SqlType::MacAddr
2194            | SqlType::Xml
2195            | SqlType::Ltree
2196            | SqlType::Bit
2197            | SqlType::FullText => row
2198                .try_get::<Option<String>, _>(name)
2199                .ok()
2200                .flatten()
2201                .unwrap_or_default(),
2202            // PK lift (review #3): FK to a String/Uuid-PK target is a
2203            // TEXT/native-uuid column on PG — decode by the target PK type.
2204            SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2205                Some(SqlType::Text) => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
2206                Some(SqlType::Uuid) => row
2207                    .try_get::<Option<Uuid>, _>(name)?
2208                    .map_or(String::new(), |v| v.to_string()),
2209                _ => row
2210                    .try_get::<Option<i64>, _>(name)?
2211                    .map_or(String::new(), |v| v.to_string()),
2212            },
2213            SqlType::Bytes => row
2214                .try_get::<Option<Vec<u8>>, _>(name)?
2215                .map_or(String::new(), |b| hex_encode(&b)),
2216            SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2217        });
2218    }
2219    Ok(match col.ty {
2220        SqlType::SmallInt => row.try_get::<i16, _>(name)?.to_string(),
2221        SqlType::Integer => row.try_get::<i32, _>(name)?.to_string(),
2222        SqlType::BigInt => row.try_get::<i64, _>(name)?.to_string(),
2223        SqlType::Real => row.try_get::<f32, _>(name)?.to_string(),
2224        SqlType::Double => row.try_get::<f64, _>(name)?.to_string(),
2225        SqlType::Boolean => if row.try_get::<bool, _>(name)? {
2226            "true"
2227        } else {
2228            "false"
2229        }
2230        .to_string(),
2231        SqlType::Text => row.try_get::<String, _>(name)?,
2232        SqlType::Date => row.try_get::<NaiveDate, _>(name)?.to_string(),
2233        SqlType::Time => row.try_get::<NaiveTime, _>(name)?.to_string(),
2234        SqlType::Timestamptz => row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339(),
2235        SqlType::Uuid => row.try_get::<Uuid, _>(name)?.to_string(),
2236        SqlType::Json => row.try_get::<Value, _>(name)?.to_string(),
2237        // Same as the nullable branch: lift through best-effort string.
2238        SqlType::Array(_)
2239        | SqlType::Inet
2240        | SqlType::Cidr
2241        | SqlType::MacAddr
2242        | SqlType::Xml
2243        | SqlType::Ltree
2244        | SqlType::Bit
2245        | SqlType::FullText => row.try_get::<String, _>(name).unwrap_or_default(),
2246        SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2247            Some(SqlType::Text) => row.try_get::<String, _>(name)?,
2248            Some(SqlType::Uuid) => row.try_get::<Uuid, _>(name)?.to_string(),
2249            _ => row.try_get::<i64, _>(name)?.to_string(),
2250        },
2251        SqlType::Bytes => hex_encode(&row.try_get::<Vec<u8>, _>(name)?),
2252        SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2253    })
2254}
2255
2256/// Decode one SQLite cell to a `serde_json::Value` that preserves the
2257/// column's JSON shape (numbers stay numbers, booleans stay booleans,
2258/// dates render as ISO strings, JSON columns nest verbatim, NULLs
2259/// become `Value::Null`). This is the row → JSON converter the REST
2260/// plugin's auto-CRUD list / detail handlers feed straight into their
2261/// HTTP body.
2262/// Alias-aware sibling of [`decode_to_json`] — same decode logic but
2263/// pulls from a different column name (the aliased name in a JOIN
2264/// SELECT). Used by `QuerySet::join_related` to read child columns
2265/// out of a JOIN row where every child column is exposed as
2266/// `<field>__<col>`. Cheap clone of `Column` because the existing
2267/// decoder is keyed off `col.name.as_str()`.
2268pub fn decode_to_json_aliased(
2269    row: &sqlx::sqlite::SqliteRow,
2270    col: &Column,
2271    alias: &str,
2272) -> Result<serde_json::Value, sqlx::Error> {
2273    let mut aliased = col.clone();
2274    aliased.name = alias.to_string();
2275    decode_to_json(row, &aliased)
2276}
2277
2278/// Postgres counterpart to [`decode_to_json_aliased`].
2279pub fn decode_pg_to_json_aliased(
2280    row: &sqlx::postgres::PgRow,
2281    col: &Column,
2282    alias: &str,
2283) -> Result<serde_json::Value, sqlx::Error> {
2284    let mut aliased = col.clone();
2285    aliased.name = alias.to_string();
2286    decode_pg_to_json(row, &aliased)
2287}
2288
2289/// PK lift Pass A — when `col` is an FK column (`SqlType::ForeignKey`)
2290/// pointing at a model whose PK is a `String` / `Uuid` (not the
2291/// default `i64`), the decoder needs to bind as `String` instead of
2292/// `i64` or sqlx errors with "Rust type i64 not compatible with SQL
2293/// type TEXT".
2294///
2295/// Looks the target table up in the model registry and reads its
2296/// PK column's `SqlType`. Returns `None` when:
2297///   - `col` isn't an FK (caller falls back to the normal arm),
2298///   - the FK has no target (defensive — shouldn't happen in
2299///     practice since the macro always sets `fk_target` on FK
2300///     columns),
2301///   - the target isn't in the registry (only possible when an
2302///     internal call site fires before `App::build()` finishes
2303///     wiring plugins).
2304///
2305/// PK lift Pass E — O(1) lookup via the `pk_meta_for_table` cache
2306/// (was O(n) `Vec<ModelMeta>` clone + linear scan per call). The
2307/// cache initialises lazily on first post-`App::build` call and
2308/// serves from a `HashMap` for every subsequent lookup. In a hot
2309/// decode loop (e.g. 1000 rows × 50 columns × per-FK decode) this
2310/// drops the per-row registry-walk cost from a few milliseconds
2311/// to a single hashmap probe.
2312fn fk_target_pk_sql_type(col: &Column) -> Option<SqlType> {
2313    if !matches!(col.ty, SqlType::ForeignKey) {
2314        return None;
2315    }
2316    let target_table = col.fk_target.as_deref()?;
2317    crate::migrate::pk_meta_for_table(target_table).map(|(_, ty)| ty)
2318}
2319
2320pub fn decode_to_json(
2321    row: &sqlx::sqlite::SqliteRow,
2322    col: &Column,
2323) -> Result<serde_json::Value, sqlx::Error> {
2324    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2325    use serde_json::Value;
2326    use uuid::Uuid;
2327
2328    let name = col.name.as_str();
2329    if col.nullable {
2330        return Ok(match col.ty {
2331            SqlType::SmallInt | SqlType::Integer => row
2332                .try_get::<Option<i32>, _>(name)?
2333                .map_or(Value::Null, Value::from),
2334            SqlType::BigInt => row
2335                .try_get::<Option<i64>, _>(name)?
2336                .map_or(Value::Null, Value::from),
2337            SqlType::Real => row
2338                .try_get::<Option<f32>, _>(name)?
2339                .map_or(Value::Null, |v| Value::from(v as f64)),
2340            SqlType::Double => row
2341                .try_get::<Option<f64>, _>(name)?
2342                .map_or(Value::Null, Value::from),
2343            SqlType::Boolean => row
2344                .try_get::<Option<bool>, _>(name)?
2345                .map_or(Value::Null, Value::from),
2346            SqlType::Text => row
2347                .try_get::<Option<String>, _>(name)?
2348                .map_or(Value::Null, Value::from),
2349            SqlType::Date => row
2350                .try_get::<Option<NaiveDate>, _>(name)?
2351                .map_or(Value::Null, |v| Value::from(v.to_string())),
2352            SqlType::Time => row
2353                .try_get::<Option<NaiveTime>, _>(name)?
2354                .map_or(Value::Null, |v| Value::from(v.to_string())),
2355            SqlType::Timestamptz => row
2356                .try_get::<Option<DateTime<Utc>>, _>(name)?
2357                .map_or(Value::Null, |v| Value::from(v.to_rfc3339())),
2358            SqlType::Uuid => row
2359                .try_get::<Option<Uuid>, _>(name)?
2360                .map_or(Value::Null, |v| Value::from(v.to_string())),
2361            SqlType::Json => row
2362                .try_get::<Option<Value>, _>(name)?
2363                .unwrap_or(Value::Null),
2364            SqlType::Array(_) => panic_array_unsupported(&col.name),
2365            SqlType::Inet
2366            | SqlType::Cidr
2367            | SqlType::MacAddr
2368            | SqlType::Xml
2369            | SqlType::Ltree
2370            | SqlType::Bit
2371            | SqlType::FullText => panic_pg_only_unsupported(&col.name),
2372            // PK lift Pass A: FK columns that target a String /
2373            // Uuid PK store their values as TEXT, not BIGINT. Probe
2374            // the target meta to pick the right Rust type for the
2375            // bind.
2376            SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2377                Some(SqlType::Text) => row
2378                    .try_get::<Option<String>, _>(name)?
2379                    .map_or(Value::Null, Value::from),
2380                Some(SqlType::Uuid) => row
2381                    .try_get::<Option<Uuid>, _>(name)?
2382                    .map_or(Value::Null, |v| Value::from(v.to_string())),
2383                _ => row
2384                    .try_get::<Option<i64>, _>(name)?
2385                    .map_or(Value::Null, Value::from),
2386            },
2387            SqlType::Bytes => row
2388                .try_get::<Option<Vec<u8>>, _>(name)?
2389                .map_or(Value::Null, |b| bytes_to_json(&b)),
2390            SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2391        });
2392    }
2393    Ok(match col.ty {
2394        SqlType::SmallInt | SqlType::Integer => Value::from(row.try_get::<i32, _>(name)?),
2395        SqlType::BigInt => Value::from(row.try_get::<i64, _>(name)?),
2396        SqlType::Real => Value::from(row.try_get::<f32, _>(name)? as f64),
2397        SqlType::Double => Value::from(row.try_get::<f64, _>(name)?),
2398        SqlType::Boolean => Value::from(row.try_get::<bool, _>(name)?),
2399        SqlType::Text => Value::from(row.try_get::<String, _>(name)?),
2400        SqlType::Date => Value::from(row.try_get::<NaiveDate, _>(name)?.to_string()),
2401        SqlType::Time => Value::from(row.try_get::<NaiveTime, _>(name)?.to_string()),
2402        SqlType::Timestamptz => Value::from(row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339()),
2403        SqlType::Uuid => Value::from(row.try_get::<Uuid, _>(name)?.to_string()),
2404        SqlType::Json => row.try_get::<Value, _>(name)?,
2405        SqlType::Array(_) => panic_array_unsupported(&col.name),
2406        SqlType::Inet
2407        | SqlType::Cidr
2408        | SqlType::MacAddr
2409        | SqlType::Xml
2410        | SqlType::Ltree
2411        | SqlType::Bit
2412        | SqlType::FullText => panic_pg_only_unsupported(&col.name),
2413        // PK lift Pass A: see the nullable arm above for the same
2414        // String/Uuid target dispatch.
2415        SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2416            Some(SqlType::Text) => Value::from(row.try_get::<String, _>(name)?),
2417            Some(SqlType::Uuid) => Value::from(row.try_get::<Uuid, _>(name)?.to_string()),
2418            _ => Value::from(row.try_get::<i64, _>(name)?),
2419        },
2420        SqlType::Bytes => bytes_to_json(&row.try_get::<Vec<u8>, _>(name)?),
2421        SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2422    })
2423}
2424
2425/// Postgres sibling of [`decode_to_json`]. Same dispatch table; the
2426/// only difference is the executor type (`PgRow`) and the i16 path
2427/// for SmallInt (PG binds i16, SQLite affinity-coerces to i32).
2428pub fn decode_pg_to_json(
2429    row: &sqlx::postgres::PgRow,
2430    col: &Column,
2431) -> Result<serde_json::Value, sqlx::Error> {
2432    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2433    use serde_json::Value;
2434    use uuid::Uuid;
2435
2436    let name = col.name.as_str();
2437    if col.nullable {
2438        return Ok(match col.ty {
2439            SqlType::SmallInt => row
2440                .try_get::<Option<i16>, _>(name)?
2441                .map_or(Value::Null, Value::from),
2442            SqlType::Integer => row
2443                .try_get::<Option<i32>, _>(name)?
2444                .map_or(Value::Null, Value::from),
2445            SqlType::BigInt => row
2446                .try_get::<Option<i64>, _>(name)?
2447                .map_or(Value::Null, Value::from),
2448            SqlType::Real => row
2449                .try_get::<Option<f32>, _>(name)?
2450                .map_or(Value::Null, |v| Value::from(v as f64)),
2451            SqlType::Double => row
2452                .try_get::<Option<f64>, _>(name)?
2453                .map_or(Value::Null, Value::from),
2454            SqlType::Boolean => row
2455                .try_get::<Option<bool>, _>(name)?
2456                .map_or(Value::Null, Value::from),
2457            SqlType::Text => row
2458                .try_get::<Option<String>, _>(name)?
2459                .map_or(Value::Null, Value::from),
2460            SqlType::Date => row
2461                .try_get::<Option<NaiveDate>, _>(name)?
2462                .map_or(Value::Null, |v| Value::from(v.to_string())),
2463            SqlType::Time => row
2464                .try_get::<Option<NaiveTime>, _>(name)?
2465                .map_or(Value::Null, |v| Value::from(v.to_string())),
2466            SqlType::Timestamptz => row
2467                .try_get::<Option<DateTime<Utc>>, _>(name)?
2468                .map_or(Value::Null, |v| Value::from(v.to_rfc3339())),
2469            SqlType::Uuid => row
2470                .try_get::<Option<Uuid>, _>(name)?
2471                .map_or(Value::Null, |v| Value::from(v.to_string())),
2472            SqlType::Json => row
2473                .try_get::<Option<Value>, _>(name)?
2474                .unwrap_or(Value::Null),
2475            SqlType::Array(_)
2476            | SqlType::Inet
2477            | SqlType::Cidr
2478            | SqlType::MacAddr
2479            | SqlType::Xml
2480            | SqlType::Ltree
2481            | SqlType::Bit
2482            | SqlType::FullText => row
2483                .try_get::<Option<String>, _>(name)
2484                .ok()
2485                .flatten()
2486                .map_or(Value::Null, Value::from),
2487            // PK lift Pass A: see the SQLite path for the same
2488            // String/Uuid target dispatch.
2489            SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2490                Some(SqlType::Text) => row
2491                    .try_get::<Option<String>, _>(name)?
2492                    .map_or(Value::Null, Value::from),
2493                Some(SqlType::Uuid) => row
2494                    .try_get::<Option<Uuid>, _>(name)?
2495                    .map_or(Value::Null, |v| Value::from(v.to_string())),
2496                _ => row
2497                    .try_get::<Option<i64>, _>(name)?
2498                    .map_or(Value::Null, Value::from),
2499            },
2500            SqlType::Bytes => row
2501                .try_get::<Option<Vec<u8>>, _>(name)?
2502                .map_or(Value::Null, |b| bytes_to_json(&b)),
2503            SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2504        });
2505    }
2506    Ok(match col.ty {
2507        SqlType::SmallInt => Value::from(row.try_get::<i16, _>(name)?),
2508        SqlType::Integer => Value::from(row.try_get::<i32, _>(name)?),
2509        SqlType::BigInt => Value::from(row.try_get::<i64, _>(name)?),
2510        SqlType::Real => Value::from(row.try_get::<f32, _>(name)? as f64),
2511        SqlType::Double => Value::from(row.try_get::<f64, _>(name)?),
2512        SqlType::Boolean => Value::from(row.try_get::<bool, _>(name)?),
2513        SqlType::Text => Value::from(row.try_get::<String, _>(name)?),
2514        SqlType::Date => Value::from(row.try_get::<NaiveDate, _>(name)?.to_string()),
2515        SqlType::Time => Value::from(row.try_get::<NaiveTime, _>(name)?.to_string()),
2516        SqlType::Timestamptz => Value::from(row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339()),
2517        SqlType::Uuid => Value::from(row.try_get::<Uuid, _>(name)?.to_string()),
2518        SqlType::Json => row.try_get::<Value, _>(name)?,
2519        SqlType::Array(_)
2520        | SqlType::Inet
2521        | SqlType::Cidr
2522        | SqlType::MacAddr
2523        | SqlType::Xml
2524        | SqlType::Ltree
2525        | SqlType::Bit
2526        | SqlType::FullText => row
2527            .try_get::<String, _>(name)
2528            .map(Value::from)
2529            .unwrap_or(Value::Null),
2530        // PK lift Pass A: FK columns dispatch on their target's PK
2531        // type (i64 / String / Uuid).
2532        SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2533            Some(SqlType::Text) => Value::from(row.try_get::<String, _>(name)?),
2534            Some(SqlType::Uuid) => Value::from(row.try_get::<Uuid, _>(name)?.to_string()),
2535            _ => Value::from(row.try_get::<i64, _>(name)?),
2536        },
2537        SqlType::Bytes => bytes_to_json(&row.try_get::<Vec<u8>, _>(name)?),
2538        SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2539    })
2540}
2541
2542/// Convert one form-submitted string into a `SeaValue` ready for
2543/// binding. Handles the "empty + nullable" case explicitly so a blank
2544/// form field produces SQL NULL instead of an empty-string mismatch
2545/// for numeric columns. The rest of the conversion delegates to
2546/// [`json_to_sea_value`] by wrapping the value as `JsonValue::String`,
2547/// which already understands "true"/"false" booleans and RFC3339
2548/// timestamps the HTML form layer hands in.
2549fn form_str_to_sea_value(col: &Column, raw: &str) -> Result<SeaValue, WriteError> {
2550    if raw.is_empty() {
2551        if col.ty == SqlType::Boolean {
2552            // Unchecked checkbox = false, not NULL.
2553            return Ok(SeaValue::Bool(Some(false)));
2554        }
2555        if col.nullable {
2556            return Ok(null_for(col.ty));
2557        }
2558        return Err(WriteError::RequiredFieldMissing {
2559            field: col.name.clone(),
2560        });
2561    }
2562    // #116: JSON / Array columns must PARSE the form string so the
2563    // typed input becomes a real JsonValue::Object / Array / etc.
2564    // Pre-fix every form value was wrapped as JsonValue::String —
2565    // typing `{"k": 1}` into a JSON textarea stored the literal
2566    // text `"\"{\\\"k\\\": 1}\""` rather than the object.
2567    //
2568    // serde_json::from_str rejects unbalanced braces / missing
2569    // quotes / etc.; we surface that as a WriteError::Validator so
2570    // the admin's inline error renders "Not valid JSON: <reason>"
2571    // instead of either silently storing junk OR crashing the
2572    // write with a raw sqlx Protocol error downstream.
2573    if matches!(col.ty, SqlType::Json | SqlType::Array(_)) {
2574        let parsed: serde_json::Value =
2575            serde_json::from_str(raw).map_err(|e| WriteError::Validator {
2576                field: col.name.clone(),
2577                message: format!("Not valid JSON: {e}"),
2578            })?;
2579        return json_to_sea_value(col.ty, &parsed, col.nullable, &col.name, None);
2580    }
2581    if matches!(col.ty, SqlType::ForeignKey) {
2582        return match fk_target_pk_sql_type(col) {
2583            Some(SqlType::Text) => Ok(SeaValue::String(Some(Box::new(raw.to_string())))),
2584            Some(SqlType::Uuid) => uuid::Uuid::parse_str(raw)
2585                .map(|v| SeaValue::Uuid(Some(Box::new(v))))
2586                .map_err(|_| WriteError::TypeMismatch {
2587                    field: col.name.clone(),
2588                    expected: SqlType::Uuid,
2589                    got: raw.to_string(),
2590                }),
2591            _ => raw
2592                .parse::<i64>()
2593                .map(|v| SeaValue::BigInt(Some(v)))
2594                .map_err(|_| WriteError::TypeMismatch {
2595                    field: col.name.clone(),
2596                    expected: SqlType::BigInt,
2597                    got: raw.to_string(),
2598                }),
2599        };
2600    }
2601    let json = serde_json::Value::String(raw.to_string());
2602    json_to_sea_value(col.ty, &json, col.nullable, &col.name, None)
2603}
2604
/// Hex-encode a byte slice, lowercase, no `0x` prefix. The
/// human-readable rendering for `SqlType::Bytes` columns when the
/// admin / debug tooling asks for a string form.
///
/// Each input byte produces exactly two output characters (high
/// nibble first); an empty slice yields an empty string.
fn hex_encode(bytes: &[u8]) -> String {
    // Nibble -> ASCII digit table. Indexing it directly avoids building
    // a temporary `String` through `format!` for every byte.
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
2615
2616/// Render bytes as a JSON array of u8 numbers. Symmetric with the
2617/// `json_to_sea_value` path that accepts the same shape on input.
2618fn bytes_to_json(bytes: &[u8]) -> serde_json::Value {
2619    serde_json::Value::Array(bytes.iter().map(|b| serde_json::Value::from(*b)).collect())
2620}
2621
/// Abort with a message naming `column` as a Postgres-only `Array`
/// column. Never returns.
///
/// Called from the SQLite decoders, which have no way to read such a
/// column; the message states that the field/backend system check was
/// expected to reject this configuration at boot.
fn panic_array_unsupported(column: &str) -> ! {
    let message = format!(
        "DynQuerySet: column `{column}` is a Postgres-only Array; the \
         field/backend system check should have failed boot."
    );
    panic!("{}", message)
}
2628
/// Abort with a message naming `column` as a Postgres-only column type
/// the current decoder cannot read. Never returns.
///
/// NOTE(review): the message text only mentions network types
/// (Inet/Cidr/MacAddr), but callers in this file also route `Xml`,
/// `Ltree`, `Bit`, `FullText` and `Decimal` columns here — including
/// `Decimal` from the Postgres decoders. Confirm whether the wording
/// should be widened; it is left untouched here so anything matching
/// on the message keeps working.
fn panic_pg_only_unsupported(column: &str) -> ! {
    let message = format!(
        "DynQuerySet: column `{column}` is a Postgres-only network type \
         (Inet/Cidr/MacAddr); the field/backend system check should \
         have failed boot."
    );
    panic!("{}", message)
}
2636
2637/// Classify a sqlx error from an `insert_json` / `update_json`
2638/// SQL execution into a structured `WriteError`. Constraint
2639/// failures are body-aware (the original JSON value is threaded
2640/// into the message); unknown errors fall through to
2641/// `WriteError::Sqlx` and the REST layer renders them as a 500.
2642fn classify_or_sqlx(
2643    e: sqlx::Error,
2644    body: &serde_json::Map<String, serde_json::Value>,
2645) -> crate::orm::write::WriteError {
2646    if let Some(classified) = crate::orm::validation::classify_sql_error(&e, body) {
2647        return classified;
2648    }
2649    crate::orm::write::WriteError::Sqlx(e)
2650}
2651
2652fn validate_numeric_bounds(
2653    col: &Column,
2654    json: &serde_json::Value,
2655) -> Result<(), crate::orm::write::WriteError> {
2656    let Some(n) = json.as_f64() else {
2657        return Ok(());
2658    };
2659    if let Some(min) = col.min {
2660        if n < min as f64 {
2661            return Err(crate::orm::write::WriteError::Validator {
2662                field: col.name.clone(),
2663                message: format!("must be >= {min} (got {n})."),
2664            });
2665        }
2666    }
2667    if let Some(max) = col.max {
2668        if n > max as f64 {
2669            return Err(crate::orm::write::WriteError::Validator {
2670                field: col.name.clone(),
2671                message: format!("must be <= {max} (got {n})."),
2672            });
2673        }
2674    }
2675    Ok(())
2676}
2677
2678/// Convert a JSON PK-shaped value (number or string) into a
2679/// `sea_query::Value` usable as a junction-table binding. Returns
2680/// `None` for shapes we don't know how to bind (arrays, objects,
2681/// booleans) — those won't reach here because
2682/// `validate_m2m_relations` rejects them upstream.
2683fn json_pk_to_sea(v: &serde_json::Value) -> Option<sea_query::Value> {
2684    match v {
2685        serde_json::Value::Number(n) => n.as_i64().map(|i| sea_query::Value::BigInt(Some(i))),
2686        serde_json::Value::String(s) => Some(sea_query::Value::String(Some(Box::new(s.clone())))),
2687        _ => None,
2688    }
2689}
2690
// NOTE(review): the paragraph below describes an M2M read-back helper,
// not `normalize_sr_token`; it looks like it was left behind when that
// helper moved. It is kept as a plain comment so it no longer renders
// as this function's rustdoc — re-attach it to its owner or remove it.
//
// Read every M2M relation off its junction table and attach
// the resulting `child_id` arrays to `out` under each relation's
// field name. Called from `insert_json` / `update_json`'s read-
// back path so the response JSON includes the relations the
// caller just wrote (otherwise the `tags: [1, 2]` they POSTed
// would never appear in the response, since `M2M<T>` is
// `#[serde(skip)]` on the parent struct).

/// Normalize a select_related token to its canonical dotted form.
///
/// Both `.` and `__` are accepted as hop separators (gap2 #18); every
/// `__` is rewritten to `.`, so `author__profile` becomes
/// `author.profile` and a token mixing the two
/// (`author.profile__org`) flattens to `author.profile.org`.
///
/// Edge case: a column whose real name contains `__` (rare; real
/// models don't do this) turns into a dotted chain here and then fails
/// validation; the caller drops it silently, matching the existing
/// "unknown column" behaviour.
fn normalize_sr_token(name: &str) -> String {
    let segments: Vec<&str> = name.split("__").collect();
    segments.join(".")
}
2710
2711/// Validate a dotted select_related chain (e.g. `"author.profile"`)
2712/// against the model graph. Each hop must be an FK on the prior
2713/// hop's target meta. Returns the per-hop target tables on success
2714/// (same length as `hops.len()`); returns `None` on any failure so
2715/// the caller can drop the token silently — same contract as the
2716/// pre-existing single-hop validation in `select_related_dyn`.
2717///
2718/// Empty chains, missing meta lookups, and non-FK columns all
2719/// return `None`.
2720fn validate_sr_chain(root_meta: &crate::migrate::ModelMeta, chain: &str) -> Option<Vec<String>> {
2721    let hops: Vec<&str> = chain.split('.').filter(|s| !s.is_empty()).collect();
2722    if hops.is_empty() {
2723        return None;
2724    }
2725    let registered = crate::migrate::registered_models();
2726    let mut targets: Vec<String> = Vec::with_capacity(hops.len());
2727    let mut current_table: String = root_meta.table.clone();
2728    let mut current_meta: Option<crate::migrate::ModelMeta> = None;
2729    for hop in &hops {
2730        let meta_ref: &crate::migrate::ModelMeta =
2731            if current_table == root_meta.table && current_meta.is_none() {
2732                root_meta
2733            } else {
2734                current_meta = registered
2735                    .iter()
2736                    .find(|m| m.table == current_table)
2737                    .cloned();
2738                current_meta.as_ref()?
2739            };
2740        let col = meta_ref.fields.iter().find(|c| &c.name == hop)?;
2741        let target = col.fk_target.clone()?;
2742        targets.push(target.clone());
2743        current_table = target;
2744    }
2745    Some(targets)
2746}
2747
2748/// FK expansion for the dynamic-dispatch read path. For each name
2749/// in `sr_fields` (canonical dotted form — `select_related_dyn`
2750/// has already normalized + validated), collect the integer ids
2751/// across `rows`, run one batched `SELECT * FROM <target> WHERE id
2752/// IN (...)` per hop, and splice the resolved chain back where the
2753/// root FK id was. Query budget is `1 + len(hops)` per chain
2754/// regardless of how many parent rows came back. No N+1.
2755///
2756/// Mirrors the typed
2757/// `queryset::hydration::hydrate_select_related_nested` semantics:
2758/// per-hop fetch top-down, then bottom-up embed so the root rows
2759/// carry the full nested chain.
2760///
2761/// Caller has already validated that every name in `sr_fields`
2762/// resolves to an FK chain on `meta` (via `select_related_dyn` →
2763/// [`validate_sr_chain`]).
2764async fn hydrate_select_related_into(
2765    meta: &crate::migrate::ModelMeta,
2766    sr_fields: &[String],
2767    rows: &mut [serde_json::Map<String, serde_json::Value>],
2768) -> Result<(), sqlx::Error> {
2769    let pool = resolve_pool_dyn(meta, crate::db::RouteOp::Read);
2770    for chain in sr_fields {
2771        let hops: Vec<&str> = chain.split('.').filter(|s| !s.is_empty()).collect();
2772        if hops.is_empty() {
2773            continue;
2774        }
2775        let Some(targets) = validate_sr_chain(meta, chain) else {
2776            // select_related_dyn validates up front; if a chain
2777            // slipped through validation but fails here (e.g. an
2778            // unregistered intermediate model — only possible from
2779            // a direct internal caller), skip rather than crash.
2780            continue;
2781        };
2782
2783        // gaps #112 / PK lift Pass A: walk the chain in PK-shape-
2784        // agnostic terms. Each hop's PK column name comes from the
2785        // target meta (could be `"id"` for integer-PK models, but
2786        // also `"codename"` for `permissions_permission`, etc.).
2787        // FK ids and PK lookups round-trip as `serde_json::Value`
2788        // so String / UUID / mixed-PK chains all hydrate without
2789        // the pre-fix `.as_i64()` silently dropping non-integer
2790        // links.
2791        let registered = crate::migrate::registered_models();
2792        let hop_target_pk: Vec<(String, SqlType)> = targets
2793            .iter()
2794            .filter_map(|t| {
2795                registered
2796                    .iter()
2797                    .find(|m| &m.table == t)
2798                    .and_then(|m| m.pk_column().map(|c| (c.name.clone(), c.ty)))
2799            })
2800            .collect();
2801        if hop_target_pk.len() != hops.len() {
2802            // A meta lookup failed mid-chain (only possible from
2803            // an unregistered intermediate model — unreachable in
2804            // practice). Skip the chain rather than crash.
2805            continue;
2806        }
2807        let hop_target_soft_delete: Vec<bool> = targets
2808            .iter()
2809            .map(|t| {
2810                registered
2811                    .iter()
2812                    .find(|m| &m.table == t)
2813                    .is_some_and(|m| m.soft_delete)
2814            })
2815            .collect();
2816
2817        // Phase 1: per-hop fetch, top-down. levels[i] holds the
2818        // related-row JSON objects at depth i, BEFORE any nesting
2819        // is embedded.
2820        let first_field = hops[0];
2821        let mut ids: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
2822        for row in rows.iter() {
2823            let Some(v) = row.get(first_field) else {
2824                continue;
2825            };
2826            if v.is_null() {
2827                continue;
2828            }
2829            ids.push(v.clone());
2830        }
2831        if ids.is_empty() {
2832            continue;
2833        }
2834        dedup_by_pk_key(&mut ids);
2835        let mut levels: Vec<Vec<serde_json::Value>> = Vec::with_capacity(hops.len());
2836        levels.push(
2837            crate::orm::queryset::hydration::fetch_related_as_json_by_pk(
2838                &targets[0],
2839                &hop_target_pk[0].0,
2840                hop_target_pk[0].1,
2841                hop_target_soft_delete[0],
2842                &ids,
2843                &pool,
2844            )
2845            .await?,
2846        );
2847
2848        for hop_idx in 1..hops.len() {
2849            let hop_field = hops[hop_idx];
2850            let hop_target = &targets[hop_idx];
2851            let prev_lvl = &levels[hop_idx - 1];
2852            let mut next_ids: Vec<serde_json::Value> = prev_lvl
2853                .iter()
2854                .filter_map(|r| {
2855                    let v = r.as_object()?.get(hop_field)?;
2856                    if v.is_null() { None } else { Some(v.clone()) }
2857                })
2858                .collect();
2859            if next_ids.is_empty() {
2860                // Chain bottoms out (every prior-level row has
2861                // NULL for this hop). Subsequent hops would also
2862                // be empty; stop here. Earlier levels still embed
2863                // below.
2864                break;
2865            }
2866            dedup_by_pk_key(&mut next_ids);
2867            levels.push(
2868                crate::orm::queryset::hydration::fetch_related_as_json_by_pk(
2869                    hop_target,
2870                    &hop_target_pk[hop_idx].0,
2871                    hop_target_pk[hop_idx].1,
2872                    hop_target_soft_delete[hop_idx],
2873                    &next_ids,
2874                    &pool,
2875                )
2876                .await?,
2877            );
2878        }
2879
2880        // Phase 2: bottom-up embed. For each level from second-
2881        // to-last down to first, splice the next level's matching
2882        // row into the corresponding hop slot. By the time we hit
2883        // level 0 its rows carry the full nested chain.
2884        if levels.len() > 1 {
2885            for i in (0..levels.len() - 1).rev() {
2886                let next_pk_col = &hop_target_pk[i + 1].0;
2887                let next_by_pk: HashMap<String, serde_json::Value> = levels[i + 1]
2888                    .iter()
2889                    .filter_map(|obj| {
2890                        let map = obj.as_object()?;
2891                        let pk_val = map.get(next_pk_col.as_str())?;
2892                        Some((pk_json_key(pk_val), obj.clone()))
2893                    })
2894                    .collect();
2895                let hop_field = hops[i + 1];
2896                for row in levels[i].iter_mut() {
2897                    let Some(map) = row.as_object_mut() else {
2898                        continue;
2899                    };
2900                    let Some(fk_val) = map.get(hop_field) else {
2901                        continue;
2902                    };
2903                    if fk_val.is_null() {
2904                        continue;
2905                    }
2906                    let key = pk_json_key(fk_val);
2907                    if let Some(next_json) = next_by_pk.get(&key) {
2908                        map.insert(hop_field.to_string(), next_json.clone());
2909                    }
2910                }
2911            }
2912        }
2913
2914        // Phase 3: splice level-0 rows (now fully nested) into
2915        // the root rows. Rows pointing at an id that didn't
2916        // resolve (target row deleted between the parent fetch
2917        // and the IN-lookup — a race window) keep the raw FK
2918        // value; the alternative would be silently nulling the
2919        // field which hides a real referential-integrity issue.
2920        let first_pk_col = &hop_target_pk[0].0;
2921        let first_by_pk: HashMap<String, serde_json::Value> = levels
2922            .into_iter()
2923            .next()
2924            .unwrap_or_default()
2925            .into_iter()
2926            .filter_map(|obj| {
2927                let map = obj.as_object()?;
2928                let pk_val = map.get(first_pk_col.as_str())?;
2929                Some((pk_json_key(pk_val), obj.clone()))
2930            })
2931            .collect();
2932        for row in rows.iter_mut() {
2933            let Some(fk_val) = row.get(first_field) else {
2934                continue;
2935            };
2936            if fk_val.is_null() {
2937                continue;
2938            }
2939            let key = pk_json_key(fk_val);
2940            if let Some(resolved) = first_by_pk.get(&key) {
2941                row.insert(first_field.to_string(), resolved.clone());
2942            }
2943        }
2944    }
2945    Ok(())
2946}
2947
2948/// Dedup a `Vec<serde_json::Value>` of PK values by stable string
2949/// key. `serde_json::Value` isn't `Hash`, so the standard
2950/// sort+dedup doesn't apply; the `pk_json_key` namespacing makes
2951/// every Number / String / other land in its own bucket.
2952fn dedup_by_pk_key(ids: &mut Vec<serde_json::Value>) {
2953    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2954    ids.retain(|v| seen.insert(pk_json_key(v)));
2955}
2956
2957/// Batched M2M echo across every row returned by `fetch_as_json`.
2958/// One `SELECT parent_id, child_id FROM <junction> WHERE parent_id
2959/// IN (...)` per registered M2M relation — query budget is
2960/// `count(meta.m2m_relations)` regardless of how many parent rows
2961/// came back. Replaces the per-row [`hydrate_m2m_into`] call site
2962/// in the read loop (gap2 #16) which was a 1+N*M issuer.
2963///
2964/// Each row's `<relation>` key is inserted as an array of `child_id`
2965/// values (integers or strings, matching the junction column's
2966/// declared shape). Parents with no junction rows still get the key
2967/// — initialised to an empty array — so the response shape is
2968/// consistent regardless of link presence (same contract the
2969/// per-row helper already maintained).
2970async fn hydrate_m2m_batched(
2971    meta: &crate::migrate::ModelMeta,
2972    pk_name: &str,
2973    rows: &mut [serde_json::Map<String, serde_json::Value>],
2974) -> Result<(), sqlx::Error> {
2975    if meta.m2m_relations.is_empty() || rows.is_empty() {
2976        return Ok(());
2977    }
2978
2979    // Initialise every row's relation arrays up front so parents
2980    // with zero junction rows still surface the field. Matches the
2981    // per-row helper's behaviour where the `SELECT` returning zero
2982    // rows produced `<rel>: []` rather than omitting the key.
2983    for row in rows.iter_mut() {
2984        for rel in &meta.m2m_relations {
2985            row.insert(rel.field_name.clone(), serde_json::Value::Array(Vec::new()));
2986        }
2987    }
2988
2989    // Collect parent PKs once across all rows, deduped. Skip rows
2990    // missing the PK column or whose PK value isn't a shape the
2991    // junction can bind (numbers + strings; see `json_pk_to_sea`).
2992    let mut parent_sea_vals: Vec<sea_query::Value> = Vec::with_capacity(rows.len());
2993    let mut seen_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
2994    for row in rows.iter() {
2995        let Some(pk_json) = row.get(pk_name) else {
2996            continue;
2997        };
2998        let Some(sea_val) = json_pk_to_sea(pk_json) else {
2999            continue;
3000        };
3001        let key = pk_json_key(pk_json);
3002        if seen_keys.insert(key) {
3003            parent_sea_vals.push(sea_val);
3004        }
3005    }
3006    if parent_sea_vals.is_empty() {
3007        return Ok(());
3008    }
3009
3010    for rel in &meta.m2m_relations {
3011        let junction_table = format!("{}_{}", meta.table, rel.field_name);
3012        let mut sel = Query::select();
3013        sel.from(crate::db::router::schema_qualified_table(&junction_table));
3014        sel.column(Alias::new("parent_id"));
3015        sel.column(Alias::new("child_id"));
3016        sel.and_where(Expr::col(Alias::new("parent_id")).is_in(parent_sea_vals.clone()));
3017
3018        let mut children_by_parent: HashMap<String, Vec<serde_json::Value>> = HashMap::new();
3019        match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
3020            DbPool::Sqlite(pool) => {
3021                let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3022                let db_rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3023                for r in &db_rows {
3024                    let parent = read_junction_id_sqlite(r, "parent_id")?;
3025                    let child = read_junction_id_sqlite(r, "child_id")?;
3026                    children_by_parent
3027                        .entry(pk_json_key(&parent))
3028                        .or_default()
3029                        .push(child);
3030                }
3031            }
3032            DbPool::Postgres(pool) => {
3033                let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3034                let db_rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3035                for r in &db_rows {
3036                    let parent = read_junction_id_pg(r, "parent_id")?;
3037                    let child = read_junction_id_pg(r, "child_id")?;
3038                    children_by_parent
3039                        .entry(pk_json_key(&parent))
3040                        .or_default()
3041                        .push(child);
3042                }
3043            }
3044        }
3045
3046        for row in rows.iter_mut() {
3047            let Some(pk_json) = row.get(pk_name) else {
3048                continue;
3049            };
3050            let key = pk_json_key(pk_json);
3051            if let Some(children) = children_by_parent.remove(&key) {
3052                row.insert(rel.field_name.clone(), serde_json::Value::Array(children));
3053            }
3054        }
3055    }
3056    Ok(())
3057}
3058
3059/// Stable string key for a parent PK JSON value, used to group
3060/// junction rows under their owning parent in
3061/// [`hydrate_m2m_batched`]. Integers and strings get their own
3062/// disjoint namespaces (`n:42` vs `s:42`) so a numeric PK and a
3063/// string PK that stringify identically never collide.
3064fn pk_json_key(v: &serde_json::Value) -> String {
3065    match v {
3066        serde_json::Value::Number(n) => format!("n:{n}"),
3067        serde_json::Value::String(s) => format!("s:{s}"),
3068        other => format!("o:{other}"),
3069    }
3070}
3071
3072/// Read a junction-table id column as JSON (number or string).
3073/// Junction columns are i64 for integer PKs and TEXT for string /
3074/// uuid PKs; we don't know at compile time which one a relation
3075/// uses, so try i64 first and fall back to String.
3076fn read_junction_id_sqlite(
3077    row: &sqlx::sqlite::SqliteRow,
3078    col: &str,
3079) -> Result<serde_json::Value, sqlx::Error> {
3080    if let Ok(i) = row.try_get::<i64, _>(col) {
3081        return Ok(serde_json::Value::Number(i.into()));
3082    }
3083    let s = row.try_get::<String, _>(col)?;
3084    Ok(serde_json::Value::String(s))
3085}
3086
3087fn read_junction_id_pg(
3088    row: &sqlx::postgres::PgRow,
3089    col: &str,
3090) -> Result<serde_json::Value, sqlx::Error> {
3091    if let Ok(i) = row.try_get::<i64, _>(col) {
3092        return Ok(serde_json::Value::Number(i.into()));
3093    }
3094    let s = row.try_get::<String, _>(col)?;
3095    Ok(serde_json::Value::String(s))
3096}
3097
3098async fn hydrate_m2m_into(
3099    meta: &crate::migrate::ModelMeta,
3100    parent_pk_json: Option<&serde_json::Value>,
3101    out: &mut serde_json::Map<String, serde_json::Value>,
3102) -> Result<(), sqlx::Error> {
3103    if meta.m2m_relations.is_empty() {
3104        return Ok(());
3105    }
3106    let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
3107        return Ok(());
3108    };
3109    for rel in &meta.m2m_relations {
3110        let junction_table = format!("{}_{}", meta.table, rel.field_name);
3111        let mut sel = Query::select();
3112        sel.from(crate::db::router::schema_qualified_table(&junction_table));
3113        sel.column(Alias::new("child_id"));
3114        sel.and_where(Expr::col(Alias::new("parent_id")).eq(parent_pk_value.clone()));
3115        let children: Vec<serde_json::Value> =
3116            match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
3117                DbPool::Sqlite(pool) => {
3118                    let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3119                    let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3120                    rows.iter()
3121                        .map(|r| {
3122                            r.try_get::<i64, _>("child_id")
3123                                .map(|i| serde_json::Value::Number(i.into()))
3124                                .or_else(|_| {
3125                                    r.try_get::<String, _>("child_id")
3126                                        .map(serde_json::Value::String)
3127                                })
3128                        })
3129                        .collect::<Result<Vec<_>, _>>()?
3130                }
3131                DbPool::Postgres(pool) => {
3132                    let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3133                    let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3134                    rows.iter()
3135                        .map(|r| {
3136                            r.try_get::<i64, _>("child_id")
3137                                .map(|i| serde_json::Value::Number(i.into()))
3138                                .or_else(|_| {
3139                                    r.try_get::<String, _>("child_id")
3140                                        .map(serde_json::Value::String)
3141                                })
3142                        })
3143                        .collect::<Result<Vec<_>, _>>()?
3144                }
3145            };
3146        out.insert(rel.field_name.clone(), serde_json::Value::Array(children));
3147    }
3148    Ok(())
3149}
3150
3151/// Run `SELECT <pk> FROM <table> WHERE <conds>` to find every
3152/// row the dynamic UPDATE would touch. Returns each matched PK
3153/// as the raw JSON value the parent table holds — number for
3154/// integer PKs, string for UUID / String PKs. Used by
3155/// `update_json` so we know which junction-table parent_ids
3156/// to write to even when the body has no regular column changes.
3157async fn collect_parent_pks(
3158    meta: &crate::migrate::ModelMeta,
3159    pk_col: &crate::migrate::Column,
3160    where_clauses: &[Condition],
3161) -> Result<Vec<serde_json::Value>, crate::orm::write::WriteError> {
3162    let mut sel = Query::select();
3163    sel.from(crate::db::router::schema_qualified_table(&meta.table));
3164    sel.column(Alias::new(&pk_col.name));
3165    for cond in where_clauses {
3166        sel.cond_where(cond.clone());
3167    }
3168    match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
3169        DbPool::Sqlite(pool) => {
3170            let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3171            let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3172            rows.iter()
3173                .map(|row| decode_to_json(row, pk_col))
3174                .collect::<Result<Vec<_>, _>>()
3175                .map_err(crate::orm::write::WriteError::Sqlx)
3176        }
3177        DbPool::Postgres(pool) => {
3178            let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3179            let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3180            rows.iter()
3181                .map(|row| decode_pg_to_json(row, pk_col))
3182                .collect::<Result<Vec<_>, _>>()
3183                .map_err(crate::orm::write::WriteError::Sqlx)
3184        }
3185    }
3186}
3187
3188/// Transaction-aware sibling of [`collect_parent_pks`]: reads the matched
3189/// PKs on the open `tx` so a bulk update mid-transaction sees the rows the
3190/// same tx has touched. Used by `update_json_in_tx`.
3191async fn collect_parent_pks_in_tx(
3192    meta: &crate::migrate::ModelMeta,
3193    pk_col: &crate::migrate::Column,
3194    where_clauses: &[Condition],
3195    tx: &mut crate::db::Transaction,
3196) -> Result<Vec<serde_json::Value>, crate::orm::write::WriteError> {
3197    let mut sel = Query::select();
3198    sel.from(crate::db::router::schema_qualified_table(&meta.table));
3199    sel.column(Alias::new(&pk_col.name));
3200    for cond in where_clauses {
3201        sel.cond_where(cond.clone());
3202    }
3203    match tx.backend_name() {
3204        "sqlite" => {
3205            let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3206            let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
3207            let rows = sqlx::query_with(&sql, values)
3208                .fetch_all(&mut **inner)
3209                .await?;
3210            rows.iter()
3211                .map(|row| decode_to_json(row, pk_col))
3212                .collect::<Result<Vec<_>, _>>()
3213                .map_err(crate::orm::write::WriteError::Sqlx)
3214        }
3215        _ => {
3216            let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3217            let inner = tx.as_pg_mut().expect("postgres backend_name");
3218            let rows = sqlx::query_with(&sql, values)
3219                .fetch_all(&mut **inner)
3220                .await?;
3221            rows.iter()
3222                .map(|row| decode_pg_to_json(row, pk_col))
3223                .collect::<Result<Vec<_>, _>>()
3224                .map_err(crate::orm::write::WriteError::Sqlx)
3225        }
3226    }
3227}
3228
3229/// Mirror each M2M field in `body` into its junction table for
3230/// the given parent PK. Validation has already confirmed array
3231/// shape + child existence, so this is a straight write —
3232/// `set_junction_dynamic` wipes any existing rows for the
3233/// parent and re-inserts the supplied ids inside a transaction.
3234///
3235/// `parent_pk_json` is the JSON value the parent row holds at
3236/// its PK column (read straight off the post-INSERT row). When
3237/// it's `None` or unparseable we silently skip — there's nothing
3238/// to anchor the junction to.
3239/// Phase -1 of the dynamic insert: strip `noform` columns and derive
3240/// any `#[umbral(slug_from = "...")]` columns. Returns `Some(owned)`
3241/// when either rule fired (the caller binds the owned copy) or `None`
3242/// when the body passes through untouched. Shared by `insert_json`
3243/// and `insert_json_in_tx` so the two paths can't drift on what they
3244/// strip / derive before validation runs.
3245fn normalise_insert_body(
3246    meta: &crate::migrate::ModelMeta,
3247    body: &serde_json::Map<String, serde_json::Value>,
3248) -> Option<serde_json::Map<String, serde_json::Value>> {
3249    let needs_owned = meta
3250        .fields
3251        .iter()
3252        .any(|c| c.noform || c.slug_from.is_some());
3253    if !needs_owned {
3254        return None;
3255    }
3256    let mut owned = body.clone();
3257    for col in &meta.fields {
3258        if col.noform {
3259            owned.remove(&col.name);
3260        }
3261    }
3262    crate::orm::write::apply_slug_from(&meta.fields, &mut owned, false);
3263    Some(owned)
3264}
3265
3266/// The prepared INSERT plus the PK shape the caller re-fetches by.
3267struct InsertPlan {
3268    q: sea_query::InsertStatement,
3269    pk_name: String,
3270    pk_ty: SqlType,
3271}
3272
3273/// Phase 1 of the dynamic insert: validate min/max + text-format
3274/// wrappers per column, coerce each JSON value to its `SeaValue`, and
3275/// assemble the `Query::insert()`. Auto-increment integer PKs and
3276/// absent-with-default columns are omitted so the backend fills them;
3277/// `auto_now` / `auto_now_add` columns the body omitted are filled
3278/// with `Utc::now()`. Shared by `insert_json` and `insert_json_in_tx`
3279/// so column handling is identical on both paths; the methods differ
3280/// only in which executor runs the statement.
3281fn build_insert_plan(
3282    meta: &crate::migrate::ModelMeta,
3283    body: &serde_json::Map<String, serde_json::Value>,
3284) -> Result<InsertPlan, crate::orm::write::WriteError> {
3285    use crate::orm::write::{WriteError, is_default_pk};
3286
3287    let mut cols: Vec<&str> = Vec::new();
3288    let mut values: Vec<SeaValue> = Vec::new();
3289    for col in &meta.fields {
3290        if col.primary_key {
3291            let supplied = body.get(&col.name);
3292            let is_sentinel = match supplied {
3293                None | Some(serde_json::Value::Null) => true,
3294                Some(v) => is_default_pk(col.ty, v),
3295            };
3296            if matches!(
3297                col.ty,
3298                SqlType::Integer | SqlType::BigInt | SqlType::SmallInt
3299            ) && is_sentinel
3300            {
3301                continue;
3302            }
3303        }
3304        let Some(json) = body.get(&col.name) else {
3305            if col.auto_now_add || col.auto_now {
3306                let now_value = crate::orm::write::now_for_column(col.ty);
3307                cols.push(&col.name);
3308                values.push(now_value);
3309                continue;
3310            }
3311            continue;
3312        };
3313        if json.is_null() {
3314            continue;
3315        }
3316        validate_numeric_bounds(col, json)?;
3317        if let (Some(fmt), Some(s)) = (col.text_format.as_deref(), json.as_str()) {
3318            if let Err(e) = crate::orm::validators::validate_text_format(fmt, s) {
3319                return Err(WriteError::Validator {
3320                    field: col.name.clone(),
3321                    message: e.to_string(),
3322                });
3323            }
3324        }
3325        let sea_value = crate::orm::write::json_to_sea_value(
3326            col.ty,
3327            json,
3328            col.nullable,
3329            &col.name,
3330            fk_target_pk_sql_type(col),
3331        )?;
3332        cols.push(&col.name);
3333        values.push(sea_value);
3334    }
3335
3336    let pk_col = meta.fields.iter().find(|c| c.primary_key).ok_or_else(|| {
3337        WriteError::Sqlx(sqlx::Error::Protocol(
3338            "insert_json: model has no PK".to_string(),
3339        ))
3340    })?;
3341    let pk_name = pk_col.name.clone();
3342    let pk_ty = pk_col.ty;
3343
3344    let mut q = Query::insert();
3345    q.into_table(crate::db::router::schema_qualified_table(&meta.table));
3346    q.columns(cols.iter().map(|c| Alias::new(*c)).collect::<Vec<_>>());
3347    let exprs: Vec<sea_query::SimpleExpr> = values.into_iter().map(Into::into).collect();
3348    q.values_panic(exprs);
3349
3350    Ok(InsertPlan { q, pk_name, pk_ty })
3351}
3352
3353/// Transaction-aware sibling of [`write_m2m_junctions`]: mirrors each
3354/// M2M field in `body` into its junction table on the passed `tx`, so
3355/// the junction rows commit / roll back with the parent INSERT.
3356async fn write_m2m_junctions_in_tx(
3357    meta: &crate::migrate::ModelMeta,
3358    parent_pk_json: Option<&serde_json::Value>,
3359    body: &serde_json::Map<String, serde_json::Value>,
3360    tx: &mut crate::db::Transaction,
3361) -> Result<(), crate::orm::write::WriteError> {
3362    if meta.m2m_relations.is_empty() {
3363        return Ok(());
3364    }
3365    let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
3366        return Ok(());
3367    };
3368    for rel in &meta.m2m_relations {
3369        let Some(value) = body.get(&rel.field_name) else {
3370            continue;
3371        };
3372        let Some(items) = value.as_array() else {
3373            continue;
3374        };
3375        let mut child_ids: Vec<sea_query::Value> = Vec::with_capacity(items.len());
3376        for item in items {
3377            if item.is_null() {
3378                continue;
3379            }
3380            if let Some(v) = json_pk_to_sea(item) {
3381                child_ids.push(v);
3382            }
3383        }
3384        let junction_table = format!("{}_{}", meta.table, rel.field_name);
3385        crate::orm::m2m::set_junction_dynamic_in_tx(
3386            &junction_table,
3387            parent_pk_value.clone(),
3388            child_ids,
3389            tx,
3390        )
3391        .await
3392        .map_err(crate::orm::write::WriteError::Sqlx)?;
3393    }
3394    Ok(())
3395}
3396
3397/// Transaction-aware sibling of [`hydrate_m2m_into`]: read the just-
3398/// written junction rows back off the SAME `tx` so the response echoes
3399/// the M2M arrays the caller will see post-commit. Reading on the pool
3400/// here would miss the uncommitted junction writes.
3401async fn hydrate_m2m_into_tx(
3402    meta: &crate::migrate::ModelMeta,
3403    parent_pk_json: Option<&serde_json::Value>,
3404    out: &mut serde_json::Map<String, serde_json::Value>,
3405    tx: &mut crate::db::Transaction,
3406) -> Result<(), sqlx::Error> {
3407    if meta.m2m_relations.is_empty() {
3408        return Ok(());
3409    }
3410    let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
3411        return Ok(());
3412    };
3413    for rel in &meta.m2m_relations {
3414        let junction_table = format!("{}_{}", meta.table, rel.field_name);
3415        let mut sel = Query::select();
3416        sel.from(crate::db::router::schema_qualified_table(&junction_table));
3417        sel.column(Alias::new("child_id"));
3418        sel.and_where(Expr::col(Alias::new("parent_id")).eq(parent_pk_value.clone()));
3419        let children: Vec<serde_json::Value> = match tx.backend_name() {
3420            "sqlite" => {
3421                let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
3422                let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3423                let rows = sqlx::query_with(&sql, values)
3424                    .fetch_all(&mut **inner)
3425                    .await?;
3426                rows.iter()
3427                    .map(|r| {
3428                        r.try_get::<i64, _>("child_id")
3429                            .map(|i| serde_json::Value::Number(i.into()))
3430                            .or_else(|_| {
3431                                r.try_get::<String, _>("child_id")
3432                                    .map(serde_json::Value::String)
3433                            })
3434                    })
3435                    .collect::<Result<Vec<_>, _>>()?
3436            }
3437            _ => {
3438                let inner = tx.as_pg_mut().expect("postgres backend_name");
3439                let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3440                let rows = sqlx::query_with(&sql, values)
3441                    .fetch_all(&mut **inner)
3442                    .await?;
3443                rows.iter()
3444                    .map(|r| {
3445                        r.try_get::<i64, _>("child_id")
3446                            .map(|i| serde_json::Value::Number(i.into()))
3447                            .or_else(|_| {
3448                                r.try_get::<String, _>("child_id")
3449                                    .map(serde_json::Value::String)
3450                            })
3451                    })
3452                    .collect::<Result<Vec<_>, _>>()?
3453            }
3454        };
3455        out.insert(rel.field_name.clone(), serde_json::Value::Array(children));
3456    }
3457    Ok(())
3458}
3459
3460async fn write_m2m_junctions(
3461    meta: &crate::migrate::ModelMeta,
3462    parent_pk_json: Option<&serde_json::Value>,
3463    body: &serde_json::Map<String, serde_json::Value>,
3464) -> Result<(), crate::orm::write::WriteError> {
3465    if meta.m2m_relations.is_empty() {
3466        return Ok(());
3467    }
3468    let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
3469        return Ok(());
3470    };
3471    for rel in &meta.m2m_relations {
3472        let Some(value) = body.get(&rel.field_name) else {
3473            continue;
3474        };
3475        let Some(items) = value.as_array() else {
3476            continue; // shape was validated upstream
3477        };
3478        let mut child_ids: Vec<sea_query::Value> = Vec::with_capacity(items.len());
3479        for item in items {
3480            if item.is_null() {
3481                continue;
3482            }
3483            if let Some(v) = json_pk_to_sea(item) {
3484                child_ids.push(v);
3485            }
3486        }
3487        let junction_table = format!("{}_{}", meta.table, rel.field_name);
3488        crate::orm::m2m::set_junction_dynamic(
3489            &junction_table,
3490            parent_pk_value.clone(),
3491            child_ids,
3492            Some(&meta.name),
3493        )
3494        .await
3495        .map_err(crate::orm::write::WriteError::Sqlx)?;
3496    }
3497    Ok(())
3498}
3499
3500// =========================================================================
3501// CSV / tabular import (#61). Coerce string cells to the column's type and
3502// route each row through `insert_json`, so validators / auto_now /
3503// slug_from / FK-existence checks all apply. The CSV *parsing* lives in the
3504// CLI (the `csv` crate); this is the coerce-and-insert half, kept in core
3505// because the type coercion needs `ModelMeta` + `SqlType` + the dynamic
3506// write path.
3507// =========================================================================
3508
3509/// Coerce one raw CSV cell to the `serde_json::Value` shape its column
3510/// expects, so downstream validation (`min`/`max`, choices) sees a typed
3511/// value rather than a string. An empty cell on a nullable column becomes
3512/// `null`. A value that doesn't parse for a numeric/bool column falls back
3513/// to the raw string, letting `insert_json` surface a clear per-row error
3514/// instead of silently dropping data. Text / Date / Time / Uuid / etc.
3515/// pass through as strings — `json_to_sea_value` parses each from there.
3516fn coerce_csv_cell(ty: SqlType, nullable: bool, raw: &str) -> serde_json::Value {
3517    use serde_json::Value;
3518    if raw.is_empty() && nullable {
3519        return Value::Null;
3520    }
3521    match ty {
3522        SqlType::SmallInt | SqlType::Integer | SqlType::BigInt | SqlType::ForeignKey => raw
3523            .parse::<i64>()
3524            .map(Value::from)
3525            .unwrap_or_else(|_| Value::String(raw.to_string())),
3526        SqlType::Real | SqlType::Double => raw
3527            .parse::<f64>()
3528            .ok()
3529            .and_then(serde_json::Number::from_f64)
3530            .map(Value::Number)
3531            .unwrap_or_else(|| Value::String(raw.to_string())),
3532        SqlType::Boolean => match raw.trim().to_ascii_lowercase().as_str() {
3533            "true" | "1" | "t" | "yes" | "y" => Value::Bool(true),
3534            "false" | "0" | "f" | "no" | "n" => Value::Bool(false),
3535            _ => Value::String(raw.to_string()),
3536        },
3537        SqlType::Json => {
3538            serde_json::from_str(raw).unwrap_or_else(|_| Value::String(raw.to_string()))
3539        }
3540        _ => Value::String(raw.to_string()),
3541    }
3542}
3543
/// Outcome of [`import_table_rows`]: how many rows inserted, plus the
/// `(line, message)` of every row that failed. Best-effort — a bad row is
/// reported and skipped, never fatal — because messy real-world CSVs want
/// "tell me which rows are wrong," not an all-or-nothing abort. `line` is
/// 1-based over the file (the header is line 1, so the first data row is
/// line 2), matching what a spreadsheet shows.
///
/// Derives `Clone` / `PartialEq` / `Eq` so callers and tests can copy and
/// compare whole reports.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CsvImportReport {
    /// Number of rows that were inserted successfully.
    pub inserted: usize,
    /// One `(line, message)` entry per failed row, in input order.
    pub errors: Vec<(usize, String)>,
}
3555
3556/// Insert tabular string rows into `meta`'s table. Each cell is coerced to
3557/// its column's type ([`coerce_csv_cell`]) and the row routes through the
3558/// dynamic write path ([`DynQuerySet::insert_json`]) so every per-row
3559/// framework behaviour (validators, `auto_now`, `slug_from`, FK existence,
3560/// soft-delete) applies exactly as it would for a REST POST.
3561///
3562/// `headers` names the column each cell maps to; a header that matches no
3563/// model field is ignored, so an extra CSV column (or a re-ordered export)
3564/// imports cleanly. Rows commit independently — there is no surrounding
3565/// transaction (the dynamic write path has none; see `orm_fixes.md` #2).
3566pub async fn import_table_rows(
3567    meta: &ModelMeta,
3568    headers: &[String],
3569    rows: &[Vec<String>],
3570) -> CsvImportReport {
3571    let col_for: HashMap<&str, &Column> =
3572        meta.fields.iter().map(|c| (c.name.as_str(), c)).collect();
3573
3574    let mut report = CsvImportReport::default();
3575    for (i, row) in rows.iter().enumerate() {
3576        let mut obj = serde_json::Map::new();
3577        for (header, cell) in headers.iter().zip(row.iter()) {
3578            if let Some(col) = col_for.get(header.as_str()) {
3579                obj.insert(header.clone(), coerce_csv_cell(col.ty, col.nullable, cell));
3580            }
3581        }
3582        match DynQuerySet::for_meta(meta).insert_json(&obj).await {
3583            Ok(_) => report.inserted += 1,
3584            Err(e) => report.errors.push((i + 2, e.to_string())),
3585        }
3586    }
3587    report
3588}
3589
#[cfg(test)]
mod tests {
    use super::form_str_to_sea_value;
    use crate::migrate::Column;
    use crate::orm::{FkAction, SqlType};
    use sea_query::Value as SeaValue;

    /// Minimal column fixture: only the name, SQL type and nullability
    /// vary; every other attribute is set to an inert value.
    fn col(name: &str, ty: SqlType, nullable: bool) -> Column {
        Column {
            name: name.to_owned(),
            ty,
            primary_key: false,
            nullable,
            fk_target: None,
            noform: false,
            db_constraint: true,
            noedit: false,
            is_string_repr: false,
            max_length: 0,
            choices: Vec::new(),
            choice_labels: Vec::new(),
            default: String::new(),
            is_multichoice: false,
            unique: false,
            on_delete: FkAction::NoAction,
            on_update: FkAction::NoAction,
            index: false,
            auto_now_add: false,
            auto_now: false,
            help: String::new(),
            example: String::new(),
            widget: None,
            supported_backends: Vec::new(),
            min: None,
            max: None,
            text_format: None,
            slug_from: None,
        }
    }

    /// Foreign-key column fixture whose `fk_target` is `target`.
    fn fk_col(name: &str, target: &str, nullable: bool) -> Column {
        Column {
            fk_target: Some(target.to_string()),
            ..col(name, SqlType::ForeignKey, nullable)
        }
    }

    #[test]
    fn form_fk_numeric_string_binds_as_bigint() {
        let plugin = fk_col("plugin", "plugin", false);

        let bound = form_str_to_sea_value(&plugin, "1").expect("coerce FK id");

        assert_eq!(
            bound,
            SeaValue::BigInt(Some(1)),
            "integer-backed FK form values must bind as bigint, not text"
        );
    }

    #[test]
    fn nullable_form_fk_blank_binds_as_null_bigint() {
        let parent = fk_col("parent", "plugin_comment", true);

        let bound = form_str_to_sea_value(&parent, "").expect("blank nullable FK");

        assert_eq!(
            bound,
            SeaValue::BigInt(None),
            "blank nullable integer-backed FK should bind SQL NULL"
        );
    }
}