Skip to main content

umbral_core/orm/queryset/
mod.rs

1//! `QuerySet<T>` and `Manager<T>`: chainable lazy SQL builder + entry
2//! point.
3//!
4//! `T::objects()` returns a `Manager<T>`; chaining `.filter` / `.order_by`
5//! / `.limit` / etc. on it (or on a `QuerySet<T>` directly) yields a new
6//! `QuerySet<T>`. Terminals (`.fetch`, `.first`, `.count`, `.exists`)
7//! await an async DB roundtrip via the ambient or explicit pool.
8//!
//! At M1 the surface was intentionally narrow per
//! `docs/specs/03-orm-querysets.md`: filter / order_by / limit / offset
//! for chaining, and fetch / first / count / exists for terminals.
//! exclude / distinct / values / annotate / aggregate / update / delete
//! were deferred to later milestones, landing as real need surfaced
//! (`exclude`, for one, is now part of the chainable surface).
14//!
15//! M2 lifted the terminals and the `Manager` delegation onto a generic
16//! `T: Model` bound. The table name comes from `T::TABLE`, the SELECT
17//! column list from `T::FIELDS`, and row materialisation from the
18//! `FromRow` bound the terminals carry. M3 generates the `Model` impl
19//! from `#[derive(Model)]`.
20//!
21//! ## Phase 2.5 — backend-agnostic terminals
22//!
23//! Through Phase 2 the QuerySet stored a `SqlitePool` and built every
24//! query with sea-query's `SqliteQueryBuilder`. Phase 2.5 widens that:
25//! the explicit-pool slot is `Option<DbPool>`, `.on(&SqlitePool)` keeps
26//! working unchanged, and a new `.on_pg(&PgPool)` registers a Postgres
27//! pool. The terminal methods dispatch on the resolved pool variant —
28//! SQLite path uses `SqliteQueryBuilder` + a `SqlitePool` executor;
29//! Postgres path uses `PostgresQueryBuilder` + a `PgPool` executor.
30//!
31//! The row-materialization bound on each terminal is the conjunction
32//! of both backends' `FromRow` impls. `#[derive(sqlx::FromRow)]` emits
33//! a generic-over-`R` impl, so a user struct with standard field
34//! types satisfies both bounds without any per-backend ceremony.
35
36mod backend_pg;
37mod backend_sqlite;
38mod errors;
39pub(crate) mod hydration;
40mod tx;
41mod write_helpers;
42
43pub use errors::{GetError, TryForEachError};
44use hydration::{hydrate_prefetch_related, hydrate_select_related};
45pub use tx::QuerySetTx;
46use write_helpers::{
47    build_insert_many_for, build_insert_one_for, fk_pk_hint, pk_field, serialize_to_map,
48};
49
50use std::collections::HashMap;
51use std::marker::PhantomData;
52
53use sea_query::{
54    Alias, Expr, Func, IntoIden, Order, PostgresQueryBuilder, Query, SqliteQueryBuilder,
55};
56use sea_query_binder::SqlxBinder;
57use serde_json::Value as JsonValue;
58
59use crate::db::DbPool;
60use crate::orm::{FExpr, HydrateRelated, Model, OrderExpr, Predicate};
61use umbral_casing::to_snake_case;
62
/// The door into queries for a model.
///
/// A `Manager<T>` stands in front of a freshly-built `QuerySet<T>` and
/// offers the same chainable surface. Users never build one by hand;
/// `Post::objects()` is the only way to obtain it.
pub struct Manager<T> {
    _phantom: PhantomData<T>,
    /// Manager-level override of the `atomic_transactions` builder
    /// default: `None` inherits the global default, `Some(true)` wraps
    /// subsequent writes in a transaction, `Some(false)` explicitly
    /// opts out. The value propagates into the QuerySet that
    /// `queryset()` constructs.
    atomic: Option<bool>,
}

impl<T> Manager<T> {
    /// Build a Manager with no transaction override (inherits the
    /// global default).
    pub(crate) fn new() -> Self {
        Manager {
            atomic: None,
            _phantom: PhantomData,
        }
    }

    /// Wrap every write terminal hanging off this Manager in a
    /// transaction — the same as calling `.atomic()` on each QuerySet
    /// derived from it. A per-call `.non_atomic()` still overrides.
    pub fn atomic(self) -> Self {
        Self {
            atomic: Some(true),
            ..self
        }
    }

    /// Opt this Manager (and every QuerySet derived from it) out of
    /// the global `App::builder().atomic_transactions(true)` default.
    pub fn non_atomic(self) -> Self {
        Self {
            atomic: Some(false),
            ..self
        }
    }
}

impl<T> Default for Manager<T> {
    /// Same as [`Manager::new`]: no transaction override.
    fn default() -> Self {
        Manager::new()
    }
}
107
/// SQL join flavor recorded per `join_related` hop. `None` in a
/// `JoinReq` means "infer from FK nullability" (gap 4c); an explicit
/// `left_/inner_/right_join_related` records `Some(..)`.
///
/// Each variant lowers to the matching sea-query join type via
/// `JoinKind::sea`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinKind {
    /// Lowers to `sea_query::JoinType::InnerJoin`.
    Inner,
    /// Lowers to `sea_query::JoinType::LeftJoin`.
    Left,
    /// Lowers to `sea_query::JoinType::RightJoin`.
    Right,
}
117
118impl JoinKind {
119    /// Lower to sea-query's join type.
120    pub(crate) fn sea(self) -> sea_query::JoinType {
121        match self {
122            JoinKind::Inner => sea_query::JoinType::InnerJoin,
123            JoinKind::Left => sea_query::JoinType::LeftJoin,
124            JoinKind::Right => sea_query::JoinType::RightJoin,
125        }
126    }
127}
128
/// One requested eager-join: a dotted relation path (`"plugin__author"`)
/// plus the join type to apply to the LAST hop. `kind: None` means
/// auto-infer per-hop from FK nullability (INNER for NOT NULL, LEFT for
/// nullable), the default. The explicit methods pin `Some(..)`.
#[derive(Debug, Clone)]
pub(crate) struct JoinReq {
    /// Relation path, hops separated by `__` (e.g. `"plugin__author"`).
    pub(crate) path: String,
    /// Explicit join flavor for the last hop, or `None` to infer it
    /// from FK nullability.
    pub(crate) kind: Option<JoinKind>,
}
138
/// A lazy, chainable SQL query.
///
/// Carries a sea-query `SelectStatement`, the accumulated filter
/// predicates, and pool-resolution state. Nothing is sent to the
/// database until a terminal method is awaited.
/// Cloning is cheap (the `SelectStatement` clones in O(query size)).
pub struct QuerySet<T> {
    /// The base SelectStatement — FROM, columns, joins, group-by,
    /// order-by, limit, offset. Filters are NOT applied here; they
    /// accumulate on [`Self::predicates`] and get woven in at
    /// terminal time, so per-backend predicate variants (Phase
    /// 4.2.2) can pick the right SimpleExpr based on the resolved
    /// pool.
    pub(crate) query: sea_query::SelectStatement,
    /// Accumulated filter predicates. Each one renders to either its
    /// default `cond` (for Postgres) or its `cond_sqlite` override
    /// (for SQLite, if set) at terminal time.
    pub(crate) predicates: Vec<Predicate<T>>,
    /// Pool pinned via `.on(..)` / `.on_pg(..)`. `None` means no
    /// explicit pool has been set on this QuerySet.
    pub(crate) explicit_pool: Option<DbPool>,
    /// FK field names requested for eager loading via `select_related`.
    /// After the main query returns rows, a batch `IN (...)` query
    /// fetches the related rows for each named field and calls
    /// `HydrateRelated::hydrate_fk` to populate `ForeignKey.resolved`.
    pub(crate) select_related: Vec<String>,
    /// M2M field names requested for eager loading via
    /// `prefetch_related`. After the main query, one batched JOIN
    /// against the junction + child table fetches every related row
    /// for every parent in a single round-trip; each parent's
    /// `M2M.resolved` slot is populated via
    /// `HydrateRelated::set_m2m_resolved_json`. Gap #19.
    pub(crate) prefetch_related: Vec<String>,
    /// BUG-8: `#[umbral(ordering = [...])]` lowers to a default ORDER
    /// BY applied at terminal time when the caller didn't supply an
    /// explicit `.order_by(...)`. The semantics: explicit calls REPLACE
    /// the default rather than appending to it. Each entry is
    /// `(column, descending)`.
    pub(crate) default_ordering: Vec<(&'static str, bool)>,
    /// Set to `true` by every `.order_by(...)` call (so it flips on
    /// the first one); while it is `false`, `build_query_for` applies
    /// `default_ordering`.
    pub(crate) explicit_order: bool,
    /// Per-QuerySet override for the `atomic_transactions` builder
    /// default. `None` = inherit the global default via
    /// [`crate::db::atomic_default`]; `Some(true)` = wrap this
    /// QuerySet's write terminal in a transaction; `Some(false)` =
    /// explicitly opt out.
    pub(crate) atomic: Option<bool>,
    /// Feature #72 — soft-delete state. Snapshotted from
    /// `T::SOFT_DELETE` when the QuerySet is constructed from a
    /// Manager (the no-bounds `QuerySet::new` constructor leaves
    /// this `false` so hand-built QuerySets stay opt-out by
    /// default).
    pub(crate) soft_delete_active: bool,
    /// True when the caller opted back into soft-deleted rows via
    /// `.with_deleted()`. Skips the auto `WHERE deleted_at IS NULL`
    /// injection.
    pub(crate) with_deleted: bool,
    /// True when the caller wants ONLY soft-deleted rows via
    /// `.only_deleted()`. Inverts the auto-filter to
    /// `WHERE deleted_at IS NOT NULL`. Checked before `with_deleted`
    /// in `build_query_for`, so it wins when both are set.
    pub(crate) only_deleted: bool,
    /// True when the caller asked for a real DELETE via
    /// `.hard_delete()` — bypasses the soft-delete rewrite that
    /// would normally turn `delete()` into an UPDATE.
    pub(crate) hard_delete: bool,
    /// Gap #111 — column projection set by [`Self::only`]. When
    /// `Some`, [`Self::to_sql`] / [`Self::to_sql_pg`] swap the
    /// SELECT list for just these columns, and the typed terminals
    /// (`fetch` / `first` / `get`) refuse to run with a clear error
    /// pointing the caller at [`Self::values`] (FromRow can't
    /// hydrate `T` from a partial-column row). `None` keeps the
    /// pre-#111 behaviour (full SELECT).
    pub(crate) only_cols: Option<Vec<String>>,
    /// Eager-join requests recorded via [`Self::join_related`] and
    /// its explicit-kind variants, one [`JoinReq`] per relation path.
    /// Distinct from `select_related`: `join_related` weaves a JOIN
    /// against the related table into the main SELECT (with aliased
    /// columns `<field>__<col>`) so one round-trip pulls parent +
    /// related rows together; the join type comes from
    /// `JoinReq::kind` (inferred from FK nullability when `None`).
    /// The existing `select_related` path keeps its
    /// "batched-IN-followup-query" shape — both are valid; this one
    /// wins when round-trip count matters more than the per-row
    /// column overhead.
    pub(crate) join_related: Vec<JoinReq>,
    /// Related-aggregate annotations added via
    /// [`Self::annotate_related`] / [`Self::annotate_count`]. Applied
    /// inside `build_query_for`, so EVERY terminal and introspection
    /// path — `fetch_annotated`, `explain`, `to_sql`, `to_sql_pg` —
    /// sees the same correlated subqueries. That's the
    /// `annotate()` contract: an annotation is query-builder state,
    /// not a side query.
    pub(crate) annotations: Vec<RelatedAnnotation>,
    /// Ties the QuerySet to its model type without storing a `T`.
    _phantom: PhantomData<T>,
}
228
229// Manual `Clone` — NOT `#[derive(Clone)]`, because the derive would force a
230// spurious `T: Clone` bound (every field is `T`-independent or carries its
231// own `T`-free `Clone`: `Predicate<T>` has a manual `impl<T> Clone`, and
232// `PhantomData<T>` clones for any `T`). The doc comment above already
233// promised cheap cloning; this is what makes `Paginator` (and any
234// requery-without-consume caller) slice the same query per page.
235impl<T> Clone for QuerySet<T> {
236    fn clone(&self) -> Self {
237        Self {
238            query: self.query.clone(),
239            predicates: self.predicates.clone(),
240            explicit_pool: self.explicit_pool.clone(),
241            select_related: self.select_related.clone(),
242            prefetch_related: self.prefetch_related.clone(),
243            default_ordering: self.default_ordering.clone(),
244            explicit_order: self.explicit_order,
245            atomic: self.atomic,
246            soft_delete_active: self.soft_delete_active,
247            with_deleted: self.with_deleted,
248            only_deleted: self.only_deleted,
249            hard_delete: self.hard_delete,
250            only_cols: self.only_cols.clone(),
251            join_related: self.join_related.clone(),
252            annotations: self.annotations.clone(),
253            _phantom: PhantomData,
254        }
255    }
256}
257
258// Manual `Debug` for the same `T: Debug`-free reason; `Predicate<T>`'s own
259// `Debug` is likewise `T`-free.
260impl<T> std::fmt::Debug for QuerySet<T> {
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        // `Predicate<T>` is intentionally not `Debug` (its `SimpleExpr`
263        // carries no `T`), so report the predicate count rather than the
264        // opaque expressions.
265        f.debug_struct("QuerySet")
266            .field("query", &self.query)
267            .field("predicates", &format_args!("[{} predicate(s)]", self.predicates.len()))
268            .field("explicit_pool", &self.explicit_pool)
269            .field("select_related", &self.select_related)
270            .field("prefetch_related", &self.prefetch_related)
271            .field("default_ordering", &self.default_ordering)
272            .field("explicit_order", &self.explicit_order)
273            .field("atomic", &self.atomic)
274            .field("soft_delete_active", &self.soft_delete_active)
275            .field("with_deleted", &self.with_deleted)
276            .field("only_deleted", &self.only_deleted)
277            .field("hard_delete", &self.hard_delete)
278            .field("only_cols", &self.only_cols)
279            .field("join_related", &self.join_related)
280            .field("annotations", &self.annotations)
281            .finish()
282    }
283}
284
/// One related-aggregate annotation on a [`QuerySet`] —
/// `(alias, relation, aggregate)` resolved against the model's
/// `REVERSE_FK_RELATIONS` at builder time. A name that fails to
/// resolve is stored poisoned (`resolved: Err`) so the infallible
/// builder stays chainable while every fallible consumer
/// (`fetch_annotated`, `explain`) reports it loudly.
#[derive(Debug, Clone)]
pub(crate) struct RelatedAnnotation {
    /// Name the correlated subquery is aliased to in the SELECT list.
    pub(crate) alias: String,
    /// Aggregate selected inside the subquery (rendered through
    /// `to_simple_expr()`).
    pub(crate) agg: crate::orm::Aggregate,
    /// `Ok((child_table, fk_column, parent_table, parent_pk))` or the
    /// loud error message for an unknown relation.
    pub(crate) resolved: Result<(String, String, String, String), String>,
    /// Child model is `#[umbral(soft_delete)]` — fold
    /// `AND <child>.deleted_at IS NULL` into the correlated subquery so
    /// a trashed child stops inflating the parent's count. Not
    /// consulted on the M2M-junction path.
    pub(crate) child_soft_delete: bool,
    /// Optional child-side predicate (a filtered count),
    /// pre-rendered to a backend-default `SimpleExpr`. ANDed into the
    /// subquery WHERE. From `annotate_count_where`. Not consulted on
    /// the M2M-junction path.
    pub(crate) child_filter: Option<sea_query::SimpleExpr>,
    /// `Some(junction_table)` when this annotation counts M2M junction
    /// rows instead of child rows (`annotate_count` over an `M2M<T>`).
    /// The subquery then selects from the junction table and
    /// correlates its `parent_id` column with the parent's primary key.
    pub(crate) m2m_junction: Option<String>,
}
310
/// Outcome of auto-discovering a reverse-FK relation (gaps2 #45) when
/// the parent declares no matching `ReverseSet` field. The resolver
/// scans the registry for children whose FK targets the parent table
/// and matches `relation` against their conventional name forms.
enum AutoDiscovery {
    /// Exactly one (child, fk_column) candidate matched.
    Resolved {
        /// Table name of the matching child model.
        child_table: String,
        /// Name of the child's column whose FK targets the parent.
        fk_column: String,
        /// Whether the child model is registered as soft-delete.
        soft_delete: bool,
    },
    /// Two or more candidates matched — the caller must declare a
    /// `#[umbral(reverse_fk = "...")]` field to disambiguate. Carries
    /// the candidate `child.fk` labels for the error message.
    Ambiguous(Vec<String>),
    /// No candidate matched. Carries the list of auto-discoverable
    /// child names so the error can teach the available relations.
    /// Empty when the model registry has not been initialised.
    NotFound(Vec<String>),
}
330
331// `snake_case` replaced by `umbral_casing::to_snake_case` (imported above)
332// in the gaps2 #77 consolidation refactor.
333
334/// Scan the model registry for children whose FK targets `T::TABLE`,
335/// and match `relation` against each candidate's conventional name
336/// forms: the child's table name, the child's struct name in
337/// snake_case and bare-lowercase, and any of those with a `_set`
338/// suffix (the `<model>_set` form). Declared `REVERSE_FK_RELATIONS` /
339/// `M2M_RELATIONS` are resolved by the caller BEFORE this runs, so
340/// they always take precedence.
341fn discover_reverse_relation<T: crate::orm::Model>(relation: &str) -> AutoDiscovery {
342    if !crate::migrate::is_initialised() {
343        return AutoDiscovery::NotFound(Vec::new());
344    }
345    let parent_table = T::TABLE;
346    // Each candidate: (child_table, fk_column, child_soft_delete).
347    let mut candidates: Vec<(String, String, bool)> = Vec::new();
348    let mut discoverable: Vec<String> = Vec::new();
349    for meta in crate::migrate::registered_models() {
350        for col in &meta.fields {
351            if col.fk_target.as_deref() != Some(parent_table) {
352                continue;
353            }
354            // Conventional name forms this (child, fk_column) answers to.
355            let snake = to_snake_case(&meta.name);
356            let lower = meta.name.to_ascii_lowercase();
357            let mut forms = vec![
358                meta.table.clone(),
359                snake.clone(),
360                lower.clone(),
361                format!("{}_set", meta.table),
362                format!("{snake}_set"),
363                format!("{lower}_set"),
364            ];
365            forms.sort();
366            forms.dedup();
367            // Surface a friendly name for "available children" errors.
368            discoverable.push(format!("{}_set", meta.table));
369            if forms.iter().any(|f| f == relation) {
370                candidates.push((meta.table.clone(), col.name.clone(), meta.soft_delete));
371            }
372        }
373    }
374    match candidates.len() {
375        1 => {
376            let (child_table, fk_column, soft_delete) = candidates.pop().unwrap();
377            AutoDiscovery::Resolved {
378                child_table,
379                fk_column,
380                soft_delete,
381            }
382        }
383        0 => {
384            discoverable.sort();
385            discoverable.dedup();
386            AutoDiscovery::NotFound(discoverable)
387        }
388        _ => {
389            let labels = candidates
390                .into_iter()
391                .map(|(child, fk, _)| format!("{child}.{fk}"))
392                .collect();
393            AutoDiscovery::Ambiguous(labels)
394        }
395    }
396}
397
398impl<T> QuerySet<T> {
399    pub(crate) fn new(query: sea_query::SelectStatement) -> Self {
400        Self {
401            query,
402            default_ordering: Vec::new(),
403            explicit_order: false,
404            predicates: Vec::new(),
405            explicit_pool: None,
406            select_related: Vec::new(),
407            prefetch_related: Vec::new(),
408            atomic: None,
409            soft_delete_active: false,
410            with_deleted: false,
411            only_deleted: false,
412            hard_delete: false,
413            only_cols: None,
414            join_related: Vec::new(),
415            annotations: Vec::new(),
416            _phantom: PhantomData,
417        }
418    }
419
420    /// Feature #72 — include soft-deleted rows in this query. Skips
421    /// the auto `WHERE deleted_at IS NULL` injection. No-op on
422    /// models that aren't tagged `#[umbral(soft_delete)]`.
423    pub fn with_deleted(mut self) -> Self {
424        self.with_deleted = true;
425        self
426    }
427
428    /// Feature #72 — only soft-deleted rows. Useful for admin
429    /// trash views and undelete workflows. No-op on models that
430    /// aren't tagged `#[umbral(soft_delete)]`.
431    pub fn only_deleted(mut self) -> Self {
432        self.only_deleted = true;
433        self
434    }
435
436    /// Feature #72 — force a real DELETE for the next `.delete()`
437    /// terminal call. Soft-delete models normally rewrite delete()
438    /// as `UPDATE ... SET deleted_at = NOW()`; `.hard_delete()`
439    /// bypasses that for GDPR purges, test cleanup, or any other
440    /// case where the row truly should be gone. No-op on models
441    /// that aren't tagged `#[umbral(soft_delete)]` (their delete()
442    /// is already a hard DELETE).
443    pub fn hard_delete(mut self) -> Self {
444        self.hard_delete = true;
445        self
446    }
447
448    /// Gap #111 — restrict the SELECT to the named columns.
449    ///
450    /// Affects [`Self::to_sql`] / [`Self::to_sql_pg`] (the SELECT list
451    /// shrinks to just these columns) and propagates into
452    /// [`Self::values`] when that terminal is called without its own
453    /// explicit column slice.
454    ///
455    /// **The typed terminals (`fetch` / `first` / `get`) refuse to
456    /// run with `.only()` set** because a partial-column row can't
457    /// satisfy `T`'s `FromRow` impl. The error message points at
458    /// `.values(...)` (returns `Vec<serde_json::Value>`) as the
459    /// execution path. Use `.only()` for `.to_sql()` inspection and
460    /// `.values()` for actual reads.
461    ///
462    /// ```rust,ignore
463    /// // Inspect: "SELECT \"id\", \"name\" FROM \"brand\" WHERE \"id\" = ?"
464    /// let sql = Brand::objects()
465    ///     .filter(brand::ID.eq(1))
466    ///     .only(&["id", "name"])
467    ///     .to_sql();
468    ///
469    /// // Execute (returns JSON rows):
470    /// let rows = Brand::objects()
471    ///     .filter(brand::ID.eq(1))
472    ///     .values(&["id", "name"])
473    ///     .await?;
474    /// ```
475    ///
476    /// Unknown column names are not validated here — they surface at
477    /// terminal time the same way `.values()` reports them (the
478    /// rendered SQL contains the bad identifier and SQLite/Postgres
479    /// raises). This keeps the chainable surface return-type-stable.
480    pub fn only(mut self, cols: &[&str]) -> Self {
481        self.only_cols = Some(cols.iter().map(|s| s.to_string()).collect());
482        self
483    }
484
485    /// Wrap this QuerySet's write terminal in a transaction. Reads are
486    /// unaffected (read terminals are single statements and the DB
487    /// gives them a consistent snapshot). Mutually exclusive with
488    /// [`Self::on_tx`] — if both are set, `on_tx` wins (you're
489    /// already inside a transaction, so wrapping again would deadlock
490    /// or fail on backends without nested transactions).
491    pub fn atomic(mut self) -> Self {
492        self.atomic = Some(true);
493        self
494    }
495
496    /// Opt this QuerySet's write terminal out of the global
497    /// `App::builder().atomic_transactions(true)` default. Useful in
498    /// hot-path batches where the caller already owns the outer
499    /// transaction.
500    pub fn non_atomic(mut self) -> Self {
501        self.atomic = Some(false);
502        self
503    }
504
505    /// Resolve whether this QuerySet should auto-wrap its write
506    /// terminal in a transaction. Per-call override > builder global.
507    pub(crate) fn should_atomic_wrap(&self) -> bool {
508        self.atomic.unwrap_or_else(crate::db::atomic_default)
509    }
510
511    /// Clone the base query and weave in the accumulated predicates,
512    /// picking the dialect-appropriate `SimpleExpr` for each one. The
513    /// `backend_name` is `"sqlite"` or `"postgres"`; any other value
514    /// behaves like Postgres (the default).
515    pub(crate) fn build_query_for(&self, backend_name: &str) -> sea_query::SelectStatement {
516        let mut q = self.query.clone();
517        for p in &self.predicates {
518            q.and_where(p.cond_for(backend_name));
519        }
520        // Feature #72 — soft-delete auto-filter. When the model
521        // opted in via `#[umbral(soft_delete)]` AND the caller
522        // didn't switch the visibility via `.with_deleted()` /
523        // `.only_deleted()`, inject `WHERE deleted_at IS NULL`.
524        // `.with_deleted()` shows everything; `.only_deleted()`
525        // shows just the soft-deleted rows.
526        if self.soft_delete_active {
527            use sea_query::Expr;
528            if self.only_deleted {
529                q.and_where(Expr::col(Alias::new("deleted_at")).is_not_null());
530            } else if !self.with_deleted {
531                q.and_where(Expr::col(Alias::new("deleted_at")).is_null());
532            }
533        }
534        // Related-aggregate annotations: one correlated scalar
535        // subquery per entry, aliased onto the SELECT list. Living
536        // HERE is what makes `.annotate_*` compose with everything —
537        // explain(), to_sql(), fetch_annotated() all see the same
538        // query. Poisoned entries (unknown relation) are skipped in
539        // this infallible path; the fallible consumers call
540        // `check_annotations()` first and fail loudly instead.
541        for ann in &self.annotations {
542            // M2M-junction annotations count rows of the junction table
543            // (`<parent>_<field>`, columns parent_id / child_id),
544            // correlated on parent_id = <parent>.<pk>.
545            if let Some(junction) = &ann.m2m_junction {
546                if let Ok((_child_table, _fk_col, parent_table, parent_pk)) = &ann.resolved {
547                    let mut sub = sea_query::Query::select();
548                    sub.expr(ann.agg.to_simple_expr())
549                        .from(crate::db::router::schema_qualified_table(junction.as_str()))
550                        .and_where(
551                            sea_query::Expr::col((
552                                Alias::new(junction.as_str()),
553                                Alias::new("parent_id"),
554                            ))
555                            .equals((
556                                Alias::new(parent_table.as_str()),
557                                Alias::new(parent_pk.as_str()),
558                            )),
559                        );
560                    q.expr_as(
561                        sea_query::SimpleExpr::SubQuery(
562                            None,
563                            Box::new(sea_query::SubQueryStatement::SelectStatement(
564                                sub.to_owned(),
565                            )),
566                        ),
567                        Alias::new(ann.alias.as_str()),
568                    );
569                }
570                continue;
571            }
572            if let Ok((child_table, fk_col, parent_table, parent_pk)) = &ann.resolved {
573                let mut sub = sea_query::Query::select();
574                sub.expr(ann.agg.to_simple_expr())
575                    .from(crate::db::router::schema_qualified_table(
576                        child_table.as_str(),
577                    ))
578                    .and_where(
579                        sea_query::Expr::col((
580                            Alias::new(child_table.as_str()),
581                            Alias::new(fk_col.as_str()),
582                        ))
583                        .equals((
584                            Alias::new(parent_table.as_str()),
585                            Alias::new(parent_pk.as_str()),
586                        )),
587                    );
588                // Auto-exclude soft-deleted children from the count when
589                // the child model is `#[umbral(soft_delete)]`.
590                if ann.child_soft_delete {
591                    sub.and_where(
592                        sea_query::Expr::col((
593                            Alias::new(child_table.as_str()),
594                            Alias::new("deleted_at"),
595                        ))
596                        .is_null(),
597                    );
598                }
599                // Child-side predicate (annotate_count_where).
600                if let Some(filter) = &ann.child_filter {
601                    sub.and_where(filter.clone());
602                }
603                q.expr_as(
604                    sea_query::SimpleExpr::SubQuery(
605                        None,
606                        Box::new(sea_query::SubQueryStatement::SelectStatement(
607                            sub.to_owned(),
608                        )),
609                    ),
610                    Alias::new(ann.alias.as_str()),
611                );
612            }
613        }
614        // BUG-8: default ORDER BY applies only when the caller didn't
615        // supply an explicit `.order_by(...)`: the model-default
616        // ordering semantics.
617        if !self.explicit_order {
618            for (col, desc) in &self.default_ordering {
619                let order = if *desc { Order::Desc } else { Order::Asc };
620                q.order_by(Alias::new(*col), order);
621            }
622        }
623        q
624    }
625}
626
/// Chainable methods on every `QuerySet<T>`.
///
/// These are model-agnostic: they only touch the sea-query
/// `SelectStatement`, the accumulated predicate list, and the
/// pool-resolution slot, none of which needs a `T: Model` bound.
/// Terminals (which need row mapping) live in the
/// `impl<T: Model> QuerySet<T>` block below.
633impl<T> QuerySet<T> {
634    /// Add a WHERE condition. Multiple `.filter` calls AND together
635    /// (sea-query's `and_where` semantics — applied at terminal time
636    /// once the resolved pool's backend is known).
637    pub fn filter(mut self, p: Predicate<T>) -> Self {
638        self.predicates.push(p);
639        self
640    }
641
642    /// Add a negated WHERE condition. The negated predicate ANDs into
643    /// the chain alongside any `filter()` calls, so
644    /// `.filter(A).exclude(B).filter(C)` renders as `WHERE A AND NOT B
645    /// AND C`. Sugar for `filter(Q::not(p))`.
646    ///
647    /// The negated-filter terminal.
648    pub fn exclude(self, p: Predicate<T>) -> Self {
649        self.filter(crate::orm::Q::not(p))
650    }
651
652    /// Add an ORDER BY clause. Multiple `.order_by` calls append.
653    /// The first explicit call also opts out of the model's
654    /// `#[umbral(ordering = [...])]` default (BUG-8): explicit ordering
655    /// replaces the default rather than stacking on top of it.
656    pub fn order_by(mut self, o: OrderExpr<T>) -> Self {
657        let order = if o.descending {
658            Order::Desc
659        } else {
660            Order::Asc
661        };
662        self.query.order_by(Alias::new(o.column), order);
663        self.explicit_order = true;
664        self
665    }
666
667    /// Set LIMIT.
668    pub fn limit(mut self, n: u64) -> Self {
669        self.query.limit(n);
670        self
671    }
672
673    /// Set OFFSET.
674    pub fn offset(mut self, n: u64) -> Self {
675        self.query.offset(n);
676        self
677    }
678
679    /// Override the pool resolved at terminal time with a SQLite pool.
680    ///
681    /// Wins over the ambient default. Used by tests that drive the ORM
682    /// without going through `App::build()`. For a Postgres override
683    /// use [`Self::on_pg`].
684    pub fn on(mut self, pool: &sqlx::SqlitePool) -> Self {
685        self.explicit_pool = Some(DbPool::Sqlite(pool.clone()));
686        self
687    }
688
689    /// Override the pool resolved at terminal time with a Postgres pool.
690    ///
691    /// The Postgres counterpart of [`Self::on`]. Tests that want to
692    /// exercise the Postgres branch (or that drive against a real
693    /// Postgres instance) reach for this directly.
694    pub fn on_pg(mut self, pool: &sqlx::PgPool) -> Self {
695        self.explicit_pool = Some(DbPool::Postgres(pool.clone()));
696        self
697    }
698
699    /// Attach this `QuerySet` to an open transaction.
700    ///
701    /// Returns a [`QuerySetTx`] that holds both the query and a mutable
702    /// reference to the transaction. Every terminal on `QuerySetTx`
703    /// (`fetch`, `first`, `count`, `exists`, `get`, `delete`,
704    /// `update_values`) executes inside the open transaction so all
705    /// operations in the same closure commit or roll back as a unit.
706    ///
707    /// ```rust,ignore
708    /// umbral::db::transaction(|tx| async move {
709    ///     let order = Order::objects().on_tx(tx).create(new_order).await?;
710    ///     Stock::objects()
711    ///         .on_tx(tx)
712    ///         .filter(stock::SKU.eq(sku))
713    ///         .update_values(delta)
714    ///         .await?;
715    ///     Ok::<_, MyError>(order)
716    /// }).await?;
717    /// ```
718    pub fn on_tx(self, tx: &mut crate::db::Transaction) -> QuerySetTx<'_, T> {
719        QuerySetTx { qs: self, tx }
720    }
721
722    /// Eagerly load a single FK field by name.
723    ///
724    /// After the main SELECT returns rows, a batch `SELECT ... FROM <related_table>
725    /// WHERE id IN (...)` fetches all referenced rows in one round-trip. Each
726    /// returned row is deserialised as the target model and stored in
727    /// `ForeignKey<U>.resolved` so template rendering (`{{ post.author.username }}`)
728    /// and `serde_json::to_value(&post)["author"]["username"]` both work without
729    /// additional queries.
730    ///
731    /// Calling `select_related` multiple times accumulates the names:
732    /// `.select_related("author").select_related("editor")` works the same as
733    /// `.select_related_many(&["author", "editor"])`.
734    ///
735    /// ## Nested traversal (post-#42)
736    ///
737    /// `.select_related("author__manager")` walks the FK chain through
738    /// the `__` separator. One batched `IN (...)` query per hop —
739    /// `1 + len(hops)` round-trips regardless of parent count. No
740    /// N+1. Each hop's related row is embedded into the prior level's
741    /// JSON, and recursive `ForeignKey<T>::Deserialize` unpacks the
742    /// chain into `resolved()` slots at every depth. Bonus: a
743    /// select_related'd model now round-trips through
744    /// `serde_json::to_value(&t)` / `from_value` without losing the
745    /// resolved relation.
746    ///
747    /// ## Companion shapes
748    ///
749    /// - **`join_related(name)`** — same goal (load related rows) via
750    ///   a true `LEFT JOIN` in the main SELECT. One round-trip total
751    ///   vs. `select_related`'s `1 + N` batched-IN approach. Wider
752    ///   per-row payload; better when round-trip count dominates.
753    /// - **`prefetch_related(name)`** — M2M batched loading (one
754    ///   query per declared M2M field). For reverse-FK collections
755    ///   (`prefetch_related("comment_set")`-style) see gap #44 — not
756    ///   yet implemented.
757    ///
758    /// ## Loud errors
759    ///
760    /// Unknown field names (typos, M2M names accidentally passed
761    /// here, fields without `fk_target`) return a clear
762    /// `sqlx::Error::Protocol` from the terminal naming the bad hop
763    /// and the table it was looked up against. Pre-#42 these
764    /// silently no-op'd.
765    pub fn select_related(mut self, field_name: impl Into<String>) -> Self {
766        self.select_related.push(field_name.into());
767        self
768    }
769
770    /// Eagerly load multiple FK fields in one call.
771    ///
772    /// Sugar for chained `.select_related(name)` calls.
773    pub fn select_related_many(mut self, field_names: &[&str]) -> Self {
774        for name in field_names {
775            self.select_related.push(name.to_string());
776        }
777        self
778    }
779
780    /// JOIN-based eager FK loading — emits `LEFT JOIN <related> ON ...`
781    /// in the main SELECT (with aliased child columns `<field>__<col>`)
782    /// so one round-trip pulls the parent + related rows together.
783    ///
784    /// Trade-off vs. [`Self::select_related`]:
785    ///   - `select_related` runs ONE extra batched query after the
786    ///     main fetch (`SELECT * FROM related WHERE id IN (...)`).
787    ///     Two round-trips total; rows stay narrow.
788    ///   - `join_related` runs the main query AS the join — one
789    ///     round-trip total — at the cost of a wider per-row payload
790    ///     (every related column rides along even for duplicated
791    ///     parents).
792    ///
793    /// Both populate `ForeignKey<U>.resolved` the same way so
794    /// downstream code (templates, serde) doesn't care which path
795    /// was used. Pick `join_related` when round-trip count dominates
796    /// (hot listing pages, small related tables) and `select_related`
797    /// when the related row is wide or only loaded for a subset of
798    /// the parent rows.
799    ///
800    /// Composes with `.select_related(other_fk)` and
801    /// `.prefetch_related(m2m)` — different fields can take different
802    /// paths in the same query.
803    ///
804    /// Multi-hop FK chains are supported: a `"__"`-separated path like
805    /// `"author__manager"` resolves one JOIN per hop in a single query.
806    ///
807    /// **Constraints**: FK fields must live in `model.fields` (M2M links
808    /// route through `prefetch_related`), and every related model along the
809    /// chain must be registered with the framework
810    /// (`App::builder().model::<U>()` or contributed by a plugin) so we can
811    /// resolve its column layout for the aliased SELECT.
812    pub fn join_related(mut self, field_name: impl Into<String>) -> Self {
813        self.join_related.push(JoinReq {
814            path: field_name.into(),
815            kind: None,
816        });
817        self
818    }
819
820    /// Sugar for chained [`Self::join_related`] calls.
821    pub fn join_related_many(mut self, field_names: &[&str]) -> Self {
822        for name in field_names {
823            self.join_related.push(JoinReq {
824                path: (*name).to_string(),
825                kind: None,
826            });
827        }
828        self
829    }
830
831    /// `LEFT JOIN` the related path — keeps parent rows whose relation
832    /// is absent (the relation hydrates as unresolved/None). Accepts a
833    /// nested path (`"plugin__author"`); the join type applies to the
834    /// deepest hop.
835    pub fn left_join_related(mut self, path: impl Into<String>) -> Self {
836        self.join_related.push(JoinReq {
837            path: path.into(),
838            kind: Some(JoinKind::Left),
839        });
840        self
841    }
842
843    /// `INNER JOIN` the related path — drops parent rows whose relation
844    /// is absent. The default for a NOT NULL FK.
845    pub fn inner_join_related(mut self, path: impl Into<String>) -> Self {
846        self.join_related.push(JoinReq {
847            path: path.into(),
848            kind: Some(JoinKind::Inner),
849        });
850        self
851    }
852
853    /// `RIGHT JOIN` the related path. Postgres-unconditional; SQLite
854    /// needs >= 3.39 — a runtime warning fires on older SQLite (see the
855    /// boot/runtime note in the joins docs). The precise version gate
856    /// lives at execute time (the SQLite driver's own error); the warn
857    /// is the early nudge.
858    pub fn right_join_related(mut self, path: impl Into<String>) -> Self {
859        self.join_related.push(JoinReq {
860            path: path.into(),
861            kind: Some(JoinKind::Right),
862        });
863        self
864    }
865
866    /// Eagerly load an M2M relation via a single batched join.
867    ///
868    /// After the main SELECT returns rows, one query of the shape
869    /// `SELECT j.parent_id, child.* FROM <child_table> child INNER
870    /// JOIN <junction> j ON child.<pk> = j.child_id WHERE
871    /// j.parent_id IN (...)` fetches every related child for every
872    /// parent in one round-trip. Each parent's `M2M.resolved` slot
873    /// is populated with its matching children.
874    ///
875    /// The M2M counterpart of [`Self::select_related`] for FKs —
876    /// same goal of killing N+1: a batch-loaded `prefetch_related('tags')`.
877    ///
878    /// ## Reverse-FK collections (post-#44)
879    ///
880    /// `prefetch_related` also loads `ReverseSet<C>` fields — the
881    /// "for each Post, give me every Comment that points at it"
882    /// shape. Declare the field on the parent with
883    /// `#[sqlx(skip)] #[serde(skip)]
884    /// #[umbral(reverse_fk = "<fk_col>")] pub <name>: ReverseSet<C>`
885    /// where `<fk_col>` names the FK column on `C` pointing back.
886    /// One `SELECT * FROM <child> WHERE <fk_col> IN (parent_pks)`
887    /// regardless of parent count — no N+1.
888    ///
889    /// ## Scope (v1)
890    ///
891    /// - **M2M + reverse-FK only.** FK fields go through
892    ///   [`Self::select_related`] (batched IN) or
893    ///   [`Self::join_related`] (LEFT JOIN).
894    /// - **i64 parent PK only.** Same constraint as the rest of the
895    ///   M2M plumbing; models with non-i64 PKs surface a clean
896    ///   compile error.
897    /// - **Unknown field name → loud error** (post-#42). If the
898    ///   name matches neither an M2M nor a `ReverseSet` field,
899    ///   fetch returns a clear `sqlx::Error::Protocol` pointing at
900    ///   the right method.
901    pub fn prefetch_related(mut self, field_name: impl Into<String>) -> Self {
902        self.prefetch_related.push(field_name.into());
903        self
904    }
905
906    /// Eagerly load multiple M2M relations. Sugar for chained
907    /// `.prefetch_related(name)` calls.
908    pub fn prefetch_related_many(mut self, field_names: &[&str]) -> Self {
909        for name in field_names {
910            self.prefetch_related.push(name.to_string());
911        }
912        self
913    }
914
915    /// Convert this QuerySet into a [`Subquery`] suitable for use in
916    /// an `IN (SELECT ...)` predicate. Projects only the named
917    /// column; the accumulated WHERE / ORDER BY survive.
918    ///
919    /// `Post::objects().filter(...).into_subquery("author_id")` →
920    /// `Subquery` you can hand to `user::ID.in_subquery(...)`.
921    pub fn into_subquery(self, col_name: &str) -> crate::orm::Subquery {
922        let mut q = self.build_query_for("sqlite");
923        q.clear_selects();
924        q.column(Alias::new(col_name));
925        crate::orm::Subquery::from_select(q)
926    }
927
928    /// Combine this QuerySet with `other` via SQL `UNION` (gap #28).
929    /// Both QuerySets must produce the same column shape — which
930    /// they always do here because both are typed `QuerySet<T>`.
931    /// Duplicates are removed (the de-duplicating UNION, not UNION
932    /// ALL).
933    pub fn union(self, other: QuerySet<T>) -> Self {
934        self.combine(other, sea_query::UnionType::Distinct)
935    }
936
937    /// Combine this QuerySet with `other` via SQL `INTERSECT`
938    /// (gap #28). Returns rows present in BOTH inputs.
939    pub fn intersect(self, other: QuerySet<T>) -> Self {
940        self.combine(other, sea_query::UnionType::Intersect)
941    }
942
943    /// Combine this QuerySet with `other` via SQL `EXCEPT`
944    /// (gap #28). Returns rows present in `self` but not in `other`.
945    pub fn except(self, other: QuerySet<T>) -> Self {
946        self.combine(other, sea_query::UnionType::Except)
947    }
948
949    /// Internal: attach `other`'s SelectStatement to `self`'s
950    /// SelectStatement with the given UnionType. Both sides apply
951    /// their accumulated predicates / ORDER BY before the union.
952    fn combine(mut self, other: QuerySet<T>, ty: sea_query::UnionType) -> Self {
953        let backend = "sqlite";
954        let other_select = other.build_query_for(backend);
955        // Fold our own predicates into the base query so the union
956        // sees them; further `.filter()` calls on the returned
957        // QuerySet would still apply to the OUTER (combined) query.
958        let mut base = self.build_query_for(backend);
959        self.predicates.clear();
960        base.union(ty, other_select);
961        self.query = base;
962        self
963    }
964
965    /// Emit `SELECT DISTINCT ...` for this query (gap #17). Most
966    /// useful when combined with [`Self::values`] to dedupe a
967    /// column-projected list (`distinct().values(&["tag"])`); the
968    /// full-row DISTINCT is rarely what you want.
969    ///
970    /// Postgres-specific `DISTINCT ON (cols)` is deferred until a
971    /// real consumer surfaces the need — the standard `DISTINCT`
972    /// covers most use cases.
973    pub fn distinct(mut self) -> Self {
974        self.query.distinct();
975        self
976    }
977}
978
// Pool resolution for terminals (implemented by `resolve_pool`, defined
// further down in this file; this note is a plain comment so it is not
// absorbed into the rustdoc of the function that follows).
//
// Precedence: explicit `.on(&pool)` / `.on_pg(&pool)` override wins;
// then the per-model database alias the Plugin contract published
// via `Plugin::database()` (FEATURES.md #6); then the `"default"`
// pool. Tests that skip the App builder pass an explicit pool and
// bypass the alias lookup entirely.

986/// Validate that every name passed to `.join_related(...)` resolves
987/// to a foreign-key field on `T`. Loud-error path that replaces the
988/// pre-#42 silent no-op (where an unknown field, an M2M field, or a
989/// non-FK column produced an empty JOIN list and a confusing
990/// "ForeignKey::resolved() returned None" downstream).
991fn validate_join_related_fields<T: Model>(fields: &[String]) -> Result<(), sqlx::Error> {
992    for field_name in fields {
993        // Nested path (`"plugin__author"` / `"tags__category"`):
994        // validate via the hop resolver. A leading M2M segment is a
995        // valid chain entry even though it isn't an FK on `T`, so
996        // accept it and let `apply_join_related` route it.
997        if field_name.contains("__") {
998            let first = field_name.split("__").next().unwrap_or(field_name);
999            let leads_m2m = T::M2M_RELATIONS.iter().any(|r| r.field_name == first);
1000            if leads_m2m || resolve_join_hops::<T>(field_name).is_some() {
1001                continue;
1002            }
1003            return Err(sqlx::Error::Protocol(format!(
1004                "umbral::orm::join_related: nested path `{field_name}` on `{}` has an \
1005                 unresolvable hop (each segment must be a FK or M2M to a registered model)",
1006                T::NAME
1007            )));
1008        }
1009        // Try as a regular column first.
1010        let col = T::FIELDS.iter().find(|f| f.name == field_name.as_str());
1011        if let Some(col) = col {
1012            if col.fk_target.is_some() {
1013                continue; // FK column — OK.
1014            }
1015            return Err(sqlx::Error::Protocol(format!(
1016                "umbral::orm::join_related: field `{field_name}` on `{}` is not a foreign \
1017                 key (it has no fk_target)",
1018                T::NAME
1019            )));
1020        }
1021        // M2M field name? Post-#113 these go through the double
1022        // LEFT JOIN path: apply_join_related emits
1023        // `LEFT JOIN <junction> LEFT JOIN <child>` with aliased
1024        // child cols, and fetch()'s dedup-aware path collects M2M
1025        // children per parent. Trade-off documented at the join_related
1026        // docstring — M2M JOINs multiply parent rows by avg
1027        // cardinality, so prefetch_related stays the default for
1028        // any list page where M2M cardinality isn't tiny.
1029        if T::M2M_RELATIONS
1030            .iter()
1031            .any(|r| r.field_name == field_name.as_str())
1032        {
1033            continue;
1034        }
1035        return Err(sqlx::Error::Protocol(format!(
1036            "umbral::orm::join_related: unknown field `{field_name}` on model `{}`",
1037            T::NAME
1038        )));
1039    }
1040    Ok(())
1041}
1042
/// A single resolved step of a `join_related` FK chain: which column to
/// join from, and which table / primary key to join onto.
#[derive(Clone, Debug)]
pub(crate) struct JoinHop {
    /// Name of the FK column, living on the *previous* level's table.
    pub(crate) fk_col: String,
    /// Table that this hop joins onto.
    pub(crate) child_table: String,
    /// Primary-key column of `child_table`.
    pub(crate) child_pk: String,
    /// Whether the FK column is nullable (drives join-kind
    /// auto-inference).
    pub(crate) nullable: bool,
}
1055
1056/// Resolve a dotted FK path (`"plugin__author"`) into ordered hops.
1057/// Hop 0 reads `T::FIELDS`; deeper hops read the migrate registry's
1058/// `Column`s for the prior hop's target table. Returns `None` (skip,
1059/// emit no JOIN) on any unresolved hop — same forgiving posture as the
1060/// pre-existing one-hop path's silent skip in `to_sql`.
1061///
1062/// FK-only: a path whose FIRST segment is an M2M field is NOT handled
1063/// here (M2M chains route through `apply_join_related`'s M2M branch);
1064/// this returns `None` for such a path.
1065pub(crate) fn resolve_join_hops<T: Model>(path: &str) -> Option<Vec<JoinHop>> {
1066    let registered = crate::migrate::registered_models();
1067    let segs: Vec<&str> = path.split("__").filter(|s| !s.is_empty()).collect();
1068    if segs.is_empty() {
1069        return None;
1070    }
1071    let mut hops = Vec::with_capacity(segs.len());
1072    // Hop 0 off the typed parent.
1073    let f0 = T::FIELDS.iter().find(|f| f.name == segs[0])?;
1074    let t0 = f0.fk_target?;
1075    let m0 = registered.iter().find(|m| m.table == t0)?;
1076    let pk0 = m0.fields.iter().find(|c| c.primary_key)?;
1077    hops.push(JoinHop {
1078        fk_col: segs[0].to_string(),
1079        child_table: t0.to_string(),
1080        child_pk: pk0.name.clone(),
1081        nullable: f0.nullable,
1082    });
1083    let mut current = t0;
1084    for seg in &segs[1..] {
1085        let meta = registered.iter().find(|m| m.table == current)?;
1086        let col = meta.fields.iter().find(|c| c.name == *seg)?;
1087        let tgt = col.fk_target.as_deref()?;
1088        let tmeta = registered.iter().find(|m| m.table == tgt)?;
1089        let pk = tmeta.fields.iter().find(|c| c.primary_key)?;
1090        hops.push(JoinHop {
1091            fk_col: (*seg).to_string(),
1092            child_table: tgt.to_string(),
1093            child_pk: pk.name.clone(),
1094            nullable: col.nullable,
1095        });
1096        current = tgt;
1097    }
1098    Some(hops)
1099}
1100
1101/// Thin wrapper so the backend hydration helpers (a sibling module)
1102/// can resolve a chain without importing the private name directly.
1103pub(crate) fn resolve_join_hops_for<T: Model>(path: &str) -> Option<Vec<JoinHop>> {
1104    resolve_join_hops::<T>(path)
1105}
1106
1107/// Resolve a path whose FIRST segment is an M2M field on `T` into the
1108/// M2M child table + child PK + the onward FK chain hops off that
1109/// child. `onward` is empty for a bare `"tags"`; for `"tags__category"`
1110/// it carries the `category` FK hop off the child table. `None` when
1111/// `segs[0]` isn't an M2M field or any onward hop fails to resolve.
1112pub(crate) fn resolve_m2m_chain<T: Model>(path: &str) -> Option<(String, String, Vec<JoinHop>)> {
1113    let registered = crate::migrate::registered_models();
1114    let segs: Vec<&str> = path.split("__").filter(|s| !s.is_empty()).collect();
1115    let first = segs.first()?;
1116    let rel = T::M2M_RELATIONS.iter().find(|r| r.field_name == *first)?;
1117    let child_meta = registered.iter().find(|m| m.table == rel.target_table)?;
1118    let child_pk = child_meta.fields.iter().find(|c| c.primary_key)?;
1119    let mut onward = Vec::with_capacity(segs.len().saturating_sub(1));
1120    let mut current = rel.target_table;
1121    for seg in &segs[1..] {
1122        let meta = registered.iter().find(|m| m.table == current)?;
1123        let col = meta.fields.iter().find(|c| c.name == *seg)?;
1124        let tgt = col.fk_target.as_deref()?;
1125        let tmeta = registered.iter().find(|m| m.table == tgt)?;
1126        let pk = tmeta.fields.iter().find(|c| c.primary_key)?;
1127        onward.push(JoinHop {
1128            fk_col: (*seg).to_string(),
1129            child_table: tgt.to_string(),
1130            child_pk: pk.name.clone(),
1131            nullable: col.nullable,
1132        });
1133        current = tgt;
1134    }
1135    Some((rel.target_table.to_string(), child_pk.name.clone(), onward))
1136}
1137
1138/// Gap #111 — error returned when a typed terminal (`fetch` / `first`
1139/// / `get`) runs against a QuerySet that has `.only(...)` set. A
1140/// partial-column row can't satisfy `T`'s `FromRow` impl, so the
1141/// caller has to either drop the `.only(...)` (full SELECT, typed
1142/// rows back) or terminate via `.values(&[...])` (JSON rows with
1143/// just the requested columns). The message names the offending
1144/// terminal so the fix is one rename away.
1145fn only_with_typed_terminal_error(terminal: &'static str) -> sqlx::Error {
1146    sqlx::Error::Protocol(format!(
1147        "umbral::orm::{terminal}: cannot run a typed terminal on a QuerySet \
1148         with `.only(...)` set — a partial-column row can't hydrate `T` via \
1149         FromRow. Either drop `.only(...)` to fetch full typed rows, or \
1150         terminate via `.values(&[...])` to get JSON rows with just the \
1151         projected columns."
1152    ))
1153}
1154
1155fn resolve_pool<T: Model>(explicit: Option<DbPool>, op: crate::db::RouteOp) -> DbPool {
1156    if let Some(pool) = explicit {
1157        return pool;
1158    }
1159    // Route through the swappable router when the registry is up.
1160    if let Some(meta) = crate::migrate::model_meta_ref(T::NAME) {
1161        let ctx = crate::db::route_context::current();
1162        let r = crate::db::router::router();
1163        let alias = match op {
1164            crate::db::RouteOp::Read => r.db_for_read(meta, &ctx),
1165            crate::db::RouteOp::Write => r.db_for_write(meta, &ctx),
1166        };
1167        return crate::db::pool_for_dispatched(alias.as_str()).clone();
1168    }
1169    // Registry-less fallback (low-level tests): today's static behavior.
1170    if let Some(alias) = crate::migrate::model_alias(T::NAME) {
1171        return crate::db::pool_for_dispatched(&alias).clone();
1172    }
1173    crate::db::pool_dispatched().clone()
1174}
1175
1176/// Pin a QuerySet to an explicit pool, dispatching SQLite vs Postgres. Used by
1177/// the upsert paths (`get_or_create` / `update_or_create`) so their
1178/// existence-check reads run on the WRITE database — read-your-writes, so a
1179/// read/write-split router never probes a lagging replica and inserts a
1180/// duplicate (or reads a stale row back after the update).
1181fn pin_to_pool<T: Model>(qs: QuerySet<T>, pool: &DbPool) -> QuerySet<T> {
1182    match pool {
1183        DbPool::Sqlite(p) => qs.on(p),
1184        DbPool::Postgres(p) => qs.on_pg(p),
1185    }
1186}
1187
1188// GetError / TryForEachError moved to `errors`; re-exported above.
1189
1190/// Emit a one-shot advisory when a `right_join_related` is applied
1191/// against a SQLite pool.
1192///
1193/// RIGHT/FULL JOIN landed in SQLite 3.39 (June 2022); Postgres has
1194/// always supported it. The boot system check (`check.rs`) can't surface
1195/// this — it's synchronous, has no live pool, and whether a RIGHT join
1196/// is *reachable* is a runtime QuerySet fact rather than static model
1197/// metadata. So the spec's "boot warning" is realized here: the first
1198/// time a RIGHT join is built against a SQLite pool, we `tracing::warn!`
1199/// once per process. We do NOT probe the library version (that needs an
1200/// async round-trip the SQL builder doesn't have) — the precise gate is
1201/// the SQLite driver's own error at execute time on an engine < 3.39;
1202/// this warn is the early nudge, consistent with `check.rs`'s
1203/// `Severity::Warning` posture.
1204///
1205/// Postgres pools and the no-pool case (a pure `to_sql` build with no
1206/// app booted) are silent.
1207fn warn_right_join_on_sqlite() {
1208    use std::sync::Once;
1209    static ONCE: Once = Once::new();
1210    if matches!(
1211        crate::db::try_pool_dispatched(),
1212        Some(crate::db::DbPool::Sqlite(_))
1213    ) {
1214        ONCE.call_once(|| {
1215            tracing::warn!(
1216                "umbral::orm::right_join_related: RIGHT JOIN requires SQLite >= 3.39. \
1217                 If your SQLite is older the query will error at execute time; \
1218                 Postgres is unaffected. Prefer left_/inner_join_related on SQLite \
1219                 unless you've confirmed the engine version."
1220            );
1221        });
1222    }
1223}
1224
1225/// Terminal methods for every `QuerySet<T>` where `T: Model`.
1226///
1227/// Each terminal that materializes `T` carries a FromRow bound on the
1228/// method (not the impl block) — the conjunction of both backends'
1229/// FromRow impls. `#[derive(sqlx::FromRow)]` emits a generic-over-`R`
1230/// impl, so any user struct with standard field types satisfies both
1231/// bounds automatically.
1232impl<T: Model> QuerySet<T> {
1233    /// Render the SQL the QuerySet would execute, without running it.
1234    ///
1235    /// Returns the prepared statement with `?` placeholders for the
1236    /// bound values, exactly the string sqlx would send. Useful for
1237    /// `eprintln!`-style debugging and for tests that want to pin
1238    /// the rendered query without round-tripping through a pool.
1239    ///
1240    /// The bound values are intentionally not surfaced (sqlx's binder
1241    /// types aren't part of umbral's public surface); a `(sql, values)`
1242    /// accessor lands when EXPLAIN-style integration needs it.
1243    ///
1244    /// The rendered placeholder dialect is SQLite's (`?`). When the
1245    /// dispatched pool is Postgres the actual at-execute rendering
1246    /// uses `$1`-style placeholders; the `to_sql` debug surface
1247    /// continues to emit SQLite-style for stability across calls
1248    /// regardless of which pool is registered.
1249    pub fn to_sql(&self) -> String {
1250        let mut q = self.build_query_for("sqlite");
1251        self.apply_join_related(&mut q);
1252        self.apply_only_projection(&mut q);
1253        let (sql, _values) = q.build_sqlx(SqliteQueryBuilder);
1254        sql
1255    }
1256
1257    /// Render the QuerySet's SQL against the **Postgres** dialect,
1258    /// without running it. Companion to [`Self::to_sql`].
1259    ///
1260    /// The two render slightly different placeholder syntax (`?` for
1261    /// SQLite, `$1..$N` for Postgres) and any Postgres-specific
1262    /// operators like the array `@>` / `<@` / `&&` family only render
1263    /// correctly through this entry point — `to_sql`'s SQLite path
1264    /// leaves `$N` tokens in the template untouched. Use this when
1265    /// debugging a Postgres query or asserting on the rendered shape
1266    /// in tests.
1267    pub fn to_sql_pg(&self) -> String {
1268        let mut q = self.build_query_for("postgres");
1269        self.apply_join_related(&mut q);
1270        self.apply_only_projection(&mut q);
1271        let (sql, _values) = q.build_sqlx(PostgresQueryBuilder);
1272        sql
1273    }
1274
1275    /// Internal helper — when `.only(...)` was set, swap the SELECT
1276    /// list for just those columns. Shared by `to_sql` / `to_sql_pg`
1277    /// so the inspection surface stays in sync with what `values()`
1278    /// would emit. No-op when `only_cols` is `None`.
1279    fn apply_only_projection(&self, q: &mut sea_query::SelectStatement) {
1280        if let Some(cols) = &self.only_cols {
1281            q.clear_selects();
1282            for c in cols {
1283                q.column(Alias::new(c.as_str()));
1284            }
1285        }
1286    }
1287
1288    /// Internal helper — when `.join_related(name)` was set, wrap the
1289    /// current query as a subquery and build an outer SELECT that
1290    /// LEFT JOINs every requested related table (with child columns
1291    /// aliased as `<field>__<col>`). The subquery wrapper is the
1292    /// load-bearing trick: WHERE / ORDER BY / LIMIT predicates inside
1293    /// the inner query reference parent columns only — there's no
1294    /// JOIN in scope, so bare names like `id` resolve unambiguously
1295    /// to the parent. Without this wrapper SQLite raises
1296    /// "ambiguous column name: id" on any predicate sharing a column
1297    /// name with a JOIN'd table.
1298    ///
1299    /// No-op when `join_related` is empty. Unknown field names /
1300    /// unregistered related models / FK columns missing `fk_target`
1301    /// are silently skipped — the SQL just won't carry the JOIN and
1302    /// the caller notices when `ForeignKey::resolved()` stays empty.
1303    fn apply_join_related(&self, q: &mut sea_query::SelectStatement) {
1304        if self.join_related.is_empty() {
1305            return;
1306        }
1307        use sea_query::{Expr, Query};
1308        let registered = crate::migrate::registered_models();
1309
1310        // Inner-subquery column trim. When `.only(...)` is also set,
1311        // the outer SELECT only references a subset of parent
1312        // columns (plus the JOIN'd child columns it gets through
1313        // its alias). The inner subquery only needs to expose:
1314        //   - parent columns named in `.only(...)` (intersected
1315        //     with T::FIELDS so the JOIN aliases like
1316        //     `category__name` don't leak in here — those live on
1317        //     the JOIN'd table, not on the parent),
1318        //   - PLUS the FK columns each `.join_related(name)` needs
1319        //     for its `ON __p.<name> = <child>.<pk>` clause.
1320        // WHERE / ORDER BY inside the inner subquery can still
1321        // reference any parent column (SQL doesn't require an
1322        // ORDER BY column to be in the SELECT list), so we don't
1323        // need to promote those. Postgres often skips this prune
1324        // through subquery boundaries; SQLite usually doesn't —
1325        // either way trimming here is a measurable win on wide
1326        // tables (think 30-column Product on a busy hot path).
1327        if let Some(only) = &self.only_cols {
1328            let parent_field_names: std::collections::HashSet<&str> =
1329                T::FIELDS.iter().map(|f| f.name).collect();
1330            let mut needed: std::collections::HashSet<String> = only
1331                .iter()
1332                .filter(|c| parent_field_names.contains(c.as_str()))
1333                .cloned()
1334                .collect();
1335            for jr in &self.join_related {
1336                // Nested paths only need the FIRST hop's FK column at
1337                // the parent level; deeper hops join off the prior
1338                // level's alias, not the parent subquery.
1339                let join_field = jr.path.split("__").next().unwrap_or(jr.path.as_str());
1340                if parent_field_names.contains(join_field) {
1341                    needed.insert(join_field.to_string());
1342                }
1343            }
1344            if !needed.is_empty() {
1345                q.clear_selects();
1346                // Stable ordering so the SQL is deterministic
1347                // across runs (HashSet iteration is not).
1348                let mut ordered: Vec<String> = needed.into_iter().collect();
1349                ordered.sort();
1350                for col in &ordered {
1351                    q.column(Alias::new(col.as_str()));
1352                }
1353            }
1354        }
1355
1356        // Take ownership of the (possibly-trimmed) inner query and
1357        // re-mount it as the FROM clause of the new outer SELECT.
1358        let inner = std::mem::replace(q, Query::select().take());
1359        let parent_alias = Alias::new("__p");
1360        let mut outer = Query::select();
1361        outer.from_subquery(inner, parent_alias.clone());
1362        // Re-project the parent's full column set so the outer SELECT
1363        // exposes them unaliased — FromRow on `T` reads parent
1364        // columns by their bare names (`id`, `name`, ...). When
1365        // `.only(...)` later clears this list in
1366        // `apply_only_projection`, the inner-subquery trim above
1367        // means we still didn't pay for columns we ended up
1368        // dropping anyway.
1369        for f in T::FIELDS {
1370            outer.expr(Expr::col((parent_alias.clone(), Alias::new(f.name))));
1371        }
1372        // Set when any emitted hop is a RIGHT JOIN — drives the
1373        // once-per-process old-SQLite advisory after the emit loop.
1374        let mut emitted_right = false;
1375        for jr in &self.join_related {
1376            let field_name = &jr.path;
1377            // FK chain branch first. A (possibly nested) FK path splits
1378            // on `__` into ordered hops; each hop joins onto the prior
1379            // level's alias, and the DEEPEST hop's child columns are
1380            // aliased by the full dotted path so hydration can rebuild
1381            // the nested relation graph. The single-hop case is
1382            // byte-identical in child-column aliases to the pre-nesting
1383            // path (`<field>__<col>`); only the internal join alias
1384            // gains an `_h{idx}` suffix, which no test asserts.
1385            if let Some(hops) = resolve_join_hops::<T>(field_name) {
1386                let mut prev_alias = parent_alias.clone();
1387                let last = hops.len() - 1;
1388                // Cumulative dotted prefix per hop so EVERY level's own
1389                // columns ride along, aliased by its path-so-far. Hop 0
1390                // of `plugin__author` is `plugin`, hop 1 is
1391                // `plugin__author`. Selecting every level (not just the
1392                // leaf) is what lets hydration rebuild a FULL nested
1393                // object — the intermediate `plugin` row needs its own
1394                // `id`/`name` to deserialise into `ForeignKey<Plugin>`
1395                // before `author` nests inside it.
1396                let segs: Vec<&str> = field_name.split("__").collect();
1397                for (idx, hop) in hops.iter().enumerate() {
1398                    let hop_alias = Alias::new(format!("__j_{field_name}_h{idx}"));
1399                    // Last hop: explicit request, else infer from THIS
1400                    // hop's nullability. Intermediate hops always infer
1401                    // per-hop (an INNER can nest inside an outer
1402                    // LEFT etc.); an explicit kind only pins the leaf.
1403                    let kind = if idx == last {
1404                        jr.kind.unwrap_or(if hop.nullable {
1405                            JoinKind::Left
1406                        } else {
1407                            JoinKind::Inner
1408                        })
1409                    } else if hop.nullable {
1410                        JoinKind::Left
1411                    } else {
1412                        JoinKind::Inner
1413                    };
1414                    emitted_right |= kind == JoinKind::Right;
1415                    outer.join_as(
1416                        kind.sea(),
1417                        crate::db::router::schema_qualified_table(hop.child_table.as_str()),
1418                        hop_alias.clone(),
1419                        Expr::col((prev_alias.clone(), Alias::new(hop.fk_col.as_str())))
1420                            .equals((hop_alias.clone(), Alias::new(hop.child_pk.as_str()))),
1421                    );
1422                    if let Some(meta) = registered.iter().find(|m| m.table == hop.child_table) {
1423                        // Cumulative dotted prefix for this hop's columns.
1424                        let prefix = segs[..=idx].join("__");
1425                        for col in &meta.fields {
1426                            let alias = format!("{}__{}", prefix, col.name);
1427                            outer.expr_as(
1428                                Expr::col((hop_alias.clone(), Alias::new(col.name.as_str()))),
1429                                Alias::new(alias),
1430                            );
1431                        }
1432                    }
1433                    prev_alias = hop_alias;
1434                }
1435                continue;
1436            }
1437
1438            // M2M branch (post-#113). Emit the double LEFT JOIN
1439            // through the junction table:
1440            //   LEFT JOIN <junction> AS __jm_<field>
1441            //     ON __p.<parent_pk> = __jm_<field>.parent_id
1442            //   LEFT JOIN <child_table> AS __j_<field>
1443            //     ON __jm_<field>.child_id = __j_<field>.<child_pk>
1444            // Aliased child cols use the same `<field>__<col>` shape
1445            // as the FK branch so the decode helper can be reused.
1446            // The M2M field is the FIRST segment of the path; a nested
1447            // path like `"tags__category"` passes THROUGH the M2M hop
1448            // and continues with an onward FK chain off the child.
1449            let m2m_seg = field_name.split("__").next().unwrap_or(field_name.as_str());
1450            if let Some(m2m_rel) = T::M2M_RELATIONS.iter().find(|r| r.field_name == m2m_seg)
1451                && let Some(parent_pk) = T::FIELDS.iter().find(|f| f.primary_key)
1452                && let Some(child_meta) =
1453                    registered.iter().find(|m| m.table == m2m_rel.target_table)
1454                && let Some(child_pk) = child_meta.fields.iter().find(|c| c.primary_key)
1455            {
1456                // Junction table + aliases key off the M2M field name
1457                // (segs[0]), NOT the full dotted path.
1458                let junction_table = format!("{}_{}", T::TABLE, m2m_seg);
1459                let junction_alias = Alias::new(format!("__jm_{m2m_seg}"));
1460                let child_alias = Alias::new(format!("__j_{m2m_seg}"));
1461                // The junction hop stays LEFT so a parent with zero
1462                // junction rows isn't dropped by the join to the
1463                // junction table itself — the CHILD hop's kind is what
1464                // decides drop/keep. Plain `join_related` (kind None)
1465                // leaves the child LEFT too, preserving the shipped
1466                // double-LEFT-JOIN M2M behavior (a tag-less parent
1467                // survives with an empty M2M slot). An explicit
1468                // inner_join_related drops parents whose relation is
1469                // absent: the junction-LEFT miss yields a NULL child_id,
1470                // then the child INNER on NULL has no match -> the parent
1471                // is dropped, which is the INNER contract.
1472                let child_kind = jr.kind.unwrap_or(JoinKind::Left);
1473                emitted_right |= child_kind == JoinKind::Right;
1474                outer.join_as(
1475                    sea_query::JoinType::LeftJoin,
1476                    crate::db::router::schema_qualified_table(&junction_table),
1477                    junction_alias.clone(),
1478                    Expr::col((parent_alias.clone(), Alias::new(parent_pk.name)))
1479                        .equals((junction_alias.clone(), Alias::new("parent_id"))),
1480                );
1481                outer.join_as(
1482                    child_kind.sea(),
1483                    crate::db::router::schema_qualified_table(m2m_rel.target_table),
1484                    child_alias.clone(),
1485                    Expr::col((junction_alias.clone(), Alias::new("child_id")))
1486                        .equals((child_alias.clone(), Alias::new(child_pk.name.as_str()))),
1487                );
1488                // Child columns aliased by the M2M field name so the
1489                // M2M decode path (`<m2m_field>__<col>`) reads them.
1490                for col in &child_meta.fields {
1491                    let alias = format!("{}__{}", m2m_seg, col.name);
1492                    outer.expr_as(
1493                        Expr::col((child_alias.clone(), Alias::new(col.name.as_str()))),
1494                        Alias::new(alias),
1495                    );
1496                }
1497                // Onward FK chain off the child (segs[1..]). Each hop
1498                // joins onto the prior level's alias and aliases its
1499                // columns by the cumulative dotted path
1500                // (`tags__category__name`) so the M2M decode path can
1501                // nest the onward object into each child row.
1502                if let Some((_child_table, _child_pk, onward)) = resolve_m2m_chain::<T>(field_name)
1503                {
1504                    let segs: Vec<&str> = field_name.split("__").collect();
1505                    let mut prev_alias = child_alias.clone();
1506                    for (i, hop) in onward.iter().enumerate() {
1507                        // segs index for this hop: segs[0] is the M2M
1508                        // field, segs[1] is onward[0], etc.
1509                        let seg_idx = i + 1;
1510                        let hop_alias = Alias::new(format!("__j_{m2m_seg}_o{i}"));
1511                        let kind = if hop.nullable {
1512                            JoinKind::Left
1513                        } else {
1514                            JoinKind::Inner
1515                        };
1516                        outer.join_as(
1517                            kind.sea(),
1518                            crate::db::router::schema_qualified_table(hop.child_table.as_str()),
1519                            hop_alias.clone(),
1520                            Expr::col((prev_alias.clone(), Alias::new(hop.fk_col.as_str())))
1521                                .equals((hop_alias.clone(), Alias::new(hop.child_pk.as_str()))),
1522                        );
1523                        if let Some(meta) = registered.iter().find(|m| m.table == hop.child_table) {
1524                            let prefix = segs[..=seg_idx].join("__");
1525                            for col in &meta.fields {
1526                                let alias = format!("{}__{}", prefix, col.name);
1527                                outer.expr_as(
1528                                    Expr::col((hop_alias.clone(), Alias::new(col.name.as_str()))),
1529                                    Alias::new(alias),
1530                                );
1531                            }
1532                        }
1533                        prev_alias = hop_alias;
1534                    }
1535                }
1536                continue;
1537            }
1538        }
1539        // A RIGHT JOIN against SQLite needs >= 3.39; warn once per
1540        // process. Postgres / no-pool builds stay silent.
1541        if emitted_right {
1542            warn_right_join_on_sqlite();
1543        }
1544        *q = outer;
1545    }
1546
1547    /// Run the SELECT and return every matching row.
1548    ///
1549    /// If `.select_related(name)` was called, a follow-up batch query
1550    /// populates `ForeignKey<U>.resolved` for each named field before
1551    /// the rows are returned.
1552    pub async fn fetch(self) -> Result<Vec<T>, sqlx::Error>
1553    where
1554        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1555            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1556            + HydrateRelated,
1557    {
1558        if self.only_cols.is_some() {
1559            return Err(only_with_typed_terminal_error("fetch"));
1560        }
1561        let sr_fields = self.select_related.clone();
1562        let prefetch_fields = self.prefetch_related.clone();
1563        let join_reqs = self.join_related.clone();
1564        let join_fields: Vec<String> = join_reqs.iter().map(|j| j.path.clone()).collect();
1565        // Validate join_related field names up front so a typo or an
1566        // M2M field name doesn't silently no-op (it used to render a
1567        // SELECT with no JOIN). Pre-#42 the failure mode was
1568        // `ForeignKey::resolved()` stays None and the caller debugs
1569        // the wrong thing. Now they get a typed error.
1570        validate_join_related_fields::<T>(&join_fields)?;
1571        // The turbofish on `query_as_with::<DB, _, _>` is load-bearing:
1572        // with both `sqlx-sqlite` and `sqlx-postgres` features on
1573        // sea-query-binder, `SqlxValues` implements `IntoArguments` for
1574        // both backends, so the compiler can't infer DB from the values
1575        // alone. Naming DB explicitly pins which `FromRow` bound is
1576        // checked.
1577        // Split join_fields into FK vs M2M groups up front. The
1578        // M2M branch needs parent dedup (one parent row per JOIN
1579        // combo would surface duplicate Ts to the caller); the FK
1580        // branch is one-to-one with rows.
1581        // A path whose FIRST segment is an M2M field routes to the M2M
1582        // group even when it continues with an onward FK chain
1583        // (`"tags__category"`): the junction double-join + parent dedup
1584        // live on the M2M side, and the onward FK nests into each child.
1585        let (m2m_join_fields, fk_join_fields): (Vec<String>, Vec<String>) =
1586            join_fields.iter().cloned().partition(|f| {
1587                let first = f.split("__").next().unwrap_or(f.as_str());
1588                T::M2M_RELATIONS.iter().any(|r| r.field_name == first)
1589            });
1590        let has_m2m_join = !m2m_join_fields.is_empty();
1591
1592        let mut rows = match resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read)
1593        {
1594            DbPool::Sqlite(pool) => {
1595                let mut q = self.build_query_for("sqlite");
1596                self.apply_join_related(&mut q);
1597                let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1598                if join_fields.is_empty() {
1599                    sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
1600                        .fetch_all(&pool)
1601                        .await?
1602                } else if !has_m2m_join {
1603                    // FK-only JOIN path: one row in → one T out.
1604                    let raw_rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
1605                        .fetch_all(&pool)
1606                        .await?;
1607                    let mut typed = Vec::with_capacity(raw_rows.len());
1608                    for row in &raw_rows {
1609                        let mut t = <T as sqlx::FromRow<_>>::from_row(row)?;
1610                        backend_sqlite::hydrate_joined_rels::<T>(&mut t, row, &fk_join_fields)?;
1611                        typed.push(t);
1612                    }
1613                    typed
1614                } else {
1615                    // Mixed (FK + M2M) or pure M2M JOIN path:
1616                    // dedup parents, collect M2M children per
1617                    // (parent_pk, field).
1618                    let raw_rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
1619                        .fetch_all(&pool)
1620                        .await?;
1621                    dedup_decode_sqlite::<T>(&raw_rows, &fk_join_fields, &m2m_join_fields)?
1622                }
1623            }
1624            DbPool::Postgres(pool) => {
1625                let mut q = self.build_query_for("postgres");
1626                self.apply_join_related(&mut q);
1627                let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1628                if join_fields.is_empty() {
1629                    sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
1630                        .fetch_all(&pool)
1631                        .await?
1632                } else if !has_m2m_join {
1633                    let raw_rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
1634                        .fetch_all(&pool)
1635                        .await?;
1636                    let mut typed = Vec::with_capacity(raw_rows.len());
1637                    for row in &raw_rows {
1638                        let mut t = <T as sqlx::FromRow<_>>::from_row(row)?;
1639                        backend_pg::hydrate_joined_rels::<T>(&mut t, row, &fk_join_fields)?;
1640                        typed.push(t);
1641                    }
1642                    typed
1643                } else {
1644                    let raw_rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
1645                        .fetch_all(&pool)
1646                        .await?;
1647                    dedup_decode_pg::<T>(&raw_rows, &fk_join_fields, &m2m_join_fields)?
1648                }
1649            }
1650        };
1651        // BUG-16 step 2: wire each row's PK into its `M2M<U>` slots so
1652        // `add`/`remove`/`clear` know which parent they belong to.
1653        // No-op for models with no M2M fields.
1654        for r in &mut rows {
1655            r.set_m2m_parent_ids();
1656        }
1657        if !sr_fields.is_empty() {
1658            let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1659            hydrate_select_related::<T>(&mut rows, &sr_fields, &pool).await?;
1660        }
1661        if !prefetch_fields.is_empty() {
1662            let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1663            hydrate_prefetch_related::<T>(&mut rows, &prefetch_fields, &pool).await?;
1664        }
1665        Ok(rows)
1666    }
1667
1668    /// Feature 29 Phase 1 — chunked streaming via a callback.
1669    ///
1670    /// Runs the SELECT in pages of `chunk_size` rows and invokes
1671    /// `callback` once per row. Memory bound = `chunk_size *
1672    /// sizeof::<T>` instead of the full row count `fetch()` would
1673    /// buffer, so this is the right shape for million-row exports,
1674    /// migrations, and batch transforms.
1675    ///
1676    /// Deliberately NOT named `iterator()` — that name suggests a
1677    /// `Stream`-shaped return value, which would force a
1678    /// `futures-util` dep. The callback shape is idiomatic Rust,
1679    /// requires no new crates, and ships the same memory bound. A
1680    /// future `iterator()` returning `BoxStream<T>` can land later
1681    /// once `futures-util` is in the workspace for some other reason
1682    /// (likely SSE / WebSockets).
1683    ///
1684    /// Error contract: the callback may return any error type `E`.
1685    /// SQL failures become `TryForEachError::Sqlx`; callback errors
1686    /// become `TryForEachError::Callback(e)`. The first error stops
1687    /// the walk — subsequent rows are not fetched.
1688    ///
1689    /// Caveats: pages are stable only if the result set isn't being
1690    /// mutated concurrently. For consistent-snapshot iteration over
1691    /// a live table, wrap the call in a serialised-or-repeatable-read
1692    /// transaction. `select_related` and `prefetch_related` hooks
1693    /// are NOT applied on each row — `try_for_each` is intentionally
1694    /// the "raw column data, one row at a time" terminal.
1695    pub async fn try_for_each<F, E>(
1696        self,
1697        chunk_size: usize,
1698        mut callback: F,
1699    ) -> Result<(), TryForEachError<E>>
1700    where
1701        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1702            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1703            + HydrateRelated,
1704        F: FnMut(T) -> Result<(), E>,
1705    {
1706        let chunk_size = chunk_size.max(1);
1707        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1708        let mut offset: u64 = 0;
1709        loop {
1710            let mut rows: Vec<T> = match &pool {
1711                DbPool::Sqlite(pg) => {
1712                    let mut q = self.build_query_for("sqlite");
1713                    q.limit(chunk_size as u64).offset(offset);
1714                    let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1715                    sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
1716                        .fetch_all(pg)
1717                        .await
1718                        .map_err(TryForEachError::Sqlx)?
1719                }
1720                DbPool::Postgres(pg) => {
1721                    let mut q = self.build_query_for("postgres");
1722                    q.limit(chunk_size as u64).offset(offset);
1723                    let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1724                    sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
1725                        .fetch_all(pg)
1726                        .await
1727                        .map_err(TryForEachError::Sqlx)?
1728                }
1729            };
1730            let fetched = rows.len();
1731            if fetched == 0 {
1732                break;
1733            }
1734            for row in rows.drain(..) {
1735                callback(row).map_err(TryForEachError::Callback)?;
1736            }
1737            if fetched < chunk_size {
1738                break;
1739            }
1740            offset += fetched as u64;
1741        }
1742        Ok(())
1743    }
1744
1745    /// Run the SELECT with LIMIT 1 and return the first row, if any.
1746    pub async fn first(mut self) -> Result<Option<T>, sqlx::Error>
1747    where
1748        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1749            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1750            + HydrateRelated,
1751    {
1752        if self.only_cols.is_some() {
1753            return Err(only_with_typed_terminal_error("first"));
1754        }
1755        // Review #2: delegate to `fetch()` with LIMIT 1 so select_related,
1756        // prefetch_related, AND join_related are all hydrated. `first()`
1757        // used to build a plain query and hydrate only select_related, so
1758        // `.prefetch_related("tags").first()` returned an unprefetched row
1759        // and `.join_related("author").first()` an unresolved join — both
1760        // silently. (For a to-many `join_related`, LIMIT 1 truncates the
1761        // joined children the same way `fetch()` does with `.limit(1)`;
1762        // prefer `prefetch_related` there.)
1763        self.query.limit(1);
1764        let rows = self.fetch().await?;
1765        Ok(rows.into_iter().next())
1766    }
1767
1768    /// Return the row with the smallest value in `col_name`. Sugar
1769    /// for `order_by(col.asc()).first()`. The `earliest('created_at')`
1770    /// terminal.
1771    ///
1772    /// Takes a `&'static str` column name (same shape as
1773    /// `select_related`) so the call site stays terse:
1774    /// `.earliest("created_at")` reads naturally without spelling out
1775    /// `.asc()`.
1776    pub async fn earliest(self, col_name: &'static str) -> Result<Option<T>, sqlx::Error>
1777    where
1778        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1779            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1780            + HydrateRelated,
1781    {
1782        self.order_by(OrderExpr::new(col_name, false)).first().await
1783    }
1784
1785    /// Return the row with the largest value in `col_name`. Sugar
1786    /// for `order_by(col.desc()).first()`. The `latest('created_at')`
1787    /// terminal.
1788    pub async fn latest(self, col_name: &'static str) -> Result<Option<T>, sqlx::Error>
1789    where
1790        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1791            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1792            + HydrateRelated,
1793    {
1794        self.order_by(OrderExpr::new(col_name, true)).first().await
1795    }
1796
1797    /// Fetch many rows by their primary keys and return a
1798    /// `HashMap<T::PrimaryKey, T>` keyed by PK. The everyday companion to
1799    /// a cached list of ids — `User::objects().in_bulk(user_ids)` gives
1800    /// you direct lookup access without a second `.iter().find(...)` pass
1801    /// per id.
1802    ///
1803    /// Missing ids are silently absent from the map; callers that
1804    /// need the existence check can compare `map.len()` to
1805    /// `pks.len()`. Empty input is a no-op (returns the empty map).
1806    ///
1807    /// PK-agnostic (PK lift — was `Vec<i64>` / `HashMap<i64, T>`): the key
1808    /// is the model's `PrimaryKey` type, so i64-, String/slug-, and
1809    /// Uuid-keyed models all work. The map key requires `Hash + Eq`,
1810    /// which every standard PK type (integers, `String`, `Uuid`) satisfies.
1811    pub async fn in_bulk(
1812        self,
1813        pks: Vec<T::PrimaryKey>,
1814    ) -> Result<HashMap<T::PrimaryKey, T>, sqlx::Error>
1815    where
1816        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1817            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1818            + HydrateRelated,
1819        T::PrimaryKey: std::hash::Hash + Eq,
1820    {
1821        if pks.is_empty() {
1822            return Ok(HashMap::new());
1823        }
1824        let pk_name = pk_field::<T>().map(|f| f.name).unwrap_or("id");
1825        // Each PK converts to a `sea_query::Value` (the `PrimaryKey` trait
1826        // bounds `Into<sea_query::Value>`); wrap as a `SimpleExpr` for the
1827        // IN-list so any PK shape binds correctly.
1828        let pk_pred: Predicate<T> = Predicate::new(
1829            Expr::col(Alias::new(pk_name)).is_in(
1830                pks.into_iter()
1831                    .map(|p| sea_query::SimpleExpr::Value(p.into())),
1832            ),
1833        );
1834        let rows = self.filter(pk_pred).fetch().await?;
1835        let mut out: HashMap<T::PrimaryKey, T> = HashMap::with_capacity(rows.len());
1836        for row in rows {
1837            out.insert(row.primary_key(), row);
1838        }
1839        Ok(out)
1840    }
1841
1842    /// Return the database's execution plan for this query as a
1843    /// plain-text string. Doesn't run the underlying query — just
1844    /// asks the DB how it would be executed.
1845    ///
1846    /// Backend dispatch:
1847    ///
1848    /// - SQLite: `EXPLAIN QUERY PLAN <sql>` — returns the planner's
1849    ///   nested loop hierarchy, one row per access step.
1850    /// - Postgres: `EXPLAIN <sql>` — returns the default text plan.
1851    ///   For machine-readable output use raw sqlx with
1852    ///   `EXPLAIN (FORMAT JSON)`; the framework defaults to text
1853    ///   because most callers want eyeball-able output.
1854    ///
1855    /// Lines are joined with newlines. The returned string is what a
1856    /// developer would paste into a debugger or a perf-review issue.
1857    pub async fn explain(self) -> Result<String, sqlx::Error> {
1858        // Annotations are part of the built query (they render inside
1859        // build_query_for), so the plan below includes them — but a
1860        // poisoned annotation (unknown relation) must fail loudly
1861        // here, not silently vanish from the plan.
1862        self.check_annotations()?;
1863        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1864        let backend = pool.backend_name();
1865        let q = self.build_query_for(backend);
1866        match pool {
1867            DbPool::Sqlite(pool) => {
1868                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
1869                let explain_sql = format!("EXPLAIN QUERY PLAN {sql}");
1870                let rows = sqlx::query_with::<sqlx::Sqlite, _>(&explain_sql, vals)
1871                    .fetch_all(&pool)
1872                    .await?;
1873                let mut out = String::new();
1874                for row in &rows {
1875                    use sqlx::Row;
1876                    // SQLite returns: id, parent, notused, detail.
1877                    // The `detail` column is the human-readable step.
1878                    let detail: String = row.try_get("detail")?;
1879                    if !out.is_empty() {
1880                        out.push('\n');
1881                    }
1882                    out.push_str(&detail);
1883                }
1884                Ok(out)
1885            }
1886            DbPool::Postgres(pool) => {
1887                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1888                let explain_sql = format!("EXPLAIN {sql}");
1889                let rows = sqlx::query_with::<sqlx::Postgres, _>(&explain_sql, vals)
1890                    .fetch_all(&pool)
1891                    .await?;
1892                let mut out = String::new();
1893                for row in &rows {
1894                    use sqlx::Row;
1895                    // Postgres EXPLAIN returns one column named
1896                    // "QUERY PLAN", one row per line of the plan.
1897                    let line: String = row.try_get("QUERY PLAN")?;
1898                    if !out.is_empty() {
1899                        out.push('\n');
1900                    }
1901                    out.push_str(&line);
1902                }
1903                Ok(out)
1904            }
1905        }
1906    }
1907
1908    /// Run `SELECT COUNT(*)` against the same FROM + WHERE.
1909    ///
1910    /// Reshapes the query rather than wrapping the existing SELECT: the
1911    /// projection becomes `COUNT(*)` and LIMIT/OFFSET drop away. ORDER
1912    /// BY is harmless on a scalar aggregate and is left in place. The
1913    /// row type is `(i64,)` so the FromRow constraint comes from sqlx's
1914    /// tuple impl rather than the user struct — count() doesn't need
1915    /// T's FromRow bounds.
1916    pub async fn count(self) -> Result<i64, sqlx::Error> {
1917        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1918        let backend = pool.backend_name();
1919        // Build the dialect-appropriate filtered query first, then
1920        // rebuild as COUNT. Doing it in this order keeps the predicate
1921        // walk pluggable per backend without duplicating the COUNT
1922        // rewrite logic across branches.
1923        let mut rebuilt = self.build_query_for(backend);
1924        rebuilt.clear_selects();
1925        // Postgres rejects `"*"` as a quoted identifier (SQLite tolerates
1926        // it); use sea_query's Asterisk token which renders bare `*`
1927        // on both backends.
1928        rebuilt.expr(Func::count(Expr::col(sea_query::Asterisk)));
1929        rebuilt.reset_limit();
1930        rebuilt.reset_offset();
1931
1932        match pool {
1933            DbPool::Sqlite(pool) => {
1934                let (sql, values) = rebuilt.build_sqlx(SqliteQueryBuilder);
1935                let (n,): (i64,) = sqlx::query_as_with::<sqlx::Sqlite, (i64,), _>(&sql, values)
1936                    .fetch_one(&pool)
1937                    .await?;
1938                Ok(n)
1939            }
1940            DbPool::Postgres(pool) => {
1941                let (sql, values) = rebuilt.build_sqlx(PostgresQueryBuilder);
1942                let (n,): (i64,) = sqlx::query_as_with::<sqlx::Postgres, (i64,), _>(&sql, values)
1943                    .fetch_one(&pool)
1944                    .await?;
1945                Ok(n)
1946            }
1947        }
1948    }
1949
1950    /// Return whether any row matches.
1951    ///
1952    /// M1 keeps the simple form: add LIMIT 1, fetch, check non-empty.
1953    /// A later milestone may swap the projection for `SELECT 1` to
1954    /// skip column materialisation.
1955    pub async fn exists(self) -> Result<bool, sqlx::Error>
1956    where
1957        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1958            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1959            + HydrateRelated,
1960    {
1961        let rows = self.limit(1).fetch().await?;
1962        Ok(!rows.is_empty())
1963    }
1964
1965    /// `.get()` — the exactly-one terminal.
1966    ///
1967    /// Returns `Ok(row)` when the filter chain matches exactly one
1968    /// row. The two not-exactly-one cases each get their own
1969    /// `GetError` variant so the caller can branch deliberately:
1970    ///
1971    /// - [`GetError::NotFound`] — zero rows matched. The right
1972    ///   choice for "fetch the row this user just clicked on; 404
1973    ///   if it's gone."
1974    /// - [`GetError::MultipleObjectsReturned`] — more than one row
1975    ///   matched. The right choice for filters that should be
1976    ///   uniquely-keyed (e.g. `.filter(user::EMAIL.eq("..."))`
1977    ///   when email has a UNIQUE constraint); a result of 2+ is a
1978    ///   data-integrity bug worth crashing on.
1979    /// - The underlying sqlx error wraps as [`GetError::Sqlx`].
1980    ///
1981    /// Internally this issues `SELECT ... LIMIT 2` — the cheapest
1982    /// way to distinguish "one row" from "many." The second row, if
1983    /// it exists, isn't materialised beyond the bare FromRow call.
1984    ///
1985    /// ```ignore
1986    /// match Post::objects().filter(post::ID.eq(42)).get().await {
1987    ///     Ok(p)                                            => /* render */,
1988    ///     Err(GetError::NotFound)                          => /* 404 */,
1989    ///     Err(GetError::MultipleObjectsReturned)           => unreachable!("ID is unique"),
1990    ///     Err(GetError::Sqlx(e))                           => /* 500 */,
1991    /// }
1992    /// ```
1993    pub async fn get(self) -> Result<T, GetError>
1994    where
1995        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1996            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1997            + HydrateRelated,
1998    {
1999        let mut rows = self.limit(2).fetch().await.map_err(GetError::Sqlx)?;
2000        match rows.len() {
2001            0 => Err(GetError::NotFound),
2002            1 => Ok(rows.pop().unwrap()),
2003            _ => Err(GetError::MultipleObjectsReturned),
2004        }
2005    }
2006
2007    // =====================================================================
2008    // Postgres-only terminals (Phase 4.1).
2009    //
2010    // Models with Postgres-only field types (`Vec<T>` arrays, the future
2011    // Hstore / CIDR / FullTextSearch types) can't satisfy the dual
2012    // FromRow bound on `fetch` / `first` / `count` / `exists`. These
2013    // `_pg` variants bound on `FromRow<PgRow>` alone, take the pool as
2014    // an argument, and skip the dispatch — the call site explicitly
2015    // says "this model is Postgres-only."
2016    //
2017    // For models with portable fields, the existing `fetch` etc. stay
2018    // the recommended call: they pick up the ambient pool and route
2019    // through `.on(&pool)` / `.on_pg(&pool)` overrides exactly as
2020    // Phase 2.5 documented.
2021    // =====================================================================
2022
2023    // =====================================================================
2024    // Write terminals — DELETE and UPDATE.
2025    //
2026    // Both apply the accumulated filter predicates as the WHERE clause,
2027    // dispatch to the resolved pool's backend, and return the affected-
2028    // rows count from sqlx. No row materialisation — DELETE is keyless,
2029    // and UPDATE doesn't do a RETURNING read-back at v1 (use
2030    // `.filter(...).fetch()` after a write if you need the updated
2031    // rows back).
2032    //
2033    // **Without a `.filter(...)`, both terminals affect every row in
2034    // the table.** That mirrors raw SQL semantics; the type system
2035    // can't distinguish "I forgot the filter" from "I really meant to
2036    // truncate." Users protecting against accidental full-table writes
2037    // wrap their callers or assert a row count via `.count()` first.
2038    // =====================================================================
2039
2040    /// Project the query to only the named columns, returning a
2041    /// vector of `serde_json::Value::Object` rows instead of typed
2042    /// `T` instances. The columns-projection terminal: `values('id', 'title')`.
2043    ///
2044    /// Use when a list view only needs a few fields — skipping the
2045    /// 50KB body BLOB on every Post saves both memory and the
2046    /// FromRow hydration overhead. Each returned `Value` is an
2047    /// object keyed by the requested column names, with values
2048    /// typed per the column's declared SqlType (integers stay
2049    /// integers, booleans stay booleans, dates render as ISO
2050    /// strings).
2051    ///
2052    /// Unknown column names fail loudly with
2053    /// `sqlx::Error::Protocol` naming the offending column.
2054    /// Composes with `filter`, `exclude`, `order_by`, `limit`,
2055    /// `offset` exactly the way the typed terminals do.
2056    ///
2057    /// ```rust,ignore
2058    /// let rows = Post::objects()
2059    ///     .filter(post::PUBLISHED.eq(true))
2060    ///     .order_by(post::ID.desc())
2061    ///     .values(&["id", "title"])
2062    ///     .await?;
2063    /// // [ { "id": 3, "title": "c" }, ... ]
2064    /// ```
2065    pub async fn values(self, columns: &[&str]) -> Result<Vec<JsonValue>, sqlx::Error> {
2066        // Gap #46 follow-up: if any name uses `__` traversal
2067        // (`author__id`), route to the JOIN-aware path that builds
2068        // nested per-relation JSON objects. The unbranched path
2069        // below stays byte-for-byte identical for the common
2070        // parent-cols-only case.
2071        if columns.iter().any(|c| c.contains("__")) {
2072            return self.values_with_traversal(columns).await;
2073        }
2074        let meta = crate::migrate::ModelMeta::for_::<T>();
2075        // Resolve every requested name against the model's metadata
2076        // up front so an unknown column errors before any SQL runs.
2077        let mut chosen: Vec<&crate::migrate::Column> = Vec::with_capacity(columns.len());
2078        for name in columns {
2079            let col = meta
2080                .fields
2081                .iter()
2082                .find(|c| c.name == *name)
2083                .ok_or_else(|| {
2084                    sqlx::Error::Protocol(format!(
2085                        "umbral::orm::values: unknown column `{}` on model `{}`",
2086                        name,
2087                        T::NAME
2088                    ))
2089                })?;
2090            chosen.push(col);
2091        }
2092        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2093        let backend = pool.backend_name();
2094        // Build the base query (predicates + ORDER BY) then swap its
2095        // SELECT list for only the requested columns.
2096        let mut q = self.build_query_for(backend);
2097        q.clear_selects();
2098        for col in &chosen {
2099            q.column(Alias::new(col.name.as_str()));
2100        }
2101        match pool {
2102            DbPool::Sqlite(pool) => {
2103                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
2104                let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2105                    .fetch_all(&pool)
2106                    .await?;
2107                let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2108                for row in &rows {
2109                    let mut obj = serde_json::Map::with_capacity(chosen.len());
2110                    for col in &chosen {
2111                        let v = crate::orm::dynamic::decode_to_json(row, col)?;
2112                        obj.insert(col.name.clone(), v);
2113                    }
2114                    out.push(JsonValue::Object(obj));
2115                }
2116                Ok(out)
2117            }
2118            DbPool::Postgres(pool) => {
2119                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
2120                let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2121                    .fetch_all(&pool)
2122                    .await?;
2123                let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2124                for row in &rows {
2125                    let mut obj = serde_json::Map::with_capacity(chosen.len());
2126                    for col in &chosen {
2127                        let v = crate::orm::dynamic::decode_pg_to_json(row, col)?;
2128                        obj.insert(col.name.clone(), v);
2129                    }
2130                    out.push(JsonValue::Object(obj));
2131                }
2132                Ok(out)
2133            }
2134        }
2135    }
2136
2137    /// `.values("author__name")`-style traversal. One-hop only at
2138    /// v1 (`a__b`, not `a__b__c` — that fails loudly so the user
2139    /// doesn't get a silent partial result). Emits one LEFT JOIN
2140    /// per distinct relation referenced, aliases child columns as
2141    /// `<rel>__<col>`, and returns each row as a nested JSON
2142    /// object: `{id, title, author: {id, name}, editor: {id}}`.
2143    ///
2144    /// A LEFT JOIN miss (nullable FK pointing at nothing) maps the
2145    /// whole relation key to `Value::Null` rather than a nested
2146    /// object full of nulls — caller code that branches on
2147    /// `obj["author"].is_null()` works naturally.
2148    ///
2149    /// Validates every name up front so a typo errors before any
2150    /// SQL runs. Unknown parent col / non-FK relation name /
2151    /// unknown child col / deeper-than-one-hop path all surface
2152    /// distinct messages.
2153    async fn values_with_traversal(self, columns: &[&str]) -> Result<Vec<JsonValue>, sqlx::Error> {
2154        use sea_query::{Expr, Query};
2155        let meta = crate::migrate::ModelMeta::for_::<T>();
2156        let registered = crate::migrate::registered_models();
2157
2158        // Split each column name into (relation, child_col) or
2159        // (None, parent_col). Reject paths with more than one `__`
2160        // hop — nested traversal across two relation layers is a
2161        // separate piece of work (it'd need to chain JOINs and
2162        // build doubly-nested JSON).
2163        let mut parent_cols: Vec<String> = Vec::new();
2164        let mut per_rel: std::collections::BTreeMap<String, Vec<String>> =
2165            std::collections::BTreeMap::new();
2166        for raw in columns {
2167            let mut parts = raw.splitn(3, "__");
2168            let first = parts.next().unwrap_or("");
2169            let second = parts.next();
2170            let third = parts.next();
2171            if third.is_some() {
2172                return Err(sqlx::Error::Protocol(format!(
2173                    "umbral::orm::values: nested `{raw}` is not supported in v1 \
2174                     (one-hop only — `a__b`, not `a__b__c`)"
2175                )));
2176            }
2177            match second {
2178                Some(child) => {
2179                    per_rel
2180                        .entry(first.to_string())
2181                        .or_default()
2182                        .push(child.to_string());
2183                }
2184                None => parent_cols.push(first.to_string()),
2185            }
2186        }
2187
2188        // Validate every parent name against T::FIELDS.
2189        for name in &parent_cols {
2190            if !meta.fields.iter().any(|c| c.name == *name) {
2191                return Err(sqlx::Error::Protocol(format!(
2192                    "umbral::orm::values: unknown column `{name}` on model `{}`",
2193                    T::NAME
2194                )));
2195            }
2196        }
2197
2198        // Validate every relation + child trio. Build a struct per
2199        // relation that the SQL/decoder loops below need.
2200        struct RelInfo<'a> {
2201            rel_name: String,
2202            related_table: &'a str,
2203            related_pk: &'a crate::migrate::Column,
2204            child_cols: Vec<&'a crate::migrate::Column>,
2205        }
2206        let mut rel_infos: Vec<RelInfo<'_>> = Vec::with_capacity(per_rel.len());
2207        for (rel_name, child_names) in &per_rel {
2208            let fk_field = meta.fields.iter().find(|c| c.name == *rel_name);
2209            let Some(fk_field) = fk_field else {
2210                return Err(sqlx::Error::Protocol(format!(
2211                    "umbral::orm::values: unknown relation `{rel_name}` on model `{}` \
2212                     (used in `{rel_name}__...`)",
2213                    T::NAME
2214                )));
2215            };
2216            let Some(related_table) = fk_field.fk_target.as_deref() else {
2217                return Err(sqlx::Error::Protocol(format!(
2218                    "umbral::orm::values: field `{rel_name}` on `{}` is not a foreign \
2219                     key — `__` traversal only works through FK fields",
2220                    T::NAME
2221                )));
2222            };
2223            let Some(related_meta) = registered.iter().find(|m| m.table == related_table) else {
2224                return Err(sqlx::Error::Protocol(format!(
2225                    "umbral::orm::values: related model for table `{related_table}` \
2226                     is not registered"
2227                )));
2228            };
2229            let Some(related_pk) = related_meta.fields.iter().find(|c| c.primary_key) else {
2230                return Err(sqlx::Error::Protocol(format!(
2231                    "umbral::orm::values: related model `{related_table}` has no \
2232                     primary key column"
2233                )));
2234            };
2235            let mut child_cols: Vec<&crate::migrate::Column> =
2236                Vec::with_capacity(child_names.len());
2237            for name in child_names {
2238                let col = related_meta
2239                    .fields
2240                    .iter()
2241                    .find(|c| c.name == *name)
2242                    .ok_or_else(|| {
2243                        sqlx::Error::Protocol(format!(
2244                            "umbral::orm::values: unknown child column `{name}` on \
2245                             related model `{related_table}` (full path `{rel_name}__{name}`)"
2246                        ))
2247                    })?;
2248                child_cols.push(col);
2249            }
2250            rel_infos.push(RelInfo {
2251                rel_name: rel_name.clone(),
2252                related_table,
2253                related_pk,
2254                child_cols,
2255            });
2256        }
2257
2258        // Build the SQL. Subquery-wrap the parent (WHERE / ORDER
2259        // BY / LIMIT all stay scoped to it) so JOIN'd tables can't
2260        // shadow bare-column predicates — same trick
2261        // apply_join_related uses.
2262        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2263        let backend = pool.backend_name();
2264        let inner = self.build_query_for(backend);
2265        let parent_alias = Alias::new("__p");
2266        let mut outer = Query::select();
2267        outer.from_subquery(inner, parent_alias.clone());
2268        // Outer SELECT: parent cols (bare), aliased child cols.
2269        for name in &parent_cols {
2270            outer.expr_as(
2271                Expr::col((parent_alias.clone(), Alias::new(name.as_str()))),
2272                Alias::new(name.as_str()),
2273            );
2274        }
2275        for info in &rel_infos {
2276            let join_alias = Alias::new(format!("__j_{}", info.rel_name));
2277            outer.join_as(
2278                sea_query::JoinType::LeftJoin,
2279                crate::db::router::schema_qualified_table(info.related_table),
2280                join_alias.clone(),
2281                Expr::col((parent_alias.clone(), Alias::new(info.rel_name.as_str()))).equals((
2282                    join_alias.clone(),
2283                    Alias::new(info.related_pk.name.as_str()),
2284                )),
2285            );
2286            // Always include the related PK alias so the decoder
2287            // can detect a LEFT JOIN miss → emit Value::Null for
2288            // the whole relation. Plus every requested child col.
2289            let pk_alias = format!("{}__{}", info.rel_name, info.related_pk.name);
2290            outer.expr_as(
2291                Expr::col((
2292                    join_alias.clone(),
2293                    Alias::new(info.related_pk.name.as_str()),
2294                )),
2295                Alias::new(pk_alias),
2296            );
2297            for col in &info.child_cols {
2298                let alias = format!("{}__{}", info.rel_name, col.name);
2299                outer.expr_as(
2300                    Expr::col((join_alias.clone(), Alias::new(col.name.as_str()))),
2301                    Alias::new(alias),
2302                );
2303            }
2304        }
2305
2306        // Execute + decode. Per-backend dispatch.
2307        match pool {
2308            DbPool::Sqlite(pool) => {
2309                let (sql, vals) = outer.build_sqlx(SqliteQueryBuilder);
2310                let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2311                    .fetch_all(&pool)
2312                    .await?;
2313                let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2314                for row in &rows {
2315                    let mut obj = serde_json::Map::new();
2316                    // Parent cols decode by their bare alias name.
2317                    for name in &parent_cols {
2318                        if let Some(col) = meta.fields.iter().find(|c| c.name == *name) {
2319                            let v = crate::orm::dynamic::decode_to_json_aliased(row, col, name)?;
2320                            obj.insert(name.clone(), v);
2321                        }
2322                    }
2323                    // Per-relation nested object — `null` on LEFT JOIN miss.
2324                    for info in &rel_infos {
2325                        let pk_alias = format!("{}__{}", info.rel_name, info.related_pk.name);
2326                        let pk_is_null =
2327                            backend_sqlite::joined_pk_is_null(row, &info.related_pk, &pk_alias);
2328                        if pk_is_null {
2329                            obj.insert(info.rel_name.clone(), JsonValue::Null);
2330                            continue;
2331                        }
2332                        let mut nested = serde_json::Map::with_capacity(info.child_cols.len());
2333                        for col in &info.child_cols {
2334                            let alias = format!("{}__{}", info.rel_name, col.name);
2335                            let v = crate::orm::dynamic::decode_to_json_aliased(row, col, &alias)?;
2336                            nested.insert(col.name.clone(), v);
2337                        }
2338                        obj.insert(info.rel_name.clone(), JsonValue::Object(nested));
2339                    }
2340                    out.push(JsonValue::Object(obj));
2341                }
2342                Ok(out)
2343            }
2344            DbPool::Postgres(pool) => {
2345                let (sql, vals) = outer.build_sqlx(PostgresQueryBuilder);
2346                let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2347                    .fetch_all(&pool)
2348                    .await?;
2349                let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2350                for row in &rows {
2351                    let mut obj = serde_json::Map::new();
2352                    for name in &parent_cols {
2353                        if let Some(col) = meta.fields.iter().find(|c| c.name == *name) {
2354                            let v = crate::orm::dynamic::decode_pg_to_json_aliased(row, col, name)?;
2355                            obj.insert(name.clone(), v);
2356                        }
2357                    }
2358                    for info in &rel_infos {
2359                        let pk_alias = format!("{}__{}", info.rel_name, info.related_pk.name);
2360                        let pk_is_null =
2361                            backend_pg::joined_pk_is_null(row, &info.related_pk, &pk_alias);
2362                        if pk_is_null {
2363                            obj.insert(info.rel_name.clone(), JsonValue::Null);
2364                            continue;
2365                        }
2366                        let mut nested = serde_json::Map::with_capacity(info.child_cols.len());
2367                        for col in &info.child_cols {
2368                            let alias = format!("{}__{}", info.rel_name, col.name);
2369                            let v =
2370                                crate::orm::dynamic::decode_pg_to_json_aliased(row, col, &alias)?;
2371                            nested.insert(col.name.clone(), v);
2372                        }
2373                        obj.insert(info.rel_name.clone(), JsonValue::Object(nested));
2374                    }
2375                    out.push(JsonValue::Object(obj));
2376                }
2377                Ok(out)
2378            }
2379        }
2380    }
2381
2382    /// Single-row aggregate. Runs `SELECT AGG(col) AS name, ...` with
2383    /// the QuerySet's accumulated WHERE clause (ORDER BY / LIMIT /
2384    /// OFFSET are dropped — they make no sense over an aggregate
2385    /// without GROUP BY).
2386    ///
2387    /// Returns a `serde_json::Value::Object` keyed by the supplied
2388    /// names. COUNT comes back as an integer; AVG as a float; SUM /
2389    /// MAX / MIN inherit the source column's declared type.
2390    ///
2391    /// ```rust,ignore
2392    /// use umbral::orm::Aggregate;
2393    /// let summary = Post::objects()
2394    ///     .filter(post::PUBLISHED.eq(true))
2395    ///     .aggregate(&[
2396    ///         ("count", Aggregate::count()),
2397    ///         ("total", Aggregate::sum("view_count")),
2398    ///     ])
2399    ///     .await?;
2400    /// // { "count": 42, "total": 9999 }
2401    /// ```
2402    pub async fn aggregate(
2403        self,
2404        aggs: &[(&str, crate::orm::Aggregate)],
2405    ) -> Result<JsonValue, sqlx::Error> {
2406        let meta = crate::migrate::ModelMeta::for_::<T>();
2407        // Validate every aggregate's source column exists.
2408        for (name, agg) in aggs {
2409            if let Some(col) = agg.source_column()
2410                && !meta.fields.iter().any(|c| c.name == col)
2411            {
2412                return Err(sqlx::Error::Protocol(format!(
2413                    "umbral::orm::aggregate: unknown column `{}` on model `{}` for aggregate `{}`",
2414                    col,
2415                    T::NAME,
2416                    name
2417                )));
2418            }
2419        }
2420        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2421        let backend = pool.backend_name();
2422        let mut q = self.build_query_for(backend);
2423        q.clear_selects();
2424        q.reset_limit();
2425        q.reset_offset();
2426        for (name, agg) in aggs {
2427            q.expr_as(agg.to_simple_expr(), Alias::new(*name));
2428        }
2429        match pool {
2430            DbPool::Sqlite(pool) => {
2431                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
2432                let row = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2433                    .fetch_one(&pool)
2434                    .await?;
2435                let mut obj = serde_json::Map::with_capacity(aggs.len());
2436                for (name, agg) in aggs {
2437                    let source_ty = agg
2438                        .source_column()
2439                        .and_then(|c| meta.fields.iter().find(|f| f.name == c).map(|f| f.ty));
2440                    obj.insert(
2441                        name.to_string(),
2442                        backend_sqlite::decode_agg(&row, name, agg, source_ty)?,
2443                    );
2444                }
2445                Ok(JsonValue::Object(obj))
2446            }
2447            DbPool::Postgres(pool) => {
2448                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
2449                let row = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2450                    .fetch_one(&pool)
2451                    .await?;
2452                let mut obj = serde_json::Map::with_capacity(aggs.len());
2453                for (name, agg) in aggs {
2454                    let source_ty = agg
2455                        .source_column()
2456                        .and_then(|c| meta.fields.iter().find(|f| f.name == c).map(|f| f.ty));
2457                    obj.insert(
2458                        name.to_string(),
2459                        backend_pg::decode_agg(&row, name, agg, source_ty)?,
2460                    );
2461                }
2462                Ok(JsonValue::Object(obj))
2463            }
2464        }
2465    }
2466
2467    /// Grouped aggregate. Runs `SELECT <group_cols>, AGG(col) AS name,
2468    /// ... GROUP BY <group_cols>` with the accumulated WHERE clause.
2469    ///
2470    /// Returns one `Value::Object` per group, with both the group
2471    /// columns and each named aggregate as fields. Group columns are
2472    /// decoded per their declared SqlType (so an integer
2473    /// `author_id` stays a JSON number).
2474    ///
2475    /// ```rust,ignore
2476    /// let by_author = Post::objects()
2477    ///     .annotate(&["author_id"], &[("count", Aggregate::count())])
2478    ///     .await?;
2479    /// // [ { "author_id": 1, "count": 3 }, { "author_id": 2, "count": 2 } ]
2480    /// ```
2481    pub async fn annotate(
2482        self,
2483        group_cols: &[&str],
2484        aggs: &[(&str, crate::orm::Aggregate)],
2485    ) -> Result<Vec<JsonValue>, sqlx::Error> {
2486        let meta = crate::migrate::ModelMeta::for_::<T>();
2487        // Resolve group columns up front so unknown names fail
2488        // before any SQL runs.
2489        let mut chosen_groups: Vec<&crate::migrate::Column> = Vec::with_capacity(group_cols.len());
2490        for name in group_cols {
2491            let col = meta
2492                .fields
2493                .iter()
2494                .find(|c| c.name == *name)
2495                .ok_or_else(|| {
2496                    sqlx::Error::Protocol(format!(
2497                        "umbral::orm::annotate: unknown group column `{}` on model `{}`",
2498                        name,
2499                        T::NAME
2500                    ))
2501                })?;
2502            chosen_groups.push(col);
2503        }
2504        // Validate aggregate source columns.
2505        for (name, agg) in aggs {
2506            if let Some(col) = agg.source_column()
2507                && !meta.fields.iter().any(|c| c.name == col)
2508            {
2509                return Err(sqlx::Error::Protocol(format!(
2510                    "umbral::orm::annotate: unknown column `{}` on model `{}` for aggregate `{}`",
2511                    col,
2512                    T::NAME,
2513                    name
2514                )));
2515            }
2516        }
2517        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2518        let backend = pool.backend_name();
2519        let mut q = self.build_query_for(backend);
2520        q.clear_selects();
2521        // GROUP BY columns appear in the SELECT list AND the GROUP BY
2522        // clause. Aggregates only in the SELECT.
2523        for col in &chosen_groups {
2524            q.column(Alias::new(col.name.as_str()));
2525            q.add_group_by([sea_query::SimpleExpr::Column(sea_query::ColumnRef::Column(
2526                Alias::new(col.name.as_str()).into_iden(),
2527            ))]);
2528        }
2529        for (name, agg) in aggs {
2530            q.expr_as(agg.to_simple_expr(), Alias::new(*name));
2531        }
2532        match pool {
2533            DbPool::Sqlite(pool) => {
2534                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
2535                let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2536                    .fetch_all(&pool)
2537                    .await?;
2538                let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2539                for row in &rows {
2540                    let mut obj = serde_json::Map::with_capacity(chosen_groups.len() + aggs.len());
2541                    for col in &chosen_groups {
2542                        obj.insert(
2543                            col.name.clone(),
2544                            crate::orm::dynamic::decode_to_json(row, col)?,
2545                        );
2546                    }
2547                    for (name, agg) in aggs {
2548                        let source_ty = agg
2549                            .source_column()
2550                            .and_then(|c| meta.fields.iter().find(|f| f.name == c).map(|f| f.ty));
2551                        obj.insert(
2552                            name.to_string(),
2553                            backend_sqlite::decode_agg(row, name, agg, source_ty)?,
2554                        );
2555                    }
2556                    out.push(JsonValue::Object(obj));
2557                }
2558                Ok(out)
2559            }
2560            DbPool::Postgres(pool) => {
2561                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
2562                let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2563                    .fetch_all(&pool)
2564                    .await?;
2565                let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2566                for row in &rows {
2567                    let mut obj = serde_json::Map::with_capacity(chosen_groups.len() + aggs.len());
2568                    for col in &chosen_groups {
2569                        obj.insert(
2570                            col.name.clone(),
2571                            crate::orm::dynamic::decode_pg_to_json(row, col)?,
2572                        );
2573                    }
2574                    for (name, agg) in aggs {
2575                        let source_ty = agg
2576                            .source_column()
2577                            .and_then(|c| meta.fields.iter().find(|f| f.name == c).map(|f| f.ty));
2578                        obj.insert(
2579                            name.to_string(),
2580                            backend_pg::decode_agg(row, name, agg, source_ty)?,
2581                        );
2582                    }
2583                    out.push(JsonValue::Object(obj));
2584                }
2585                Ok(out)
2586            }
2587        }
2588    }
2589
2590    /// The chainable `annotate(alias=Agg("relation"))`: attach a
2591    /// related-aggregate annotation to this QuerySet. The annotation
2592    /// is **query-builder state** — it renders as a correlated scalar
2593    /// subquery inside the one SELECT every terminal builds, so it
2594    /// composes with `.filter` / `.order_by` / `.limit`, stacks with
2595    /// further annotations, and shows up in [`Self::explain`] /
2596    /// [`Self::to_sql`] out of the box. Never a side query, never an
2597    /// N+1.
2598    ///
2599    /// `relation` names a `ReverseSet` relation on the model
2600    /// (`#[umbral(reverse_fk = "...")]`), the same names
2601    /// `prefetch_related` accepts. When no declared relation matches,
2602    /// the resolver AUTO-DISCOVERS the relation (gaps2 #45): it scans
2603    /// the model registry for any child whose FK targets this parent's
2604    /// table and matches `relation` against the child's conventional
2605    /// name forms (table name, `snake_case` / lowercase struct name,
2606    /// any of those with a `_set` suffix). Declared relations always
2607    /// take precedence; an ambiguous auto-match (two children, or a
2608    /// child with two FKs to this parent) poisons the annotation with
2609    /// an error that names the candidates and points at the
2610    /// `#[umbral(reverse_fk = "...")]` escape hatch. Any
2611    /// [`crate::orm::Aggregate`] works; non-count aggregates name a
2612    /// column on the CHILD model:
2613    ///
2614    /// ```rust,ignore
2615    /// let rows = Plugin::objects()
2616    ///     .filter(plugin::MODERATION.eq("approved"))
2617    ///     .annotate_count("comment_set")                                // COUNT(*)
2618    ///     .annotate_related("rating_avg", "review_set", Aggregate::avg("rating"))
2619    ///     .fetch_annotated()
2620    ///     .await?;                       // Vec<(Plugin, Map<alias, value>)>
2621    /// ```
2622    ///
2623    /// An unknown relation name doesn't panic the (infallible)
2624    /// builder — it poisons the annotation, and every fallible
2625    /// consumer (`fetch_annotated`, `explain`) reports it loudly.
2626    /// v1 caveats: child rows aggregate unconditionally — a
2627    /// child-side predicate (a filtered count)
2628    /// and child soft-delete awareness are tracked follow-ups
2629    /// (gaps2 #39).
2630    pub fn annotate_related(
2631        mut self,
2632        alias: &str,
2633        relation: &str,
2634        agg: crate::orm::Aggregate,
2635    ) -> Self {
2636        let rev_spec = T::REVERSE_FK_RELATIONS
2637            .iter()
2638            .find(|r| r.field_name == relation);
2639        let m2m_spec = T::M2M_RELATIONS.iter().find(|r| r.field_name == relation);
2640
2641        let pk = T::FIELDS
2642            .iter()
2643            .find(|f| f.primary_key)
2644            .map(|f| f.name)
2645            .unwrap_or("id");
2646
2647        let mut child_soft_delete = false;
2648        let mut m2m_junction: Option<String> = None;
2649
2650        let resolved = if let Some(spec) = rev_spec {
2651            child_soft_delete = spec.soft_delete;
2652            Ok((
2653                spec.target_table.to_string(),
2654                spec.fk_column.to_string(),
2655                T::TABLE.to_string(),
2656                pk.to_string(),
2657            ))
2658        } else if let Some(spec) = m2m_spec {
2659            // M2M count: junction table = "<parent>_<field>", columns
2660            // parent_id / child_id. The subquery counts junction rows.
2661            m2m_junction = Some(format!("{}_{}", T::TABLE, spec.field_name));
2662            // child_table / fk_column are unused for the M2M shape, but
2663            // the tuple still carries parent_table + parent_pk for the
2664            // correlation in build_query_for.
2665            Ok((
2666                spec.target_table.to_string(),
2667                "child_id".to_string(),
2668                T::TABLE.to_string(),
2669                pk.to_string(),
2670            ))
2671        } else {
2672            // No DECLARED relation matched. Fall back to auto-discovery
2673            // (gaps2 #45): scan the model registry for any child whose
2674            // FK points back at this parent's table, and match `relation`
2675            // against the conventional name forms. Declared relations
2676            // always win above; this only runs as a fallback.
2677            match discover_reverse_relation::<T>(relation) {
2678                AutoDiscovery::Resolved {
2679                    child_table,
2680                    fk_column,
2681                    soft_delete,
2682                } => {
2683                    child_soft_delete = soft_delete;
2684                    Ok((child_table, fk_column, T::TABLE.to_string(), pk.to_string()))
2685                }
2686                AutoDiscovery::Ambiguous(candidates) => Err(format!(
2687                    "umbral::orm::annotate_related: ambiguous reverse relation `{relation}` on `{}` — candidates: [{}]; declare a `#[umbral(reverse_fk = \"<fk>\")] ReverseSet<Child>` field to disambiguate",
2688                    T::NAME,
2689                    candidates.join(", "),
2690                )),
2691                AutoDiscovery::NotFound(discoverable) => {
2692                    let declared = T::REVERSE_FK_RELATIONS
2693                        .iter()
2694                        .map(|r| r.field_name)
2695                        .collect::<Vec<_>>()
2696                        .join(", ");
2697                    let m2m = T::M2M_RELATIONS
2698                        .iter()
2699                        .map(|r| r.field_name)
2700                        .collect::<Vec<_>>()
2701                        .join(", ");
2702                    Err(format!(
2703                        "umbral::orm::annotate_related: `{relation}` is not a reverse-FK or M2M relation on `{}` — reverse-FK relations: [{declared}], M2M relations: [{m2m}], auto-discoverable children: [{}]",
2704                        T::NAME,
2705                        discoverable.join(", "),
2706                    ))
2707                }
2708            }
2709        };
2710
2711        self.annotations.push(RelatedAnnotation {
2712            alias: alias.to_string(),
2713            agg,
2714            resolved,
2715            child_soft_delete,
2716            child_filter: None,
2717            m2m_junction,
2718        });
2719        self
2720    }
2721
2722    /// Sugar for the overwhelmingly common annotation:
2723    /// `annotate_related("<relation>_count", relation, Aggregate::count())`.
2724    /// `.annotate_count("comment_set")` exposes the value under the
2725    /// `comment_set_count` alias in [`Self::fetch_annotated`].
2726    pub fn annotate_count(self, relation: &str) -> Self {
2727        let alias = format!("{relation}_count");
2728        self.annotate_related(&alias, relation, crate::orm::Aggregate::count())
2729    }
2730
2731    /// Like [`Self::annotate_count`] but counts only the children
2732    /// matching `pred` — a filtered count over `"comments"`.
2733    /// `C` is the CHILD model, so the predicate is typed against the
2734    /// child's columns (`comment::MODERATION.eq("visible")`). The
2735    /// predicate renders into the correlated count subquery's WHERE
2736    /// alongside the FK correlation and the auto soft-delete filter.
2737    ///
2738    /// ```rust,ignore
2739    /// Plugin::objects()
2740    ///     .annotate_count_where::<PluginComment>(
2741    ///         "visible_comments",
2742    ///         "comment_set",
2743    ///         plugin_comment::MODERATION.eq("visible"),
2744    ///     )
2745    /// ```
2746    pub fn annotate_count_where<C: crate::orm::Model>(
2747        self,
2748        alias: &str,
2749        relation: &str,
2750        pred: crate::orm::Predicate<C>,
2751    ) -> Self {
2752        // Render the child predicate to a backend-default SimpleExpr.
2753        // The count subquery embeds one expression; the equality /
2754        // comparison predicates used for child filters render the same
2755        // on both backends, so the default `cond` is correct.
2756        let child_filter = pred.cond_for("postgres");
2757        let mut queryset = self.annotate_related(alias, relation, crate::orm::Aggregate::count());
2758        // The just-pushed annotation is the last one; attach the filter.
2759        if let Some(last) = queryset.annotations.last_mut() {
2760            last.child_filter = Some(child_filter);
2761        }
2762        queryset
2763    }
2764
2765    /// Loud-failure check for poisoned annotations (unknown relation
2766    /// names recorded by the infallible builder). Called by every
2767    /// fallible consumer before SQL runs.
2768    fn check_annotations(&self) -> Result<(), sqlx::Error> {
2769        for ann in &self.annotations {
2770            if let Err(msg) = &ann.resolved {
2771                return Err(sqlx::Error::Protocol(msg.clone()));
2772            }
2773        }
2774        Ok(())
2775    }
2776
2777    /// Run the SELECT and return every matching row **with its
2778    /// annotation values** — the execution terminal for
2779    /// [`Self::annotate_related`] / [`Self::annotate_count`]. One
2780    /// query; each row's annotations arrive as an `alias → JSON
2781    /// value` map (count → integer, AVG → float/null, SUM/MAX/MIN →
2782    /// typed per the child column, NULL on empty sets for the
2783    /// non-count aggregates).
2784    pub async fn fetch_annotated(
2785        self,
2786    ) -> Result<Vec<(T, serde_json::Map<String, JsonValue>)>, sqlx::Error>
2787    where
2788        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
2789            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
2790    {
2791        self.check_annotations()?;
2792        // Child-column types for SUM/MAX/MIN decoding, resolved from
2793        // the runtime registry (the child model is known by table
2794        // name only). `None` falls back to decode_agg's string path.
2795        let source_types: Vec<(String, crate::orm::Aggregate, Option<crate::orm::SqlType>)> = {
2796            let registry_up = crate::migrate::is_initialised();
2797            self.annotations
2798                .iter()
2799                .map(|ann| {
2800                    let ty = match (&ann.resolved, registry_up, ann.agg.source_column()) {
2801                        (Ok((child_table, ..)), true, Some(col)) => {
2802                            crate::migrate::registered_models()
2803                                .into_iter()
2804                                .find(|m| m.table == *child_table)
2805                                .and_then(|m| m.fields.iter().find(|f| f.name == col).map(|f| f.ty))
2806                        }
2807                        _ => None,
2808                    };
2809                    (ann.alias.clone(), ann.agg.clone(), ty)
2810                })
2811                .collect()
2812        };
2813
2814        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2815        match pool {
2816            DbPool::Sqlite(pool) => {
2817                let q = self.build_query_for("sqlite");
2818                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
2819                let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2820                    .fetch_all(&pool)
2821                    .await?;
2822                let mut out = Vec::with_capacity(rows.len());
2823                for row in &rows {
2824                    let t = <T as sqlx::FromRow<_>>::from_row(row)?;
2825                    let mut anns = serde_json::Map::with_capacity(source_types.len());
2826                    for (alias, agg, ty) in &source_types {
2827                        anns.insert(
2828                            alias.clone(),
2829                            backend_sqlite::decode_agg(row, alias, agg, *ty)?,
2830                        );
2831                    }
2832                    out.push((t, anns));
2833                }
2834                Ok(out)
2835            }
2836            DbPool::Postgres(pool) => {
2837                let q = self.build_query_for("postgres");
2838                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
2839                let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2840                    .fetch_all(&pool)
2841                    .await?;
2842                let mut out = Vec::with_capacity(rows.len());
2843                for row in &rows {
2844                    let t = <T as sqlx::FromRow<_>>::from_row(row)?;
2845                    let mut anns = serde_json::Map::with_capacity(source_types.len());
2846                    for (alias, agg, ty) in &source_types {
2847                        anns.insert(alias.clone(), backend_pg::decode_agg(row, alias, agg, *ty)?);
2848                    }
2849                    out.push((t, anns));
2850                }
2851                Ok(out)
2852            }
2853        }
2854    }
2855
2856    /// `DELETE FROM table WHERE <predicates>`. Returns the number of
2857    /// rows deleted. With no `.filter` calls, deletes every row.
2858    ///
2859    /// Fires `bulk_post_delete:<table>` once with the list of removed
2860    /// PKs when at least one row was deleted. Per-row `pre_delete` /
2861    /// `post_delete` are NOT fired by this path — use
2862    /// [`Manager::delete_instance`] when per-row signal semantics are
2863    /// required.
2864    ///
2865    /// Feature #72: for `#[umbral(soft_delete)]` models this rewrites
2866    /// to `UPDATE ... SET deleted_at = NOW() WHERE ...` so rows
2867    /// survive in the DB (filtered out of subsequent queries by the
2868    /// auto `WHERE deleted_at IS NULL`). Call `.hard_delete()`
2869    /// beforehand for a real DELETE (GDPR purge, test cleanup).
2870    pub async fn delete(self) -> Result<u64, sqlx::Error> {
2871        // Feature #72 — soft-delete redirect. The whole `delete()`
2872        // contract collapses to an UPDATE setting `deleted_at`. We
2873        // keep the bulk_post_delete signal so subscribers see the
2874        // same event shape regardless of the underlying SQL.
2875        if self.soft_delete_active && !self.hard_delete {
2876            return self.soft_delete_update().await;
2877        }
2878        let atomic = self.should_atomic_wrap();
2879        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Write);
2880        let backend = pool.backend_name();
2881        let mut stmt = self.build_delete_for(backend);
2882        let pk = pk_field::<T>();
2883        if let Some(field) = pk {
2884            stmt.returning_col(Alias::new(field.name));
2885        }
2886        let ids: Vec<JsonValue> = match pool {
2887            DbPool::Sqlite(pool) => {
2888                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
2889                let rows = if atomic {
2890                    let mut tx = pool.begin().await?;
2891                    let r = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
2892                        .fetch_all(&mut *tx)
2893                        .await;
2894                    match r {
2895                        Ok(rows) => {
2896                            tx.commit().await?;
2897                            rows
2898                        }
2899                        Err(e) => {
2900                            let _ = tx.rollback().await;
2901                            return Err(e);
2902                        }
2903                    }
2904                } else {
2905                    sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
2906                        .fetch_all(&pool)
2907                        .await?
2908                };
2909                match pk {
2910                    Some(field) => rows
2911                        .iter()
2912                        .map(|r| backend_sqlite::pk_to_json(r, field.name, field.ty))
2913                        .collect::<Result<_, _>>()?,
2914                    None => Vec::new(),
2915                }
2916            }
2917            DbPool::Postgres(pool) => {
2918                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
2919                let rows = if atomic {
2920                    let mut tx = pool.begin().await?;
2921                    let r = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
2922                        .fetch_all(&mut *tx)
2923                        .await;
2924                    match r {
2925                        Ok(rows) => {
2926                            tx.commit().await?;
2927                            rows
2928                        }
2929                        Err(e) => {
2930                            let _ = tx.rollback().await;
2931                            return Err(e);
2932                        }
2933                    }
2934                } else {
2935                    sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
2936                        .fetch_all(&pool)
2937                        .await?
2938                };
2939                match pk {
2940                    Some(field) => rows
2941                        .iter()
2942                        .map(|r| backend_pg::pk_to_json(r, field.name, field.ty))
2943                        .collect::<Result<_, _>>()?,
2944                    None => Vec::new(),
2945                }
2946            }
2947        };
2948        let count = ids.len() as u64;
2949        if !ids.is_empty() {
2950            crate::signals::emit_bulk_post_delete::<T>(ids).await;
2951        }
2952        Ok(count)
2953    }
2954
2955    /// `UPDATE table SET col = <expr> WHERE <predicates>` using an
2956    /// F-expression for the new value.
2957    ///
2958    /// This is the companion to `update_values` for atomic column
2959    /// arithmetic. `F::col("views").add(1)` produces an [`FExpr`] that
2960    /// renders as `SET views = views + 1` — the database computes the
2961    /// increment atomically on the server side rather than needing a
2962    /// read-modify-write round-trip in application code.
2963    ///
2964    /// ```rust,ignore
2965    /// use umbral::orm::F;
2966    ///
2967    /// Post::objects()
2968    ///     .filter(post::ID.eq(42))
2969    ///     .update_expr("views", F::col("views").add(1))
2970    ///     .await?;
2971    /// ```
2972    ///
2973    /// Mixing `update_values` and `update_expr` for different columns in
2974    /// one statement requires two separate calls. A combined API (a map
2975    /// where values can be either JSON or FExpr) would require a new sum
2976    /// type; deferred until a consumer surfaces the need.
2977    pub async fn update_expr(
2978        self,
2979        col_name: &str,
2980        expr: FExpr,
2981    ) -> Result<u64, crate::orm::write::WriteError> {
2982        use crate::orm::write::WriteError;
2983        // Validate the column exists on the model.
2984        let field = T::FIELDS
2985            .iter()
2986            .find(|f| f.name == col_name)
2987            .ok_or_else(|| WriteError::UnknownColumn {
2988                field: col_name.to_string(),
2989            })?;
2990        if field.primary_key {
2991            // Silently skip PK rewrites, same as update_values.
2992            return Ok(0);
2993        }
2994        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Write);
2995        let backend = pool.backend_name();
2996
2997        let mut stmt = sea_query::Query::update();
2998        stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
2999        stmt.value(Alias::new(field.name), expr.to_simple_expr());
3000        for p in &self.predicates {
3001            stmt.and_where(p.cond_for(backend));
3002        }
3003        if self.soft_delete_active {
3004            if self.only_deleted {
3005                stmt.and_where(Expr::col(Alias::new("deleted_at")).is_not_null());
3006            } else if !self.with_deleted {
3007                stmt.and_where(Expr::col(Alias::new("deleted_at")).is_null());
3008            }
3009        }
3010        let pk = pk_field::<T>();
3011        if let Some(pkf) = pk {
3012            stmt.returning_col(Alias::new(pkf.name));
3013        }
3014
3015        let ids: Vec<JsonValue> = match pool {
3016            DbPool::Sqlite(pool) => {
3017                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3018                let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3019                    .fetch_all(&pool)
3020                    .await?;
3021                match pk {
3022                    Some(pkf) => rows
3023                        .iter()
3024                        .map(|r| backend_sqlite::pk_to_json(r, pkf.name, pkf.ty))
3025                        .collect::<Result<_, _>>()
3026                        .map_err(crate::orm::write::WriteError::Sqlx)?,
3027                    None => Vec::new(),
3028                }
3029            }
3030            DbPool::Postgres(pool) => {
3031                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3032                let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3033                    .fetch_all(&pool)
3034                    .await?;
3035                match pk {
3036                    Some(pkf) => rows
3037                        .iter()
3038                        .map(|r| backend_pg::pk_to_json(r, pkf.name, pkf.ty))
3039                        .collect::<Result<_, _>>()
3040                        .map_err(crate::orm::write::WriteError::Sqlx)?,
3041                    None => Vec::new(),
3042                }
3043            }
3044        };
3045        let count = ids.len() as u64;
3046        if !ids.is_empty() {
3047            crate::signals::emit_bulk_post_save::<T>(ids, false).await;
3048        }
3049        Ok(count)
3050    }
3051
3052    /// `UPDATE table SET k=v[, ...] WHERE <predicates>`. The values
3053    /// map provides `column_name → JSON value` pairs; each is
3054    /// converted to a `sea_query::Value` per the column's declared
3055    /// `SqlType` via [`crate::orm::write::json_to_sea_value`]. Returns
3056    /// the number of rows affected.
3057    ///
3058    /// Unknown columns in the map fail loudly with
3059    /// `WriteError::UnknownColumn`. JSON `null` is rejected for
3060    /// non-nullable columns; supplying a column that exists but is
3061    /// absent from the map is silently a no-op (the column keeps its
3062    /// current value — PATCH semantics, not PUT).
3063    ///
3064    /// Fires `bulk_post_save:<table>` once with `{ ids, created:
3065    /// false, actor }` when at least one row matched. Per-row
3066    /// `pre_save` / `post_save` are NOT fired — use [`Manager::save`]
3067    /// when per-row signal semantics are required.
3068    pub async fn update_values(
3069        self,
3070        values: serde_json::Map<String, serde_json::Value>,
3071    ) -> Result<u64, crate::orm::write::WriteError> {
3072        let atomic = self.should_atomic_wrap();
3073        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Write);
3074        let backend = pool.backend_name();
3075        let mut stmt = self.build_update_for(backend, &values)?;
3076        // RETURNING <pk> so bulk_post_save can include the matched ids.
3077        let pk = pk_field::<T>();
3078        if let Some(field) = pk {
3079            stmt.returning_col(Alias::new(field.name));
3080        }
3081        let ids: Vec<JsonValue> = match pool {
3082            DbPool::Sqlite(pool) => {
3083                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3084                let rows = if atomic {
3085                    let mut tx = pool
3086                        .begin()
3087                        .await
3088                        .map_err(crate::orm::write::WriteError::Sqlx)?;
3089                    let r = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3090                        .fetch_all(&mut *tx)
3091                        .await;
3092                    match r {
3093                        Ok(rows) => {
3094                            tx.commit()
3095                                .await
3096                                .map_err(crate::orm::write::WriteError::Sqlx)?;
3097                            rows
3098                        }
3099                        Err(e) => {
3100                            let _ = tx.rollback().await;
3101                            return Err(crate::orm::write::WriteError::Sqlx(e));
3102                        }
3103                    }
3104                } else {
3105                    sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3106                        .fetch_all(&pool)
3107                        .await?
3108                };
3109                match pk {
3110                    Some(field) => rows
3111                        .iter()
3112                        .map(|r| backend_sqlite::pk_to_json(r, field.name, field.ty))
3113                        .collect::<Result<_, _>>()
3114                        .map_err(crate::orm::write::WriteError::Sqlx)?,
3115                    None => Vec::new(),
3116                }
3117            }
3118            DbPool::Postgres(pool) => {
3119                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3120                let rows = if atomic {
3121                    let mut tx = pool
3122                        .begin()
3123                        .await
3124                        .map_err(crate::orm::write::WriteError::Sqlx)?;
3125                    let r = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3126                        .fetch_all(&mut *tx)
3127                        .await;
3128                    match r {
3129                        Ok(rows) => {
3130                            tx.commit()
3131                                .await
3132                                .map_err(crate::orm::write::WriteError::Sqlx)?;
3133                            rows
3134                        }
3135                        Err(e) => {
3136                            let _ = tx.rollback().await;
3137                            return Err(crate::orm::write::WriteError::Sqlx(e));
3138                        }
3139                    }
3140                } else {
3141                    sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3142                        .fetch_all(&pool)
3143                        .await?
3144                };
3145                match pk {
3146                    Some(field) => rows
3147                        .iter()
3148                        .map(|r| backend_pg::pk_to_json(r, field.name, field.ty))
3149                        .collect::<Result<_, _>>()
3150                        .map_err(crate::orm::write::WriteError::Sqlx)?,
3151                    None => Vec::new(),
3152                }
3153            }
3154        };
3155        let count = ids.len() as u64;
3156        if !ids.is_empty() {
3157            crate::signals::emit_bulk_post_save::<T>(ids, false).await;
3158        }
3159        Ok(count)
3160    }
3161
3162    /// Helper: build the DELETE statement for the active backend.
3163    /// Public-by-virtue-of-being-pub(crate) so the `_pg` and (future)
3164    /// `_sqlite` explicit-pool variants can share the SQL builder.
3165    fn build_delete_for(&self, backend_name: &str) -> sea_query::DeleteStatement {
3166        let mut stmt = Query::delete();
3167        stmt.from_table(crate::db::router::schema_qualified_table(T::TABLE));
3168        for p in &self.predicates {
3169            stmt.and_where(p.cond_for(backend_name));
3170        }
3171        stmt
3172    }
3173
3174    /// Feature #72 — soft-delete rewrite: turn `DELETE FROM table
3175    /// WHERE ...` into `UPDATE table SET deleted_at = NOW() WHERE
3176    /// ... AND deleted_at IS NULL`. The trailing `IS NULL` guard
3177    /// makes the operation idempotent: re-soft-deleting an already-
3178    /// soft-deleted row doesn't bump its timestamp. Fires
3179    /// `bulk_post_delete:<table>` so subscribers see the same event
3180    /// shape as a hard delete.
3181    async fn soft_delete_update(self) -> Result<u64, sqlx::Error> {
3182        let atomic = self.should_atomic_wrap();
3183        let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Write);
3184        let backend = pool.backend_name();
3185        let now = chrono::Utc::now();
3186        let mut stmt = sea_query::Query::update();
3187        stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
3188        stmt.value(
3189            Alias::new("deleted_at"),
3190            sea_query::Value::ChronoDateTimeUtc(Some(Box::new(now))),
3191        );
3192        for p in &self.predicates {
3193            stmt.and_where(p.cond_for(backend));
3194        }
3195        // Idempotency guard — never bump an already-set deleted_at.
3196        stmt.and_where(sea_query::Expr::col(Alias::new("deleted_at")).is_null());
3197        let pk = pk_field::<T>();
3198        if let Some(pkf) = pk {
3199            stmt.returning_col(Alias::new(pkf.name));
3200        }
3201        let ids: Vec<JsonValue> = match pool {
3202            DbPool::Sqlite(pool) => {
3203                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3204                let rows = if atomic {
3205                    let mut tx = pool.begin().await?;
3206                    let r = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3207                        .fetch_all(&mut *tx)
3208                        .await;
3209                    match r {
3210                        Ok(rows) => {
3211                            tx.commit().await?;
3212                            rows
3213                        }
3214                        Err(e) => {
3215                            let _ = tx.rollback().await;
3216                            return Err(e);
3217                        }
3218                    }
3219                } else {
3220                    sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3221                        .fetch_all(&pool)
3222                        .await?
3223                };
3224                match pk {
3225                    Some(field) => rows
3226                        .iter()
3227                        .map(|r| backend_sqlite::pk_to_json(r, field.name, field.ty))
3228                        .collect::<Result<_, _>>()?,
3229                    None => Vec::new(),
3230                }
3231            }
3232            DbPool::Postgres(pool) => {
3233                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3234                let rows = if atomic {
3235                    let mut tx = pool.begin().await?;
3236                    let r = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3237                        .fetch_all(&mut *tx)
3238                        .await;
3239                    match r {
3240                        Ok(rows) => {
3241                            tx.commit().await?;
3242                            rows
3243                        }
3244                        Err(e) => {
3245                            let _ = tx.rollback().await;
3246                            return Err(e);
3247                        }
3248                    }
3249                } else {
3250                    sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3251                        .fetch_all(&pool)
3252                        .await?
3253                };
3254                match pk {
3255                    Some(field) => rows
3256                        .iter()
3257                        .map(|r| backend_pg::pk_to_json(r, field.name, field.ty))
3258                        .collect::<Result<_, _>>()?,
3259                    None => Vec::new(),
3260                }
3261            }
3262        };
3263        let count = ids.len() as u64;
3264        if !ids.is_empty() {
3265            crate::signals::emit_bulk_post_delete::<T>(ids).await;
3266        }
3267        Ok(count)
3268    }
3269
3270    /// Helper: build the UPDATE statement for the active backend.
3271    /// Walks the `values` map, validates each column against the
3272    /// model's `FIELDS` metadata, converts the JSON value via
3273    /// `write::json_to_sea_value`, and threads the accumulated
3274    /// predicates into the WHERE clause.
3275    fn build_update_for(
3276        &self,
3277        backend_name: &str,
3278        values: &serde_json::Map<String, serde_json::Value>,
3279    ) -> Result<sea_query::UpdateStatement, crate::orm::write::WriteError> {
3280        use crate::orm::write::{WriteError, json_to_sea_value};
3281        let mut stmt = Query::update();
3282        stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
3283        for (col_name, val) in values {
3284            // Look up the column on the model. Unknown column names
3285            // fail loudly here rather than producing a bad UPDATE.
3286            let field = T::FIELDS
3287                .iter()
3288                .find(|f| f.name == col_name.as_str())
3289                .ok_or_else(|| WriteError::UnknownColumn {
3290                    field: col_name.clone(),
3291                })?;
3292            // Reject attempts to overwrite the PK via update_values.
3293            // The QuerySet's WHERE clause is the only way to identify
3294            // rows; rewriting the PK while filtering on the old one
3295            // is a footgun.
3296            if field.primary_key {
3297                continue;
3298            }
3299            let sea_value =
3300                json_to_sea_value(field.ty, val, field.nullable, field.name, fk_pk_hint(field))?;
3301            stmt.value(Alias::new(field.name), sea_value);
3302        }
3303        for p in &self.predicates {
3304            stmt.and_where(p.cond_for(backend_name));
3305        }
3306        if self.soft_delete_active {
3307            if self.only_deleted {
3308                stmt.and_where(Expr::col(Alias::new("deleted_at")).is_not_null());
3309            } else if !self.with_deleted {
3310                stmt.and_where(Expr::col(Alias::new("deleted_at")).is_null());
3311            }
3312        }
3313        Ok(stmt)
3314    }
3315
3316    /// Run the SELECT against an explicit `PgPool` and return every
3317    /// matching row. Bound by `FromRow<PgRow>` alone so models with
3318    /// Postgres-only field types compile.
3319    pub async fn fetch_pg(self, pool: &sqlx::PgPool) -> Result<Vec<T>, sqlx::Error>
3320    where
3321        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3322    {
3323        let q = self.build_query_for("postgres");
3324        let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
3325        sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
3326            .fetch_all(pool)
3327            .await
3328    }
3329
3330    /// Run the SELECT against an explicit `PgPool` with LIMIT 1.
3331    pub async fn first_pg(mut self, pool: &sqlx::PgPool) -> Result<Option<T>, sqlx::Error>
3332    where
3333        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3334    {
3335        self.query.limit(1);
3336        let q = self.build_query_for("postgres");
3337        let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
3338        sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
3339            .fetch_optional(pool)
3340            .await
3341    }
3342
3343    /// Run `SELECT COUNT(*)` against an explicit `PgPool`. No FromRow
3344    /// bound on `T` — the count tuple type is `(i64,)`.
3345    pub async fn count_pg(self, pool: &sqlx::PgPool) -> Result<i64, sqlx::Error> {
3346        let mut rebuilt = self.build_query_for("postgres");
3347        rebuilt.clear_selects();
3348        rebuilt.expr(Func::count(Expr::col(sea_query::Asterisk)));
3349        rebuilt.reset_limit();
3350        rebuilt.reset_offset();
3351        let (sql, values) = rebuilt.build_sqlx(PostgresQueryBuilder);
3352        let (n,): (i64,) = sqlx::query_as_with::<sqlx::Postgres, (i64,), _>(&sql, values)
3353            .fetch_one(pool)
3354            .await?;
3355        Ok(n)
3356    }
3357
3358    /// Return whether any row matches, against an explicit `PgPool`.
3359    pub async fn exists_pg(self, pool: &sqlx::PgPool) -> Result<bool, sqlx::Error>
3360    where
3361        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3362    {
3363        let rows = self.limit(1).fetch_pg(pool).await?;
3364        Ok(!rows.is_empty())
3365    }
3366
3367    /// Exactly-one terminal against an explicit `PgPool`.
3368    /// See [`QuerySet::get`] for the error-variant semantics.
3369    pub async fn get_pg(self, pool: &sqlx::PgPool) -> Result<T, GetError>
3370    where
3371        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3372    {
3373        let mut rows = self.limit(2).fetch_pg(pool).await.map_err(GetError::Sqlx)?;
3374        match rows.len() {
3375            0 => Err(GetError::NotFound),
3376            1 => Ok(rows.pop().unwrap()),
3377            _ => Err(GetError::MultipleObjectsReturned),
3378        }
3379    }
3380}
3381
3382/// Delegating chainable + terminal surface on `Manager<T>`.
3383///
3384/// Lets users write `Post::objects().filter(...).fetch().await` without
3385/// a separate `.query()` hop. Each method constructs the initial
3386/// `SelectStatement` against `T::TABLE` with one column per
3387/// `T::FIELDS` entry, wraps it in a fresh `QuerySet<T>`, and forwards.
3388impl<T: Model> Manager<T> {
3389    fn queryset(&self) -> QuerySet<T> {
3390        let columns: Vec<Alias> = T::FIELDS.iter().map(|f| Alias::new(f.name)).collect();
3391        let query = Query::select()
3392            .columns(columns)
3393            .from(crate::db::router::schema_qualified_table(T::TABLE))
3394            .take();
3395        let mut qs = QuerySet::new(query);
3396        // BUG-8: seed the default ORDER BY from `Model::ORDERING` so
3397        // terminals that don't see an explicit `.order_by(...)` still
3398        // get a deterministic row order.
3399        qs.default_ordering = T::ORDERING.to_vec();
3400        // Propagate the Manager's atomic override so QuerySet
3401        // terminals inherit it without the caller re-specifying.
3402        qs.atomic = self.atomic;
3403        // Feature #72 — snapshot the model's soft-delete opt-in into
3404        // the QuerySet so the build_query_for path knows whether to
3405        // auto-inject `WHERE deleted_at IS NULL`. Without this
3406        // snapshot the `impl<T> QuerySet<T>` path can't read
3407        // `T::SOFT_DELETE` (T is unbounded there).
3408        qs.soft_delete_active = T::SOFT_DELETE;
3409        qs
3410    }
3411
3412    /// Resolve whether write terminals on this Manager should auto-wrap
3413    /// in a transaction. Per-call override > builder global.
3414    fn should_atomic_wrap(&self) -> bool {
3415        self.atomic.unwrap_or_else(crate::db::atomic_default)
3416    }
3417
3418    /// See `QuerySet::filter`.
3419    pub fn filter(&self, p: Predicate<T>) -> QuerySet<T> {
3420        self.queryset().filter(p)
3421    }
3422
3423    /// See `QuerySet::exclude`.
3424    pub fn exclude(&self, p: Predicate<T>) -> QuerySet<T> {
3425        self.queryset().exclude(p)
3426    }
3427
3428    /// A bare [`QuerySet`] over every row — the `Model::objects().all()` form.
3429    ///
3430    /// The entry point when you need a `QuerySet` terminal without a
3431    /// filter: a grouped aggregate over the whole table, an unfiltered
3432    /// `aggregate`, or just an explicit "all rows" for readability.
3433    pub fn all(&self) -> QuerySet<T> {
3434        self.queryset()
3435    }
3436
3437    /// See [`QuerySet::aggregate`] — single-row aggregate over every row.
3438    ///
3439    /// Forwards from the manager so `Model::objects().aggregate(...)`
3440    /// works without an intervening `.filter(...)` / `.on(...)`.
3441    pub async fn aggregate(
3442        &self,
3443        aggs: &[(&str, crate::orm::Aggregate)],
3444    ) -> Result<JsonValue, sqlx::Error> {
3445        self.queryset().aggregate(aggs).await
3446    }
3447
3448    /// See [`QuerySet::annotate`] — grouped aggregate (`GROUP BY <group_cols>`).
3449    ///
3450    /// Forwards from the manager so the documented
3451    /// `Model::objects().annotate(&["status"], &[("count", Aggregate::count())])`
3452    /// (a grouped count over `"status"`) compiles
3453    /// directly, without a filter first.
3454    pub async fn annotate(
3455        &self,
3456        group_cols: &[&str],
3457        aggs: &[(&str, crate::orm::Aggregate)],
3458    ) -> Result<Vec<JsonValue>, sqlx::Error> {
3459        self.queryset().annotate(group_cols, aggs).await
3460    }
3461
3462    /// Feature #72 — see `QuerySet::with_deleted`.
3463    pub fn with_deleted(&self) -> QuerySet<T> {
3464        self.queryset().with_deleted()
3465    }
3466
3467    /// Feature #72 — see `QuerySet::only_deleted`.
3468    pub fn only_deleted(&self) -> QuerySet<T> {
3469        self.queryset().only_deleted()
3470    }
3471
3472    /// Gap #111 — see [`QuerySet::only`].
3473    pub fn only(&self, cols: &[&str]) -> QuerySet<T> {
3474        self.queryset().only(cols)
3475    }
3476
3477    /// See [`QuerySet::join_related`].
3478    pub fn join_related(&self, field_name: impl Into<String>) -> QuerySet<T> {
3479        self.queryset().join_related(field_name)
3480    }
3481
3482    /// See [`QuerySet::join_related_many`].
3483    pub fn join_related_many(&self, field_names: &[&str]) -> QuerySet<T> {
3484        self.queryset().join_related_many(field_names)
3485    }
3486
3487    /// See [`QuerySet::left_join_related`].
3488    pub fn left_join_related(&self, path: impl Into<String>) -> QuerySet<T> {
3489        self.queryset().left_join_related(path)
3490    }
3491
3492    /// See [`QuerySet::inner_join_related`].
3493    pub fn inner_join_related(&self, path: impl Into<String>) -> QuerySet<T> {
3494        self.queryset().inner_join_related(path)
3495    }
3496
3497    /// See [`QuerySet::right_join_related`].
3498    pub fn right_join_related(&self, path: impl Into<String>) -> QuerySet<T> {
3499        self.queryset().right_join_related(path)
3500    }
3501
3502    /// See [`QuerySet::select_related`].
3503    pub fn select_related(&self, field_name: impl Into<String>) -> QuerySet<T> {
3504        self.queryset().select_related(field_name)
3505    }
3506
3507    /// See [`QuerySet::select_related_many`].
3508    pub fn select_related_many(&self, field_names: &[&str]) -> QuerySet<T> {
3509        self.queryset().select_related_many(field_names)
3510    }
3511
3512    /// See [`QuerySet::prefetch_related`].
3513    pub fn prefetch_related(&self, field_name: impl Into<String>) -> QuerySet<T> {
3514        self.queryset().prefetch_related(field_name)
3515    }
3516
3517    /// See [`QuerySet::prefetch_related_many`].
3518    pub fn prefetch_related_many(&self, field_names: &[&str]) -> QuerySet<T> {
3519        self.queryset().prefetch_related_many(field_names)
3520    }
3521
3522    /// Feature #72 — see `QuerySet::hard_delete`.
3523    pub fn hard_delete(&self) -> QuerySet<T> {
3524        self.queryset().hard_delete()
3525    }
3526
3527    /// See `QuerySet::into_subquery`.
3528    pub fn into_subquery(&self, col_name: &str) -> crate::orm::Subquery {
3529        self.queryset().into_subquery(col_name)
3530    }
3531
3532    /// See `QuerySet::order_by`.
3533    pub fn order_by(&self, o: OrderExpr<T>) -> QuerySet<T> {
3534        self.queryset().order_by(o)
3535    }
3536
3537    /// See `QuerySet::limit`.
3538    pub fn limit(&self, n: u64) -> QuerySet<T> {
3539        self.queryset().limit(n)
3540    }
3541
3542    /// See `QuerySet::offset`.
3543    pub fn offset(&self, n: u64) -> QuerySet<T> {
3544        self.queryset().offset(n)
3545    }
3546
3547    /// See `QuerySet::on`.
3548    pub fn on(&self, pool: &sqlx::SqlitePool) -> QuerySet<T> {
3549        self.queryset().on(pool)
3550    }
3551
3552    /// See `QuerySet::on_pg`.
3553    pub fn on_pg(&self, pool: &sqlx::PgPool) -> QuerySet<T> {
3554        self.queryset().on_pg(pool)
3555    }
3556
3557    /// See `QuerySet::fetch`.
3558    pub async fn fetch(&self) -> Result<Vec<T>, sqlx::Error>
3559    where
3560        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3561            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3562            + HydrateRelated,
3563    {
3564        self.queryset().fetch().await
3565    }
3566
3567    /// See `QuerySet::first`.
3568    pub async fn first(&self) -> Result<Option<T>, sqlx::Error>
3569    where
3570        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3571            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3572            + HydrateRelated,
3573    {
3574        self.queryset().first().await
3575    }
3576
3577    /// See `QuerySet::count`.
3578    pub async fn count(&self) -> Result<i64, sqlx::Error> {
3579        self.queryset().count().await
3580    }
3581
3582    /// See [`QuerySet::annotate_related`] — starts an annotated chain
3583    /// from the manager, like `filter` does.
3584    pub fn annotate_related(
3585        &self,
3586        alias: &str,
3587        relation: &str,
3588        agg: crate::orm::Aggregate,
3589    ) -> QuerySet<T> {
3590        self.queryset().annotate_related(alias, relation, agg)
3591    }
3592
3593    /// See [`QuerySet::annotate_count`].
3594    pub fn annotate_count(&self, relation: &str) -> QuerySet<T> {
3595        self.queryset().annotate_count(relation)
3596    }
3597
3598    /// See [`QuerySet::annotate_count_where`] — starts a filtered
3599    /// annotated chain from the manager.
3600    pub fn annotate_count_where<C: crate::orm::Model>(
3601        &self,
3602        alias: &str,
3603        relation: &str,
3604        pred: crate::orm::Predicate<C>,
3605    ) -> QuerySet<T> {
3606        self.queryset()
3607            .annotate_count_where::<C>(alias, relation, pred)
3608    }
3609
3610    /// See [`QuerySet::fetch_annotated`].
3611    pub async fn fetch_annotated(
3612        &self,
3613    ) -> Result<Vec<(T, serde_json::Map<String, JsonValue>)>, sqlx::Error>
3614    where
3615        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3616            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3617    {
3618        self.queryset().fetch_annotated().await
3619    }
3620
3621    /// See [`QuerySet::values`].
3622    pub async fn values(&self, columns: &[&str]) -> Result<Vec<JsonValue>, sqlx::Error> {
3623        self.queryset().values(columns).await
3624    }
3625
3626    /// See `QuerySet::exists`.
3627    pub async fn exists(&self) -> Result<bool, sqlx::Error>
3628    where
3629        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3630            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3631            + HydrateRelated,
3632    {
3633        self.queryset().exists().await
3634    }
3635
3636    /// `.get(predicate)` — sugar for `.filter(predicate).get()`.
3637    ///
3638    /// The one-liner: `User::objects().get(user::ID.eq(1))`.
3639    /// See [`QuerySet::get`] for error-variant semantics.
3640    pub async fn get(&self, p: Predicate<T>) -> Result<T, GetError>
3641    where
3642        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3643            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3644            + HydrateRelated,
3645    {
3646        self.queryset().filter(p).get().await
3647    }
3648
3649    /// See [`QuerySet::fetch_pg`].
3650    pub async fn fetch_pg(&self, pool: &sqlx::PgPool) -> Result<Vec<T>, sqlx::Error>
3651    where
3652        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3653    {
3654        self.queryset().fetch_pg(pool).await
3655    }
3656
3657    /// See [`QuerySet::first_pg`].
3658    pub async fn first_pg(&self, pool: &sqlx::PgPool) -> Result<Option<T>, sqlx::Error>
3659    where
3660        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3661    {
3662        self.queryset().first_pg(pool).await
3663    }
3664
3665    /// See [`QuerySet::count_pg`].
3666    pub async fn count_pg(&self, pool: &sqlx::PgPool) -> Result<i64, sqlx::Error> {
3667        self.queryset().count_pg(pool).await
3668    }
3669
3670    /// See [`QuerySet::exists_pg`].
3671    pub async fn exists_pg(&self, pool: &sqlx::PgPool) -> Result<bool, sqlx::Error>
3672    where
3673        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3674    {
3675        self.queryset().exists_pg(pool).await
3676    }
3677
3678    /// Postgres-only sugar for `.filter(predicate).get_pg(pool)`.
3679    pub async fn get_pg(&self, pool: &sqlx::PgPool, p: Predicate<T>) -> Result<T, GetError>
3680    where
3681        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3682    {
3683        self.queryset().filter(p).get_pg(pool).await
3684    }
3685
3686    // =====================================================================
3687    // Write methods — INSERT.
3688    //
3689    // `create(instance)` does one row; `bulk_create([...])` does many in
3690    // a single multi-VALUES INSERT. Both serialise the instance(s) to a
3691    // JSON map via `serde::Serialize`, look up each field in the model's
3692    // `FIELDS` metadata, and bind values through
3693    // [`crate::orm::write::json_to_sea_value`].
3694    //
3695    // PK handling:
3696    // - Default value (0 for ints, nil for UUIDs, empty for String):
3697    //   omitted from the INSERT column list so the DB autoincrement /
3698    //   default kicks in.
3699    // - Explicit non-default value: included in the INSERT so the
3700    //   caller can supply UUIDs / slug PKs themselves.
3701    // =====================================================================
3702
3703    /// INSERT one row, return the row as it now exists in the
3704    /// database (with any autoincrement PK populated). Uses the
3705    /// ambient pool via `Manager::queryset().resolve_pool`.
3706    pub async fn create(&self, mut instance: T) -> Result<T, crate::orm::write::WriteError>
3707    where
3708        T: serde::Serialize
3709            + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3710            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3711            + HydrateRelated,
3712    {
3713        use crate::orm::write::WriteError;
3714        let map = serialize_to_map(&instance)?;
3715
3716        // Same pre-DB validation pipeline the dynamic
3717        // `insert_json` path runs — choices + FK existence +
3718        // M2M shape. Empty-string + required-field checks are
3719        // intentionally relaxed on the typed path: a Rust
3720        // `pub title: String` field set to `""` is the caller's
3721        // deliberate choice, not a form-default leak, and
3722        // missing-required can't happen because the struct
3723        // forced the caller to supply every column at compile
3724        // time. We only validate the things the typed path
3725        // can't catch at compile time.
3726        let meta = crate::migrate::ModelMeta::for_::<T>();
3727        let validation_errors = crate::orm::validation::validate_on_typed_create(&meta, &map).await;
3728        if !validation_errors.is_empty() {
3729            return Err(WriteError::Multiple {
3730                errors: validation_errors,
3731            });
3732        }
3733
3734        let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
3735        let backend = pool.backend_name();
3736        let stmt = build_insert_one_for::<T>(backend, &map)?;
3737        let atomic = self.should_atomic_wrap();
3738        // Post-execution SQL classification: turns the DB's
3739        // UNIQUE / FK / NOT NULL / CHECK violations into the
3740        // structured `WriteError` variants instead of a raw
3741        // `Sqlx(_)` 500. Symmetric with `DynQuerySet::insert_json`.
3742        match pool {
3743            DbPool::Sqlite(pool) => {
3744                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3745                let row_result = if atomic {
3746                    let mut tx = pool.begin().await.map_err(WriteError::Sqlx)?;
3747                    let r = sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
3748                        .fetch_one(&mut *tx)
3749                        .await;
3750                    match r {
3751                        Ok(row) => {
3752                            tx.commit().await.map_err(WriteError::Sqlx)?;
3753                            Ok(row)
3754                        }
3755                        Err(e) => {
3756                            let _ = tx.rollback().await;
3757                            Err(e)
3758                        }
3759                    }
3760                } else {
3761                    sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
3762                        .fetch_one(&pool)
3763                        .await
3764                };
3765                let mut row = row_result.map_err(|e| {
3766                    crate::orm::validation::classify_sql_error(&e, &map)
3767                        .unwrap_or(WriteError::Sqlx(e))
3768                })?;
3769                // BUG-16 step 2: every materialised row, including the
3770                // post-INSERT readback, needs `parent_id` +
3771                // `junction_table` seeded on its M2M slots — otherwise
3772                // `row.tags.add(...)` is a silent no-op.
3773                row.set_m2m_parent_ids();
3774                // Carry form-staged M2M pending ids from the caller's
3775                // instance onto the readback row, then flush them to
3776                // junction rows now that parent_id + junction_table are
3777                // seeded on the readback row.
3778                instance.take_pending_m2m_into(&mut row);
3779                row.write_pending_m2m().await?;
3780                Ok(row)
3781            }
3782            DbPool::Postgres(pool) => {
3783                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3784                let row_result = if atomic {
3785                    let mut tx = pool.begin().await.map_err(WriteError::Sqlx)?;
3786                    let r = sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
3787                        .fetch_one(&mut *tx)
3788                        .await;
3789                    match r {
3790                        Ok(row) => {
3791                            tx.commit().await.map_err(WriteError::Sqlx)?;
3792                            Ok(row)
3793                        }
3794                        Err(e) => {
3795                            let _ = tx.rollback().await;
3796                            Err(e)
3797                        }
3798                    }
3799                } else {
3800                    sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
3801                        .fetch_one(&pool)
3802                        .await
3803                };
3804                let mut row = row_result.map_err(|e| {
3805                    crate::orm::validation::classify_sql_error(&e, &map)
3806                        .unwrap_or(WriteError::Sqlx(e))
3807                })?;
3808                row.set_m2m_parent_ids();
3809                instance.take_pending_m2m_into(&mut row);
3810                row.write_pending_m2m().await?;
3811                Ok(row)
3812            }
3813        }
3814    }
3815
3816    /// INSERT many rows in a single statement. Returns the number of
3817    /// rows inserted. The full populated rows aren't materialised —
3818    /// use a follow-up `Model::objects().filter(...).fetch()` if you
3819    /// need them.
3820    ///
3821    /// Empty input is a no-op (returns Ok(0)) — the alternative
3822    /// (building an `INSERT INTO t () VALUES ()` and failing at the
3823    /// DB) doesn't help anyone.
3824    ///
3825    /// Fires `bulk_post_save:<table>` once with `{ ids, created: true,
3826    /// actor }` when at least one row was inserted. Per-row
3827    /// `pre_save` / `post_save` are NOT fired — use [`Self::save`]
3828    /// when per-row signal semantics are required.
3829    pub async fn bulk_create(&self, instances: Vec<T>) -> Result<u64, crate::orm::write::WriteError>
3830    where
3831        T: serde::Serialize,
3832    {
3833        use crate::orm::write::WriteError;
3834        if instances.is_empty() {
3835            return Ok(0);
3836        }
3837        let maps: Result<Vec<_>, _> = instances.iter().map(serialize_to_map).collect();
3838        let maps = maps?;
3839        // Validate every instance through the typed-create
3840        // pipeline. Collected into one `Multiple` so a caller
3841        // that submitted ten rows and got two bad ones can fix
3842        // both in one pass.
3843        let meta = crate::migrate::ModelMeta::for_::<T>();
3844        let mut all_errors: Vec<WriteError> = Vec::new();
3845        for map in &maps {
3846            let errs = crate::orm::validation::validate_on_typed_create(&meta, map).await;
3847            all_errors.extend(errs);
3848        }
3849        if !all_errors.is_empty() {
3850            return Err(WriteError::Multiple { errors: all_errors });
3851        }
3852        let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
3853        let backend = pool.backend_name();
3854        let mut stmt = build_insert_many_for::<T>(backend, &maps)?;
3855        // First row's map is used to enrich UNIQUE / FK
3856        // messages with the offending value when the engine
3857        // doesn't name it. Imperfect for bulk (the failing row
3858        // could be later in the batch) but better than the raw
3859        // sqlx error.
3860        let first_map = maps.first().cloned().unwrap_or_default();
3861        // Add `RETURNING <pk>` so the bulk_post_save signal payload can
3862        // carry the inserted PKs. Both backends support this — SQLite
3863        // since 3.35, Postgres natively. Replaces the previous
3864        // `execute()` + rows_affected path; count comes from the
3865        // returned row vector instead.
3866        let pk = pk_field::<T>();
3867        if let Some(field) = pk {
3868            stmt.returning_col(Alias::new(field.name));
3869        }
3870        let atomic = self.should_atomic_wrap();
3871        let ids: Vec<JsonValue> = match pool {
3872            DbPool::Sqlite(pool) => {
3873                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3874                let rows = if atomic {
3875                    let mut tx = pool.begin().await.map_err(WriteError::Sqlx)?;
3876                    let r = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3877                        .fetch_all(&mut *tx)
3878                        .await;
3879                    match r {
3880                        Ok(rows) => {
3881                            tx.commit().await.map_err(WriteError::Sqlx)?;
3882                            rows
3883                        }
3884                        Err(e) => {
3885                            let _ = tx.rollback().await;
3886                            return Err(crate::orm::validation::classify_sql_error(&e, &first_map)
3887                                .unwrap_or(WriteError::Sqlx(e)));
3888                        }
3889                    }
3890                } else {
3891                    sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3892                        .fetch_all(&pool)
3893                        .await
3894                        .map_err(|e| {
3895                            crate::orm::validation::classify_sql_error(&e, &first_map)
3896                                .unwrap_or(WriteError::Sqlx(e))
3897                        })?
3898                };
3899                match pk {
3900                    Some(field) => rows
3901                        .iter()
3902                        .map(|r| backend_sqlite::pk_to_json(r, field.name, field.ty))
3903                        .collect::<Result<_, _>>()
3904                        .map_err(WriteError::Sqlx)?,
3905                    None => Vec::new(),
3906                }
3907            }
3908            DbPool::Postgres(pool) => {
3909                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3910                let rows = if atomic {
3911                    let mut tx = pool.begin().await.map_err(WriteError::Sqlx)?;
3912                    let r = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3913                        .fetch_all(&mut *tx)
3914                        .await;
3915                    match r {
3916                        Ok(rows) => {
3917                            tx.commit().await.map_err(WriteError::Sqlx)?;
3918                            rows
3919                        }
3920                        Err(e) => {
3921                            let _ = tx.rollback().await;
3922                            return Err(crate::orm::validation::classify_sql_error(&e, &first_map)
3923                                .unwrap_or(WriteError::Sqlx(e)));
3924                        }
3925                    }
3926                } else {
3927                    sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3928                        .fetch_all(&pool)
3929                        .await
3930                        .map_err(|e| {
3931                            crate::orm::validation::classify_sql_error(&e, &first_map)
3932                                .unwrap_or(WriteError::Sqlx(e))
3933                        })?
3934                };
3935                match pk {
3936                    Some(field) => rows
3937                        .iter()
3938                        .map(|r| backend_pg::pk_to_json(r, field.name, field.ty))
3939                        .collect::<Result<_, _>>()
3940                        .map_err(WriteError::Sqlx)?,
3941                    None => Vec::new(),
3942                }
3943            }
3944        };
3945        let count = ids.len() as u64;
3946        if !ids.is_empty() {
3947            crate::signals::emit_bulk_post_save::<T>(ids, true).await;
3948        }
3949        Ok(count)
3950    }
3951
3952    /// The `get_or_create` terminal: fetch the first row matching `predicate`;
3953    /// if none exists, insert `defaults` and return it. Returns
3954    /// `(row, created)` so the caller can branch on whether the write
3955    /// happened. Two queries on the miss path (filter+first then create),
3956    /// one query on the hit path.
3957    ///
3958    /// ## Concurrency
3959    ///
3960    /// Convergent under concurrent callers: if two callers both miss the
3961    /// SELECT and race to INSERT, the one that loses gets a
3962    /// `UniqueViolation`; that error is caught here and the existing row
3963    /// is re-fetched, so both callers return the same row with
3964    /// `created = false` for the loser. A UNIQUE constraint on the
3965    /// predicate columns is required for true at-most-one semantics — the
3966    /// constraint is what makes the convergence deterministic.
3967    pub async fn get_or_create(
3968        &self,
3969        predicate: Predicate<T>,
3970        defaults: T,
3971    ) -> Result<(T, bool), crate::orm::write::WriteError>
3972    where
3973        T: serde::Serialize
3974            + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3975            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3976            + HydrateRelated,
3977    {
3978        use crate::orm::write::WriteError;
3979
3980        // Read-your-writes: probe for the existing row on the WRITE database,
3981        // not a (possibly lagging) read replica — otherwise a read/write-split
3982        // router could miss a just-written row and insert a duplicate. The
3983        // following `create()` already resolves the same write target.
3984        let write_pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
3985        if let Some(existing) = pin_to_pool(self.filter(predicate.clone()), &write_pool)
3986            .first()
3987            .await
3988            .map_err(WriteError::Sqlx)?
3989        {
3990            return Ok((existing, false));
3991        }
3992
3993        // Attempt the INSERT. On a UNIQUE violation (concurrent writer won the
3994        // race between our SELECT and this INSERT), catch the error and
3995        // re-SELECT to return the now-existing row with created=false.
3996        // This is the standard try-insert-then-fetch convergence pattern.
3997        //
3998        // Note on transaction semantics: a plain INSERT is atomic at the
3999        // statement level. Wrapping SELECT+INSERT in a serialisable transaction
4000        // would be stricter but requires SAVEPOINT support to recover from the
4001        // Postgres "aborted transaction" state after a constraint violation —
4002        // a per-operation SAVEPOINT would add two extra round-trips on every
4003        // write for marginal gain. The UNIQUE-constraint backstop plus this
4004        // re-fetch gives the same observable guarantee: callers always converge
4005        // on the same row and never see a spurious UniqueViolation.
4006        match self.create(defaults).await {
4007            Ok(created) => Ok((created, true)),
4008            Err(WriteError::UniqueViolation { .. }) => {
4009                // A concurrent writer inserted the row between our SELECT and
4010                // our INSERT. Re-fetch the now-existing row.
4011                let existing = pin_to_pool(self.filter(predicate), &write_pool)
4012                    .first()
4013                    .await
4014                    .map_err(WriteError::Sqlx)?
4015                    .ok_or_else(|| {
4016                        WriteError::Sqlx(sqlx::Error::Protocol(
4017                            "get_or_create: row vanished after UniqueViolation re-fetch"
4018                                .to_string(),
4019                        ))
4020                    })?;
4021                Ok((existing, false))
4022            }
4023            Err(e) => Err(e),
4024        }
4025    }
4026
    /// The `update_or_create` terminal: fetch the first row matching
    /// `predicate`; if found, update its non-PK columns with the
    /// `defaults` instance's values and return the fresh row;
    /// otherwise insert `defaults` and return it. Returns
    /// `(row, created)` so the caller can branch on the path taken.
    ///
    /// The defaults' PK is intentionally ignored on the update path —
    /// the matched row keeps its original PK. On the insert path the
    /// defaults' PK is honoured (autoincrement sentinel `0` → DB
    /// assigns; explicit value → DB uses it).
    ///
    /// ## Concurrency
    ///
    /// Convergent under concurrent callers: if two callers both miss the
    /// SELECT and race to INSERT, the loser gets a `UniqueViolation`; that
    /// error is caught here and the existing row is re-fetched, then the
    /// update is applied to it. Both callers converge on the same row, with
    /// `created = false` for the loser. A UNIQUE constraint on the predicate
    /// columns is required for deterministic convergence.
    ///
    /// ## Cost
    ///
    /// Counting each listed step as one operation: 3 on the hit path
    /// (`first`, UPDATE, re-fetch), 2 on the miss+create path (`first`,
    /// `create`), or 5 on the miss+race path (`first`, failed INSERT,
    /// `first`, UPDATE, re-fetch).
    ///
    /// ## Errors
    ///
    /// Returns `WriteError::Sqlx(sqlx::Error::Protocol(..))` when the model
    /// has no primary key, when the row cannot be found again after a
    /// `UniqueViolation`, or when it cannot be re-fetched after the UPDATE.
    /// Read failures from `first` are wrapped in `WriteError::Sqlx`;
    /// serialization failures of the existing row surface as
    /// `WriteError::SerializeFailed`. Any non-`UniqueViolation` error from
    /// `create` is returned unchanged.
    pub async fn update_or_create(
        &self,
        predicate: Predicate<T>,
        defaults: T,
    ) -> Result<(T, bool), crate::orm::write::WriteError>
    where
        T: serde::Serialize
            + Clone
            + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
            + HydrateRelated,
    {
        use crate::orm::write::WriteError;
        // The update path targets the matched row by primary key, so a
        // model without one cannot be handled.
        let pk = pk_field::<T>().ok_or_else(|| {
            WriteError::Sqlx(sqlx::Error::Protocol(
                "update_or_create: model has no primary key".to_string(),
            ))
        })?;
        let pk_name = pk.name;

        // Read-your-writes: the existence probe and the post-update re-fetch
        // run on the WRITE database, so a read/write-split router doesn't miss
        // a just-written row (duplicate insert) or read a stale row back. The
        // intervening UPDATE already routes to the same write target.
        let write_pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);

        // Shared helper: given the existing row (already fetched) and the
        // defaults instance, apply the non-PK column update and return the
        // re-fetched row. Used by both the direct-hit path and the
        // UniqueViolation-convergence path so the update logic is in one place.
        // Being a macro, the body expands inline: its `?` operators return
        // from `update_or_create` itself and its `.await`s run in this
        // function's future.
        macro_rules! do_update {
            ($existing:expr, $defaults:expr) => {{
                let existing: T = $existing;
                let defaults: T = $defaults;

                // Serialize defaults, drop the PK so the matched row's PK is
                // preserved, then UPDATE WHERE <pk_col> = <existing_pk>.
                let mut update_map = serialize_to_map(&defaults)?;
                update_map.remove(pk_name);

                // Build a PK predicate from the existing row's serialized PK
                // value. Goes through serde_json so any PK type (i64, String,
                // Uuid) round-trips correctly through sea-query.
                // If the serialized row has no entry under the PK name the
                // value falls back to JSON null.
                let existing_json =
                    serde_json::to_value(&existing).map_err(WriteError::SerializeFailed)?;
                let pk_value_json = existing_json
                    .get(pk_name)
                    .cloned()
                    .unwrap_or(serde_json::Value::Null);
                let pk_sea = crate::orm::write::json_to_sea_value(
                    pk.ty,
                    &pk_value_json,
                    false,
                    pk_name,
                    None,
                )?;
                let pk_pred: Predicate<T> = Predicate::new(
                    sea_query::Expr::col(sea_query::Alias::new(pk_name)).eq(pk_sea),
                );

                // Run the UPDATE.
                self.filter(pk_pred).update_values(update_map).await?;

                // Re-fetch to return the populated row. The PK value is
                // converted a second time because the first conversion was
                // moved into `pk_pred` above.
                let pk_sea2 = crate::orm::write::json_to_sea_value(
                    pk.ty,
                    &pk_value_json,
                    false,
                    pk_name,
                    None,
                )?;
                let refetch_pred: Predicate<T> = Predicate::new(
                    sea_query::Expr::col(sea_query::Alias::new(pk_name)).eq(pk_sea2),
                );
                pin_to_pool(self.filter(refetch_pred), &write_pool)
                    .first()
                    .await
                    .map_err(WriteError::Sqlx)?
                    .ok_or_else(|| {
                        WriteError::Sqlx(sqlx::Error::Protocol(
                            "update_or_create: row vanished between UPDATE and re-fetch"
                                .to_string(),
                        ))
                    })?
            }};
        }

        // Direct-hit path: a matching row exists, so update it in place.
        if let Some(existing) = pin_to_pool(self.filter(predicate.clone()), &write_pool)
            .first()
            .await
            .map_err(WriteError::Sqlx)?
        {
            let updated = do_update!(existing, defaults);
            return Ok((updated, false));
        }

        // Attempt the INSERT. On a UNIQUE violation (concurrent writer won the
        // race between our SELECT and this INSERT), catch the error, re-fetch
        // the now-existing row, and apply the update to it — same convergence
        // as get_or_create but with an extra UPDATE step.
        // `defaults` is cloned because the original is still needed by
        // `do_update!` on the UniqueViolation path below.
        match self.create(defaults.clone()).await {
            Ok(created) => Ok((created, true)),
            Err(WriteError::UniqueViolation { .. }) => {
                // A concurrent writer inserted the row between our SELECT and
                // our INSERT. Re-fetch then update, same as the direct-hit path.
                let existing = pin_to_pool(self.filter(predicate), &write_pool)
                    .first()
                    .await
                    .map_err(WriteError::Sqlx)?
                    .ok_or_else(|| {
                        WriteError::Sqlx(sqlx::Error::Protocol(
                            "update_or_create: row vanished after UniqueViolation re-fetch"
                                .to_string(),
                        ))
                    })?;
                let updated = do_update!(existing, defaults);
                Ok((updated, false))
            }
            // Any other write failure is passed through untouched.
            Err(e) => Err(e),
        }
    }
4172
4173    /// INSERT-or-UPDATE keyed on the primary key. The row's PK column
4174    /// is the conflict target; on a hit, every non-PK column is
4175    /// overwritten with the supplied value. Returns the row as the DB
4176    /// stored it (post-upsert).
4177    ///
4178    /// Both backends use `INSERT ... ON CONFLICT(<pk>) DO UPDATE SET
4179    /// col = excluded.col, ...`. The SQLite and Postgres syntax happens
4180    /// to match exactly here so a single sea-query `OnConflict` builder
4181    /// covers both.
4182    pub async fn upsert(&self, instance: T) -> Result<T, crate::orm::write::WriteError>
4183    where
4184        T: serde::Serialize
4185            + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4186            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
4187    {
4188        let map = serialize_to_map(&instance)?;
4189        let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4190        let backend = pool.backend_name();
4191        let mut stmt = build_insert_one_for::<T>(backend, &map)?;
4192
4193        // Conflict target = PK column. update_columns = every non-PK
4194        // column the body included. sea-query renders `DO UPDATE SET
4195        // col = excluded.col` (SQLite) / `DO UPDATE SET col =
4196        // EXCLUDED.col` (PG) — both forms work cross-dialect.
4197        let pk_name = T::FIELDS
4198            .iter()
4199            .find(|f| f.primary_key)
4200            .map(|f| f.name)
4201            .ok_or_else(|| {
4202                crate::orm::write::WriteError::Sqlx(sqlx::Error::Protocol(
4203                    "upsert: model has no primary key — use get_or_create or create instead"
4204                        .to_string(),
4205                ))
4206            })?;
4207        let update_cols: Vec<Alias> = T::FIELDS
4208            .iter()
4209            .filter(|f| !f.primary_key && map.contains_key(f.name))
4210            .map(|f| Alias::new(f.name))
4211            .collect();
4212        let mut on_conflict = sea_query::OnConflict::column(Alias::new(pk_name));
4213        if !update_cols.is_empty() {
4214            on_conflict.update_columns(update_cols);
4215        } else {
4216            // No non-PK columns to overwrite — this is a "INSERT OR
4217            // IGNORE" shape. sea-query encodes that as `DO NOTHING`.
4218            on_conflict.do_nothing();
4219        }
4220        stmt.on_conflict(on_conflict);
4221
4222        match pool {
4223            DbPool::Sqlite(pool) => {
4224                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4225                let row = sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4226                    .fetch_one(&pool)
4227                    .await?;
4228                Ok(row)
4229            }
4230            DbPool::Postgres(pool) => {
4231                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4232                let row = sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4233                    .fetch_one(&pool)
4234                    .await?;
4235                Ok(row)
4236            }
4237        }
4238    }
4239
4240    /// `create` against an explicit Postgres pool. The Postgres
4241    /// counterpart of [`Self::create`] for models with Postgres-only
4242    /// field types (Array, Inet, MacAddr, FullText), whose `FromRow`
4243    /// impl exists only for `PgRow`.
4244    pub async fn create_pg(
4245        &self,
4246        instance: T,
4247        pool: &sqlx::PgPool,
4248    ) -> Result<T, crate::orm::write::WriteError>
4249    where
4250        T: serde::Serialize + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
4251    {
4252        let map = serialize_to_map(&instance)?;
4253        let stmt = build_insert_one_for::<T>("postgres", &map)?;
4254        let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4255        let row = sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4256            .fetch_one(pool)
4257            .await?;
4258        Ok(row)
4259    }
4260
4261    /// Apply per-row differing values to a list of instances in one
4262    /// statement. Each instance carries its own PK and its own
4263    /// column values; the generated SQL uses one `CASE pk WHEN ...
4264    /// THEN ... END` per non-PK column, plus a `WHERE pk IN (...)`
4265    /// to scope the update.
4266    ///
4267    /// A `bulk_update(objs, fields)` that
4268    /// updates every non-PK column rather than asking the caller to
4269    /// list them. Returns the number of rows affected.
4270    ///
4271    /// Empty input is a no-op (returns 0). The pattern works on both
4272    /// SQLite and Postgres — the `CASE` expression is SQL-standard.
4273    ///
4274    /// Limitations:
4275    /// - All instances must have a non-default PK (the caller has
4276    ///   already loaded the rows). Default-PK instances are skipped.
4277    /// - Bulk-write signals are NOT fired by this path — it's the
4278    ///   `Manager::bulk_create` analogue for UPDATE, deliberately
4279    ///   silent for speed.
4280    pub async fn bulk_update(&self, instances: Vec<T>) -> Result<u64, crate::orm::write::WriteError>
4281    where
4282        T: serde::Serialize,
4283    {
4284        use crate::orm::write::{WriteError, is_default_pk, json_to_sea_value};
4285        if instances.is_empty() {
4286            return Ok(0);
4287        }
4288        let pk = pk_field::<T>().ok_or_else(|| {
4289            WriteError::Sqlx(sqlx::Error::Protocol(
4290                "bulk_update: model has no primary key".to_string(),
4291            ))
4292        })?;
4293        let pk_name = pk.name;
4294        let pk_ty = pk.ty;
4295
4296        // Serialize every instance, collecting (pk_value, full_map)
4297        // for the CASE branches and the IN clause. Skip rows whose
4298        // PK is still the default sentinel — they were never
4299        // persisted and a bulk UPDATE on them is a no-op anyway.
4300        let mut serialized: Vec<(
4301            serde_json::Value,
4302            serde_json::Map<String, serde_json::Value>,
4303        )> = Vec::with_capacity(instances.len());
4304        for instance in &instances {
4305            let map = serialize_to_map(instance)?;
4306            let pk_val = map.get(pk_name).cloned().unwrap_or(serde_json::Value::Null);
4307            if is_default_pk(pk_ty, &pk_val) {
4308                continue;
4309            }
4310            serialized.push((pk_val, map));
4311        }
4312        if serialized.is_empty() {
4313            return Ok(0);
4314        }
4315
4316        // Collect the list of non-PK columns to update from the
4317        // first row (every row contributes the same column set
4318        // because they're typed instances of T).
4319        let update_cols: Vec<&crate::orm::FieldSpec> =
4320            T::FIELDS.iter().filter(|f| !f.primary_key).collect();
4321
4322        // Build the UPDATE: one CASE per column, IN clause for the
4323        // WHERE. Goes through sea-query's update statement for
4324        // backend portability.
4325        let mut stmt = sea_query::Query::update();
4326        stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
4327
4328        for field in &update_cols {
4329            // CASE pk_col
4330            //   WHEN <pk1> THEN <val1>
4331            //   WHEN <pk2> THEN <val2>
4332            //   ...
4333            // END
4334            let mut case = sea_query::CaseStatement::new();
4335            for (pk_val, map) in &serialized {
4336                let val = map
4337                    .get(field.name)
4338                    .cloned()
4339                    .unwrap_or(serde_json::Value::Null);
4340                let cell = json_to_sea_value(
4341                    field.ty,
4342                    &val,
4343                    field.nullable,
4344                    field.name,
4345                    fk_pk_hint(field),
4346                )?;
4347                let pk_sea = json_to_sea_value(pk_ty, pk_val, false, pk_name, None)?;
4348                case = case.case(sea_query::Expr::col(Alias::new(pk_name)).eq(pk_sea), cell);
4349            }
4350            stmt.value(Alias::new(field.name), case);
4351        }
4352
4353        // WHERE pk IN (<pk1>, <pk2>, ...)
4354        let pk_seas: Vec<sea_query::Value> = serialized
4355            .iter()
4356            .map(|(pk_val, _)| json_to_sea_value(pk_ty, pk_val, false, pk_name, None))
4357            .collect::<Result<_, _>>()?;
4358        stmt.and_where(sea_query::Expr::col(Alias::new(pk_name)).is_in(pk_seas));
4359
4360        let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4361        let affected = match pool {
4362            DbPool::Sqlite(pool) => {
4363                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4364                sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
4365                    .execute(&pool)
4366                    .await
4367                    .map_err(WriteError::Sqlx)?
4368                    .rows_affected()
4369            }
4370            DbPool::Postgres(pool) => {
4371                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4372                sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4373                    .execute(&pool)
4374                    .await
4375                    .map_err(WriteError::Sqlx)?
4376                    .rows_affected()
4377            }
4378        };
4379        Ok(affected)
4380    }
4381
4382    /// Run a hand-written SQL query and return typed `Vec<T>` rows.
4383    ///
4384    /// The escape hatch for queries the QuerySet builder can't (or
4385    /// shouldn't) model — CTEs, vendor-specific functions, ad-hoc
4386    /// reporting. Delegates to `sqlx::query_as` against the ambient
4387    /// pool and dispatches on backend, so user code stays portable.
4388    /// The string is sent verbatim; no parameter binding (use
4389    /// `Predicate` / the typed query path for parameterised
4390    /// queries). Inject input only after manual sanitisation.
4391    ///
4392    /// Skips the `select_related` / `prefetch_related` chain — those
4393    /// only apply to the typed QuerySet build path.
4394    pub async fn raw(&self, sql: &str) -> Result<Vec<T>, sqlx::Error>
4395    where
4396        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4397            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
4398    {
4399        // Routing: `raw` resolves to the READ database (most raw statements
4400        // are SELECTs, which this returns as `Vec<T>`). Under a read/write-
4401        // split router, a raw statement that WRITES must pin the write pool
4402        // explicitly via `.on(&pool)` / `.on_pg(&pool)`, since the router
4403        // cannot inspect arbitrary SQL to know it mutates.
4404        let pool = resolve_pool::<T>(None, crate::db::RouteOp::Read);
4405        match pool {
4406            DbPool::Sqlite(pool) => {
4407                sqlx::query_as::<sqlx::Sqlite, T>(sql)
4408                    .fetch_all(&pool)
4409                    .await
4410            }
4411            DbPool::Postgres(pool) => {
4412                sqlx::query_as::<sqlx::Postgres, T>(sql)
4413                    .fetch_all(&pool)
4414                    .await
4415            }
4416        }
4417    }
4418
4419    /// `bulk_create` against an explicit Postgres pool.
4420    pub async fn bulk_create_pg(
4421        &self,
4422        instances: Vec<T>,
4423        pool: &sqlx::PgPool,
4424    ) -> Result<u64, crate::orm::write::WriteError>
4425    where
4426        T: serde::Serialize,
4427    {
4428        if instances.is_empty() {
4429            return Ok(0);
4430        }
4431        let maps: Result<Vec<_>, _> = instances.iter().map(serialize_to_map).collect();
4432        let maps = maps?;
4433        let stmt = build_insert_many_for::<T>("postgres", &maps)?;
4434        let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4435        let result = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4436            .execute(pool)
4437            .await?;
4438        Ok(result.rows_affected())
4439    }
4440}
4441
4442// QuerySetTx (struct + impl) moved to `super::tx`; re-exported above.
4443
4444impl<T: Model> Manager<T> {
4445    /// Begin a new query on this manager attached to the given open transaction.
4446    ///
4447    /// Sugar for `T::objects().on_tx(tx)` — lets callers skip the intermediate
4448    /// `QuerySet` construction when they want to go straight to a terminal:
4449    ///
4450    /// ```rust,ignore
4451    /// umbral::db::transaction(|tx| async move {
4452    ///     let post = Post::objects().on_tx(tx).create(new_post).await?;
4453    ///     Ok::<_, MyError>(post)
4454    /// }).await?;
4455    /// ```
4456    pub fn on_tx<'a>(&self, tx: &'a mut crate::db::Transaction) -> QuerySetTx<'a, T> {
4457        self.queryset().on_tx(tx)
4458    }
4459
4460    /// INSERT one row inside `tx` and return the populated row.
4461    ///
4462    /// This is the primary Manager-level entry point for transactional writes.
4463    /// Equivalent to `Post::objects().on_tx(tx).create(instance)` but more
4464    /// ergonomic when you only need the one INSERT (no filter chain needed).
4465    ///
4466    /// ```rust,ignore
4467    /// umbral::db::transaction(|tx| async move {
4468    ///     let post = Post::objects().create_in_tx(new_post, tx).await?;
4469    ///     Ok::<_, MyError>(post)
4470    /// }).await?;
4471    /// ```
4472    pub async fn create_in_tx(
4473        &self,
4474        instance: T,
4475        tx: &mut crate::db::Transaction,
4476    ) -> Result<T, crate::orm::write::WriteError>
4477    where
4478        T: serde::Serialize
4479            + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4480            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
4481            + HydrateRelated,
4482    {
4483        let map = serialize_to_map(&instance)?;
4484        let stmt = build_insert_one_for::<T>(tx.backend_name(), &map)?;
4485        match tx.backend_name() {
4486            "sqlite" => {
4487                let inner = tx.as_sqlite_mut().unwrap();
4488                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4489                let mut row = sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4490                    .fetch_one(&mut **inner)
4491                    .await?;
4492                row.set_m2m_parent_ids();
4493                Ok(row)
4494            }
4495            _ => {
4496                let inner = tx.as_pg_mut().unwrap();
4497                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4498                let mut row = sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4499                    .fetch_one(&mut **inner)
4500                    .await?;
4501                row.set_m2m_parent_ids();
4502                Ok(row)
4503            }
4504        }
4505    }
4506
4507    /// INSERT many rows inside `tx`.
4508    ///
4509    /// Returns the number of rows inserted. Empty input is a no-op.
4510    pub async fn bulk_create_in_tx(
4511        &self,
4512        instances: Vec<T>,
4513        tx: &mut crate::db::Transaction,
4514    ) -> Result<u64, crate::orm::write::WriteError>
4515    where
4516        T: serde::Serialize,
4517    {
4518        if instances.is_empty() {
4519            return Ok(0);
4520        }
4521        let maps: Result<Vec<_>, _> = instances.iter().map(serialize_to_map).collect();
4522        let maps = maps?;
4523        let stmt = build_insert_many_for::<T>(tx.backend_name(), &maps)?;
4524        match tx.backend_name() {
4525            "sqlite" => {
4526                let inner = tx.as_sqlite_mut().unwrap();
4527                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4528                let result = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
4529                    .execute(&mut **inner)
4530                    .await?;
4531                Ok(result.rows_affected())
4532            }
4533            _ => {
4534                let inner = tx.as_pg_mut().unwrap();
4535                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4536                let result = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4537                    .execute(&mut **inner)
4538                    .await?;
4539                Ok(result.rows_affected())
4540            }
4541        }
4542    }
4543
4544    // =====================================================================
4545    // Per-instance signal-firing write methods.
4546    //
4547    // `save(instance)` and `delete_instance(instance)` are the methods that
4548    // fire the ORM lifecycle signals (`pre_save` / `post_save` /
4549    // `pre_delete` / `post_delete`). The existing bulk methods
4550    // (`create`, `bulk_create`, `QuerySet::update_values`,
    // `QuerySet::delete`) do not fire these per-row signals, by design:
    // bulk operations bypass per-row signals for performance.
4553    //
4554    // Signal name format: `<event>:<table>` — e.g. `post_save:post`.
4555    // Payload shapes:
4556    //   save:   `{ "instance": <M as JSON>, "created": bool }`
4557    //   delete: `{ "instance": <M as JSON> }`
4558    //
4559    // The `created` flag on save follows the convention:
4560    //   `true`  when the PK is the default sentinel → INSERT path.
4561    //   `false` when the PK is non-default           → UPDATE path.
4562    // =====================================================================
4563
4564    /// Save one instance, firing `pre_save` + `post_save` signals.
4565    ///
4566    /// Determines INSERT vs UPDATE by checking whether the primary key
4567    /// is the autoincrement sentinel (`0` for integers, nil UUID, empty
4568    /// string). If it is, an INSERT is performed (`created = true`);
4569    /// otherwise an `UPDATE ... WHERE pk = <value>` is run (`created = false`).
4570    ///
4571    /// Returns the row as it exists in the database after the write
4572    /// (populated PK for inserts, same row for updates).
4573    ///
4574    /// ## Signal contract
4575    ///
4576    /// - `pre_save:<table>` fires before the database write with
4577    ///   `{ "instance": ..., "created": bool, "actor": ... }`.
4578    /// - `post_save:<table>` fires after the database write with the
4579    ///   DB-read-back row and the same envelope keys.
4580    ///
4581    /// The `"actor"` value is set by the nearest enclosing
4582    /// [`crate::signals::with_actor`] scope; `Value::Null` when no
4583    /// scope is active.
4584    ///
4585    /// ## Bulk paths fire bulk signals, not per-row signals
4586    ///
4587    /// `Manager::create`, `Manager::bulk_create`, and
4588    /// `QuerySet::update_values` / `QuerySet::delete` do NOT fire
4589    /// per-row `post_save` / `post_delete`. They fire
4590    /// `bulk_post_save:<table>` / `bulk_post_delete:<table>` once per
4591    /// statement with the affected PKs in the payload. Use `save` /
4592    /// `delete_instance` when per-row signal semantics are needed.
4593    pub async fn save(&self, instance: T) -> Result<T, crate::orm::write::SaveError>
4594    where
4595        T: serde::Serialize
4596            + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4597            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
4598    {
4599        use crate::orm::write::{SaveError, is_default_pk};
4600        // Determine INSERT vs UPDATE by inspecting the PK field.
4601        let pk_field = T::FIELDS
4602            .iter()
4603            .find(|f| f.primary_key)
4604            .ok_or(SaveError::NoPrimaryKey)?;
4605        let map = serialize_to_map(&instance).map_err(SaveError::Write)?;
4606        let pk_val = map
4607            .get(pk_field.name)
4608            .cloned()
4609            .unwrap_or(serde_json::Value::Null);
4610        let created = is_default_pk(pk_field.ty, &pk_val);
4611
4612        // Fire pre_save before the write.
4613        crate::signals::emit_pre_save::<T>(&instance, created).await;
4614
4615        let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4616        let backend = pool.backend_name();
4617
4618        if created {
4619            // INSERT path.
4620            let stmt = build_insert_one_for::<T>(backend, &map).map_err(SaveError::Write)?;
4621            let row = match pool {
4622                DbPool::Sqlite(pool) => {
4623                    let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4624                    sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4625                        .fetch_one(&pool)
4626                        .await
4627                        .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4628                }
4629                DbPool::Postgres(pool) => {
4630                    let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4631                    sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4632                        .fetch_one(&pool)
4633                        .await
4634                        .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4635                }
4636            };
4637            // Fire post_save with the DB-populated row.
4638            crate::signals::emit_post_save::<T>(&row, true).await;
4639            Ok(row)
4640        } else {
4641            // UPDATE path: UPDATE ... WHERE <pk> = <value> RETURNING *.
4642            use sea_query::{Alias, Expr, Query};
4643
4644            // gaps2 #92 — snapshot the pre-UPDATE row for `pre_update` /
4645            // `post_update` subscribers, but ONLY when one exists. The
4646            // extra SELECT-by-PK is gated on `has_subscribers` so the
4647            // common UPDATE path (no `*_update` listener) pays nothing.
4648            // Best-effort TOCTOU: the snapshot reads the row before the
4649            // UPDATE; a concurrent writer between the two is accepted.
4650            let pre_table = T::TABLE;
4651            let want_pre = crate::signals::has_subscribers(&format!("pre_update:{pre_table}"));
4652            let want_post = crate::signals::has_subscribers(&format!("post_update:{pre_table}"));
4653            let previous: Option<T> = if want_pre || want_post {
4654                let mut sel = Query::select();
4655                sel.from(crate::db::router::schema_qualified_table(T::TABLE));
4656                for field in T::FIELDS {
4657                    sel.column(Alias::new(field.name));
4658                }
4659                let pk_sea_sel = crate::orm::write::json_to_sea_value(
4660                    pk_field.ty,
4661                    &pk_val,
4662                    false,
4663                    pk_field.name,
4664                    None,
4665                )
4666                .map_err(SaveError::Write)?;
4667                sel.and_where(Expr::col(Alias::new(pk_field.name)).eq(pk_sea_sel));
4668                sel.limit(1);
4669                match pool {
4670                    DbPool::Sqlite(ref pool) => {
4671                        let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
4672                        sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4673                            .fetch_optional(pool)
4674                            .await
4675                            .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4676                    }
4677                    DbPool::Postgres(ref pool) => {
4678                        let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
4679                        sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4680                            .fetch_optional(pool)
4681                            .await
4682                            .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4683                    }
4684                }
4685            } else {
4686                None
4687            };
4688            // Fire pre_update before the UPDATE when both a snapshot exists
4689            // and a subscriber wants it.
4690            if want_pre {
4691                if let Some(prev) = &previous {
4692                    crate::signals::emit_pre_update::<T>(prev, &instance).await;
4693                }
4694            }
4695
4696            let mut stmt = Query::update();
4697            stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
4698            for field in T::FIELDS {
4699                if field.primary_key {
4700                    continue;
4701                }
4702                let val = map
4703                    .get(field.name)
4704                    .cloned()
4705                    .unwrap_or(serde_json::Value::Null);
4706                let sea_val = crate::orm::write::json_to_sea_value(
4707                    field.ty,
4708                    &val,
4709                    field.nullable,
4710                    field.name,
4711                    fk_pk_hint(field),
4712                )
4713                .map_err(SaveError::Write)?;
4714                stmt.value(Alias::new(field.name), sea_val);
4715            }
4716            // WHERE pk = <value>
4717            let pk_sea = crate::orm::write::json_to_sea_value(
4718                pk_field.ty,
4719                &pk_val,
4720                false,
4721                pk_field.name,
4722                None,
4723            )
4724            .map_err(SaveError::Write)?;
4725            stmt.and_where(Expr::col(Alias::new(pk_field.name)).eq(pk_sea));
4726            // RETURNING * so we can return the updated row.
4727            stmt.returning_all();
4728
4729            let row = match pool {
4730                DbPool::Sqlite(pool) => {
4731                    let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4732                    sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4733                        .fetch_one(&pool)
4734                        .await
4735                        .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4736                }
4737                DbPool::Postgres(pool) => {
4738                    let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4739                    sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4740                        .fetch_one(&pool)
4741                        .await
4742                        .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4743                }
4744            };
4745            // Fire post_save with created=false.
4746            crate::signals::emit_post_save::<T>(&row, false).await;
4747            // gaps2 #92 — post_update carries the pre-UPDATE snapshot (old)
4748            // and the freshly-written row (new). Only when a subscriber
4749            // exists and the snapshot was captured.
4750            if want_post {
4751                if let Some(prev) = &previous {
4752                    crate::signals::emit_post_update::<T>(prev, &row).await;
4753                }
4754            }
4755            Ok(row)
4756        }
4757    }
4758
4759    /// Delete one instance by primary key, firing `pre_delete` +
4760    /// `post_delete` signals.
4761    ///
4762    /// Issues `DELETE FROM <table> WHERE <pk> = <value>`. Returns the
4763    /// number of rows affected (0 if the row was already gone, 1 otherwise).
4764    ///
4765    /// ## Signal contract
4766    ///
4767    /// - `pre_delete:<table>` fires before the DELETE with
4768    ///   `{ "instance": ..., "actor": ... }`.
4769    /// - `post_delete:<table>` fires after the DELETE with the same
4770    ///   payload shape.
4771    ///
4772    /// The `"actor"` value is set by the nearest enclosing
4773    /// [`crate::signals::with_actor`] scope; `Value::Null` when no
4774    /// scope is active. The instance value passed to both signals is
4775    /// the value supplied by the caller — not a DB read-back. If you
4776    /// need the freshest DB state before deletion, fetch it first with
4777    /// `.get(...)` then pass to this method.
4778    ///
4779    /// ## Bulk paths fire bulk signals
4780    ///
4781    /// `QuerySet::delete()` (the filter-chain DELETE) fires
4782    /// `bulk_post_delete:<table>` with the list of affected PKs, not
4783    /// per-row `pre_delete` / `post_delete`. Use `delete_instance` for
4784    /// per-row signal semantics.
4785    pub async fn delete_instance(&self, instance: &T) -> Result<u64, crate::orm::write::SaveError>
4786    where
4787        T: serde::Serialize,
4788    {
4789        use crate::orm::write::SaveError;
4790        let pk_field = T::FIELDS
4791            .iter()
4792            .find(|f| f.primary_key)
4793            .ok_or(SaveError::NoPrimaryKey)?;
4794        let map = serialize_to_map(instance).map_err(SaveError::Write)?;
4795        let pk_val = map
4796            .get(pk_field.name)
4797            .cloned()
4798            .unwrap_or(serde_json::Value::Null);
4799
4800        // Fire pre_delete before the write.
4801        crate::signals::emit_pre_delete::<T>(instance).await;
4802
4803        let pk_sea =
4804            crate::orm::write::json_to_sea_value(pk_field.ty, &pk_val, false, pk_field.name, None)
4805                .map_err(SaveError::Write)?;
4806
4807        use sea_query::{Alias, Expr, Query};
4808        // Feature #72 — soft-delete redirect. For models tagged
4809        // `#[umbral(soft_delete)]`, set deleted_at instead of issuing
4810        // DELETE. Pre/post_delete signals still fire because the
4811        // logical contract ("this row is gone from the visible
4812        // table") is preserved — only the physical SQL changed.
4813        // Hard-delete is not exposed through delete_instance (it's
4814        // a typed per-row helper); call `QuerySet::filter(pk =
4815        // instance.id).hard_delete().delete()` when you need it.
4816        let stmt_sql = if T::SOFT_DELETE {
4817            let now = chrono::Utc::now();
4818            let mut up = Query::update();
4819            up.table(crate::db::router::schema_qualified_table(T::TABLE));
4820            up.value(
4821                Alias::new("deleted_at"),
4822                sea_query::Value::ChronoDateTimeUtc(Some(Box::new(now))),
4823            );
4824            up.and_where(Expr::col(Alias::new(pk_field.name)).eq(pk_sea));
4825            // Idempotency guard — don't bump an already-set timestamp.
4826            up.and_where(Expr::col(Alias::new("deleted_at")).is_null());
4827            SoftOrHardStatement::Update(up)
4828        } else {
4829            let mut stmt = Query::delete();
4830            stmt.from_table(crate::db::router::schema_qualified_table(T::TABLE));
4831            stmt.and_where(Expr::col(Alias::new(pk_field.name)).eq(pk_sea));
4832            SoftOrHardStatement::Delete(stmt)
4833        };
4834
4835        let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4836        let affected = match (&pool, stmt_sql) {
4837            (DbPool::Sqlite(pool), SoftOrHardStatement::Delete(stmt)) => {
4838                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4839                sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
4840                    .execute(pool)
4841                    .await
4842                    .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4843                    .rows_affected()
4844            }
4845            (DbPool::Postgres(pool), SoftOrHardStatement::Delete(stmt)) => {
4846                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4847                sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4848                    .execute(pool)
4849                    .await
4850                    .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4851                    .rows_affected()
4852            }
4853            (DbPool::Sqlite(pool), SoftOrHardStatement::Update(stmt)) => {
4854                let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4855                sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
4856                    .execute(pool)
4857                    .await
4858                    .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4859                    .rows_affected()
4860            }
4861            (DbPool::Postgres(pool), SoftOrHardStatement::Update(stmt)) => {
4862                let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4863                sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4864                    .execute(pool)
4865                    .await
4866                    .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4867                    .rows_affected()
4868            }
4869        };
4870
4871        // Fire post_delete after the write.
4872        crate::signals::emit_post_delete::<T>(instance).await;
4873
4874        Ok(affected)
4875    }
4876}
4877
4878/// Internal enum used by `Manager::delete_instance` to dispatch on
4879/// soft-delete vs hard-delete without duplicating the four backend ×
4880/// statement match arms inline. One variant per SQL shape.
4881enum SoftOrHardStatement {
4882    Delete(sea_query::DeleteStatement),
4883    Update(sea_query::UpdateStatement),
4884}
4885
4886// Hydration helpers (hydrate_select_related, hydrate_select_related_nested,
4887// hydrate_prefetch_related, hydrate_reverse_fk_for_field, fetch_related_as_json,
4888// fetch_reverse_fk_children) moved to `super::hydration`.
4889
4890// Insert builders (serialize_to_map, build_insert_one_for, build_insert_many_for)
4891// and pk_field moved to `super::write_helpers`.
4892
4893// `decode_agg_sqlite` / `decode_agg_pg` moved to
4894// `backend_sqlite::decode_agg` / `backend_pg::decode_agg`.
4895
4896// =========================================================================
4897// #113: M2M-via-JOIN dedup decoders
4898//
4899// When .join_related() includes one or more M2M fields, the result
4900// set has one row per (parent, child) combo, so a parent with N
4901// matching children appears N times. The caller wants ONE T per
4902// parent with the M2M slot populated.
4903//
4904// The algorithm:
4905//   - First time we see a parent PK: decode T via FromRow, hydrate
4906//     any FK joins from this row.
4907//   - Every subsequent row for the same parent: extract the M2M
4908//     child JsonValue (or skip on LEFT JOIN miss).
4909//   - Dedup children by (parent_pk, field, child_pk) so that
4910//     joining TWO M2Ms doesn't multiply the child sets (the JOIN
4911//     produces parent × m2m1 × m2m2 rows).
4912//   - Hand each parent its M2M buckets via set_m2m_resolved_json.
4913// =========================================================================
4914
4915fn dedup_decode_sqlite<T: Model + HydrateRelated>(
4916    raw_rows: &[sqlx::sqlite::SqliteRow],
4917    fk_join_fields: &[String],
4918    m2m_join_fields: &[String],
4919) -> Result<Vec<T>, sqlx::Error>
4920where
4921    T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>,
4922{
4923    // PK-agnostic dedup: read the parent PK back through the shape-aware
4924    // decoder (using the parent model's PK SqlType) and key by `pk_key`,
4925    // so i64 / String / Uuid parents all dedup correctly.
4926    let parent_pk_col = crate::migrate::ModelMeta::for_::<T>()
4927        .fields
4928        .into_iter()
4929        .find(|c| c.primary_key)
4930        .ok_or_else(|| {
4931            sqlx::Error::Protocol(format!(
4932                "umbral::orm::join_related: model `{}` has no primary key, M2M JOIN \
4933                 dedup requires one",
4934                T::NAME
4935            ))
4936        })?;
4937    let registered = crate::migrate::registered_models();
4938    let mut typed: Vec<T> = Vec::new();
4939    let mut idx_by_pk: HashMap<String, usize> = HashMap::new();
4940    // (parent_pk_key, field) → Vec<JsonValue> + a Set of seen child PK keys.
4941    let mut buckets: HashMap<(String, String), Vec<JsonValue>> = HashMap::new();
4942    let mut seen_children: HashMap<(String, String), std::collections::HashSet<String>> =
4943        HashMap::new();
4944    for row in raw_rows {
4945        let Ok(parent_json) = crate::orm::dynamic::decode_to_json(row, &parent_pk_col) else {
4946            continue;
4947        };
4948        let parent_key = crate::orm::pk_key(&parent_json);
4949        if let std::collections::hash_map::Entry::Vacant(e) = idx_by_pk.entry(parent_key.clone()) {
4950            let mut t = <T as sqlx::FromRow<_>>::from_row(row)?;
4951            backend_sqlite::hydrate_joined_rels::<T>(&mut t, row, fk_join_fields)?;
4952            e.insert(typed.len());
4953            typed.push(t);
4954        }
4955        for m2m_field in m2m_join_fields {
4956            // The M2M slot is keyed by the FIRST segment (the M2M field
4957            // name); the full `m2m_field` path may carry an onward FK
4958            // chain (`"tags__category"`) the child decoder nests.
4959            let m2m_seg = m2m_field.split("__").next().unwrap_or(m2m_field.as_str());
4960            let Some(rel) = T::M2M_RELATIONS.iter().find(|r| r.field_name == m2m_seg) else {
4961                continue;
4962            };
4963            let Some(child_meta) = registered.iter().find(|m| m.table == rel.target_table) else {
4964                continue;
4965            };
4966            let Some(child_json) =
4967                backend_sqlite::extract_m2m_child_json::<T>(row, m2m_field, child_meta)?
4968            else {
4969                continue;
4970            };
4971            // Dedup by child PK (PK-agnostic) so multi-M2M cartesian
4972            // doesn't duplicate this field's children.
4973            let child_key = child_json
4974                .as_object()
4975                .and_then(|m| {
4976                    let pk_col = child_meta.fields.iter().find(|c| c.primary_key)?;
4977                    m.get(&pk_col.name).map(crate::orm::pk_key)
4978                })
4979                .unwrap_or_default();
4980            let key = (parent_key.clone(), m2m_seg.to_string());
4981            let seen = seen_children.entry(key.clone()).or_default();
4982            if seen.insert(child_key) {
4983                buckets.entry(key).or_default().push(child_json);
4984            }
4985        }
4986    }
4987    for ((parent_key, field), children) in buckets {
4988        if let Some(&idx) = idx_by_pk.get(&parent_key) {
4989            typed[idx].set_m2m_resolved_json(&field, children);
4990        }
4991    }
4992    // LEFT JOIN miss handling: walk every (parent, field) pair
4993    // we expected to populate and zero-init any slot that never
4994    // got a hit. Without this a parent with no matching M2M
4995    // children would leave its slot None — distinguishable from
4996    // "loaded, empty" only by callers checking the absence.
4997    for (parent_key, &idx) in idx_by_pk.iter() {
4998        for field in m2m_join_fields {
4999            let seg = field.split("__").next().unwrap_or(field.as_str());
5000            if !seen_children.contains_key(&(parent_key.clone(), seg.to_string())) {
5001                typed[idx].set_m2m_resolved_json(seg, Vec::new());
5002            }
5003        }
5004    }
5005    Ok(typed)
5006}
5007
5008fn dedup_decode_pg<T: Model + HydrateRelated>(
5009    raw_rows: &[sqlx::postgres::PgRow],
5010    fk_join_fields: &[String],
5011    m2m_join_fields: &[String],
5012) -> Result<Vec<T>, sqlx::Error>
5013where
5014    T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
5015{
5016    // PK-agnostic dedup — see the SQLite variant.
5017    let parent_pk_col = crate::migrate::ModelMeta::for_::<T>()
5018        .fields
5019        .into_iter()
5020        .find(|c| c.primary_key)
5021        .ok_or_else(|| {
5022            sqlx::Error::Protocol(format!(
5023                "umbral::orm::join_related: model `{}` has no primary key, M2M JOIN \
5024                 dedup requires one",
5025                T::NAME
5026            ))
5027        })?;
5028    let registered = crate::migrate::registered_models();
5029    let mut typed: Vec<T> = Vec::new();
5030    let mut idx_by_pk: HashMap<String, usize> = HashMap::new();
5031    let mut buckets: HashMap<(String, String), Vec<JsonValue>> = HashMap::new();
5032    let mut seen_children: HashMap<(String, String), std::collections::HashSet<String>> =
5033        HashMap::new();
5034    for row in raw_rows {
5035        let Ok(parent_json) = crate::orm::dynamic::decode_pg_to_json(row, &parent_pk_col) else {
5036            continue;
5037        };
5038        let parent_key = crate::orm::pk_key(&parent_json);
5039        if let std::collections::hash_map::Entry::Vacant(e) = idx_by_pk.entry(parent_key.clone()) {
5040            let mut t = <T as sqlx::FromRow<_>>::from_row(row)?;
5041            backend_pg::hydrate_joined_rels::<T>(&mut t, row, fk_join_fields)?;
5042            e.insert(typed.len());
5043            typed.push(t);
5044        }
5045        for m2m_field in m2m_join_fields {
5046            let m2m_seg = m2m_field.split("__").next().unwrap_or(m2m_field.as_str());
5047            let Some(rel) = T::M2M_RELATIONS.iter().find(|r| r.field_name == m2m_seg) else {
5048                continue;
5049            };
5050            let Some(child_meta) = registered.iter().find(|m| m.table == rel.target_table) else {
5051                continue;
5052            };
5053            let Some(child_json) =
5054                backend_pg::extract_m2m_child_json::<T>(row, m2m_field, child_meta)?
5055            else {
5056                continue;
5057            };
5058            let child_key = child_json
5059                .as_object()
5060                .and_then(|m| {
5061                    let pk_col = child_meta.fields.iter().find(|c| c.primary_key)?;
5062                    m.get(&pk_col.name).map(crate::orm::pk_key)
5063                })
5064                .unwrap_or_default();
5065            let key = (parent_key.clone(), m2m_seg.to_string());
5066            let seen = seen_children.entry(key.clone()).or_default();
5067            if seen.insert(child_key) {
5068                buckets.entry(key).or_default().push(child_json);
5069            }
5070        }
5071    }
5072    for ((parent_key, field), children) in buckets {
5073        if let Some(&idx) = idx_by_pk.get(&parent_key) {
5074            typed[idx].set_m2m_resolved_json(&field, children);
5075        }
5076    }
5077    // Same LEFT JOIN miss zero-init as the SQLite path.
5078    for (parent_key, &idx) in idx_by_pk.iter() {
5079        for field in m2m_join_fields {
5080            let seg = field.split("__").next().unwrap_or(field.as_str());
5081            if !seen_children.contains_key(&(parent_key.clone(), seg.to_string())) {
5082                typed[idx].set_m2m_resolved_json(seg, Vec::new());
5083            }
5084        }
5085    }
5086    Ok(typed)
5087}