umbral_core/orm/queryset/mod.rs
1//! `QuerySet<T>` and `Manager<T>`: chainable lazy SQL builder + entry
2//! point.
3//!
4//! `T::objects()` returns a `Manager<T>`; chaining `.filter` / `.order_by`
5//! / `.limit` / etc. on it (or on a `QuerySet<T>` directly) yields a new
6//! `QuerySet<T>`. Terminals (`.fetch`, `.first`, `.count`, `.exists`)
7//! await an async DB roundtrip via the ambient or explicit pool.
8//!
//! At M1 the surface was intentionally narrow per
//! `docs/specs/03-orm-querysets.md`: filter / order_by / limit / offset
//! for chaining, and fetch / first / count / exists for terminals.
//! Later milestones widened it as real need surfaced: this module now
//! also carries `exclude`, relation loading (`select_related`,
//! `join_related`, `prefetch_related`) and related-aggregate annotations.
14//!
15//! M2 lifted the terminals and the `Manager` delegation onto a generic
16//! `T: Model` bound. The table name comes from `T::TABLE`, the SELECT
17//! column list from `T::FIELDS`, and row materialisation from the
18//! `FromRow` bound the terminals carry. M3 generates the `Model` impl
19//! from `#[derive(Model)]`.
20//!
21//! ## Phase 2.5 — backend-agnostic terminals
22//!
23//! Through Phase 2 the QuerySet stored a `SqlitePool` and built every
24//! query with sea-query's `SqliteQueryBuilder`. Phase 2.5 widens that:
25//! the explicit-pool slot is `Option<DbPool>`, `.on(&SqlitePool)` keeps
26//! working unchanged, and a new `.on_pg(&PgPool)` registers a Postgres
27//! pool. The terminal methods dispatch on the resolved pool variant —
28//! SQLite path uses `SqliteQueryBuilder` + a `SqlitePool` executor;
29//! Postgres path uses `PostgresQueryBuilder` + a `PgPool` executor.
30//!
31//! The row-materialization bound on each terminal is the conjunction
32//! of both backends' `FromRow` impls. `#[derive(sqlx::FromRow)]` emits
33//! a generic-over-`R` impl, so a user struct with standard field
34//! types satisfies both bounds without any per-backend ceremony.
35
36mod backend_pg;
37mod backend_sqlite;
38mod errors;
39pub(crate) mod hydration;
40mod tx;
41mod write_helpers;
42
43pub use errors::{GetError, TryForEachError};
44use hydration::{hydrate_prefetch_related, hydrate_select_related};
45pub use tx::QuerySetTx;
46use write_helpers::{
47 build_insert_many_for, build_insert_one_for, fk_pk_hint, pk_field, serialize_to_map,
48};
49
50use std::collections::HashMap;
51use std::marker::PhantomData;
52
53use sea_query::{
54 Alias, Expr, Func, IntoIden, Order, PostgresQueryBuilder, Query, SqliteQueryBuilder,
55};
56use sea_query_binder::SqlxBinder;
57use serde_json::Value as JsonValue;
58
59use crate::db::DbPool;
60use crate::orm::{FExpr, HydrateRelated, Model, OrderExpr, Predicate};
61use umbral_casing::to_snake_case;
62
/// Entry point for queries on a model.
///
/// A `Manager<T>` holds no query state of its own: it is a typed marker
/// for the model `T` plus an optional transaction preference. It exposes
/// the same chainable surface as `QuerySet<T>`. Users are not expected
/// to construct one directly; `Post::objects()` is the only door.
pub struct Manager<T> {
    /// Ties the manager to its model type without storing a `T`.
    _phantom: PhantomData<T>,
    /// Per-Manager override for the `atomic_transactions` builder
    /// default:
    ///
    /// - `None`: inherit the global default;
    /// - `Some(true)`: wrap subsequent writes in a transaction;
    /// - `Some(false)`: explicitly opt out.
    ///
    /// Propagates into the QuerySet that `queryset()` constructs
    /// (that constructor is defined elsewhere).
    atomic: Option<bool>,
}

impl<T> Manager<T> {
    /// Build a manager with no transaction preference, i.e. one that
    /// inherits the global default.
    pub(crate) fn new() -> Self {
        Self {
            atomic: None,
            _phantom: PhantomData,
        }
    }

    /// Wrap every write terminal that hangs off this Manager in a
    /// transaction. Equivalent to calling `.atomic()` on each QuerySet
    /// derived from it; a later `.non_atomic()` overrides the choice.
    pub fn atomic(self) -> Self {
        Self {
            atomic: Some(true),
            ..self
        }
    }

    /// Opt this Manager (and every QuerySet derived from it) out of the
    /// global `App::builder().atomic_transactions(true)` default.
    pub fn non_atomic(self) -> Self {
        Self {
            atomic: Some(false),
            ..self
        }
    }
}

impl<T> Default for Manager<T> {
    /// Same as [`Manager::new`]: no transaction preference recorded.
    fn default() -> Self {
        Self::new()
    }
}
107
/// SQL join flavor recorded per `join_related` hop.
///
/// A `JoinReq` stores this as `Option<JoinKind>`: `None` there means
/// "infer from FK nullability" (gap 4c), while the explicit
/// `left_/inner_/right_join_related` builders record `Some(..)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinKind {
    /// Lowers to sea-query's `JoinType::InnerJoin`.
    Inner,
    /// Lowers to sea-query's `JoinType::LeftJoin`.
    Left,
    /// Lowers to sea-query's `JoinType::RightJoin`.
    Right,
}
117
118impl JoinKind {
119 /// Lower to sea-query's join type.
120 pub(crate) fn sea(self) -> sea_query::JoinType {
121 match self {
122 JoinKind::Inner => sea_query::JoinType::InnerJoin,
123 JoinKind::Left => sea_query::JoinType::LeftJoin,
124 JoinKind::Right => sea_query::JoinType::RightJoin,
125 }
126 }
127}
128
/// One requested eager-join: a dotted relation path (`"plugin__author"`)
/// plus the join type to apply to the LAST hop. `kind: None` means
/// auto-infer per-hop from FK nullability (INNER for NOT NULL, LEFT for
/// nullable), the default. The explicit methods pin `Some(..)`.
#[derive(Debug, Clone)]
pub(crate) struct JoinReq {
    /// Relation path as passed to the `*join_related` builder; hops are
    /// separated by `__`.
    pub(crate) path: String,
    /// Explicit join type, or `None` to let the join be inferred.
    pub(crate) kind: Option<JoinKind>,
}
138
/// A lazy, chainable SQL query.
///
/// Carries a sea-query `SelectStatement` plus pool-resolution state and
/// the builder options accumulated by the chainable methods. Nothing is
/// sent to the database until a terminal method is awaited. Cloning is
/// cheap (the `SelectStatement` clones in O(query size)).
pub struct QuerySet<T> {
    /// The base SelectStatement — FROM, columns, joins, group-by,
    /// order-by, limit, offset. Filters are NOT applied here; they
    /// accumulate on [`Self::predicates`] and get woven in at
    /// terminal time, so per-backend predicate variants (Phase
    /// 4.2.2) can pick the right SimpleExpr based on the resolved
    /// pool.
    pub(crate) query: sea_query::SelectStatement,
    /// Accumulated filter predicates. Each one renders to either its
    /// default `cond` (for Postgres) or its `cond_sqlite` override
    /// (for SQLite, if set) at terminal time.
    pub(crate) predicates: Vec<Predicate<T>>,
    /// Pool pinned by `.on(..)` (SQLite) or `.on_pg(..)` (Postgres).
    /// `None` until one of those is called, in which case the ambient
    /// default pool is expected to be used at terminal time.
    pub(crate) explicit_pool: Option<DbPool>,
    /// FK field names requested for eager loading via `select_related`.
    /// After the main query returns rows, a batch `IN (...)` query
    /// fetches the related rows for each named field and calls
    /// `HydrateRelated::hydrate_fk` to populate `ForeignKey.resolved`.
    pub(crate) select_related: Vec<String>,
    /// M2M field names requested for eager loading via
    /// `prefetch_related`. After the main query, one batched JOIN
    /// against the junction + child table fetches every related row
    /// for every parent in a single round-trip; each parent's
    /// `M2M.resolved` slot is populated via
    /// `HydrateRelated::set_m2m_resolved_json`. Gap #19.
    pub(crate) prefetch_related: Vec<String>,
    /// BUG-8: `#[umbral(ordering = [...])]` lowers to a default ORDER
    /// BY applied at terminal time when the caller didn't supply an
    /// explicit `.order_by(...)`. The semantics: explicit calls REPLACE
    /// the default rather than appending to it. Each entry is a
    /// `(column, descending)` pair.
    pub(crate) default_ordering: Vec<(&'static str, bool)>,
    /// Set to `true` the first time `.order_by(...)` is called; when
    /// `false`, `build_query_for` applies `default_ordering`.
    pub(crate) explicit_order: bool,
    /// Per-QuerySet override for the `atomic_transactions` builder
    /// default. `None` = inherit the global default via
    /// [`crate::db::atomic_default`]; `Some(true)` = wrap this
    /// QuerySet's write terminal in a transaction; `Some(false)` =
    /// explicitly opt out.
    pub(crate) atomic: Option<bool>,
    /// Feature #72 — soft-delete state. Snapshotted from
    /// `T::SOFT_DELETE` when the QuerySet is constructed from a
    /// Manager (the no-bounds `QuerySet::new` constructor leaves
    /// this `false` so hand-built QuerySets stay opt-out by
    /// default).
    pub(crate) soft_delete_active: bool,
    /// True when the caller opted back into soft-deleted rows via
    /// `.with_deleted()`. Skips the auto `WHERE deleted_at IS NULL`
    /// injection. Ignored when `only_deleted` is also set —
    /// `build_query_for` checks `only_deleted` first.
    pub(crate) with_deleted: bool,
    /// True when the caller wants ONLY soft-deleted rows via
    /// `.only_deleted()`. Inverts the auto-filter to
    /// `WHERE deleted_at IS NOT NULL`.
    pub(crate) only_deleted: bool,
    /// True when the caller asked for a real DELETE via
    /// `.hard_delete()` — bypasses the soft-delete rewrite that
    /// would normally turn `delete()` into an UPDATE.
    pub(crate) hard_delete: bool,
    /// Gap #111 — column projection set by [`Self::only`]. When
    /// `Some`, [`Self::to_sql`] / [`Self::to_sql_pg`] swap the
    /// SELECT list for just these columns, and the typed terminals
    /// (`fetch` / `first` / `get`) refuse to run with a clear error
    /// pointing the caller at [`Self::values`] (FromRow can't
    /// hydrate `T` from a partial-column row). `None` keeps the
    /// pre-#111 behaviour (full SELECT).
    pub(crate) only_cols: Option<Vec<String>>,
    /// Relation paths requested for JOIN-based prefetch via
    /// [`Self::join_related`] and its explicit-kind siblings.
    /// Distinct from `select_related`: `join_related` weaves a JOIN
    /// against the related table into the main SELECT (with aliased
    /// columns `<field>__<col>`) so one round-trip pulls parent +
    /// related rows together. Each request carries its own join type
    /// (see `JoinReq::kind`; `None` means inferred). The existing
    /// `select_related` path keeps its "batched-IN-followup-query"
    /// shape — both are valid; this one wins when round-trip count
    /// matters more than the per-row column overhead.
    pub(crate) join_related: Vec<JoinReq>,
    /// Related-aggregate annotations added via
    /// [`Self::annotate_related`] / [`Self::annotate_count`]. Applied
    /// inside `build_query_for`, so EVERY terminal and introspection
    /// path — `fetch_annotated`, `explain`, `to_sql`, `to_sql_pg` —
    /// sees the same correlated subqueries. That's the
    /// `annotate()` contract: an annotation is query-builder state,
    /// not a side query.
    pub(crate) annotations: Vec<RelatedAnnotation>,
    /// Ties the QuerySet to its model type without storing a `T`.
    _phantom: PhantomData<T>,
}
228
229// Manual `Clone` — NOT `#[derive(Clone)]`, because the derive would force a
230// spurious `T: Clone` bound (every field is `T`-independent or carries its
231// own `T`-free `Clone`: `Predicate<T>` has a manual `impl<T> Clone`, and
232// `PhantomData<T>` clones for any `T`). The doc comment above already
233// promised cheap cloning; this is what makes `Paginator` (and any
234// requery-without-consume caller) slice the same query per page.
235impl<T> Clone for QuerySet<T> {
236 fn clone(&self) -> Self {
237 Self {
238 query: self.query.clone(),
239 predicates: self.predicates.clone(),
240 explicit_pool: self.explicit_pool.clone(),
241 select_related: self.select_related.clone(),
242 prefetch_related: self.prefetch_related.clone(),
243 default_ordering: self.default_ordering.clone(),
244 explicit_order: self.explicit_order,
245 atomic: self.atomic,
246 soft_delete_active: self.soft_delete_active,
247 with_deleted: self.with_deleted,
248 only_deleted: self.only_deleted,
249 hard_delete: self.hard_delete,
250 only_cols: self.only_cols.clone(),
251 join_related: self.join_related.clone(),
252 annotations: self.annotations.clone(),
253 _phantom: PhantomData,
254 }
255 }
256}
257
258// Manual `Debug` for the same `T: Debug`-free reason; `Predicate<T>`'s own
259// `Debug` is likewise `T`-free.
260impl<T> std::fmt::Debug for QuerySet<T> {
261 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 // `Predicate<T>` is intentionally not `Debug` (its `SimpleExpr`
263 // carries no `T`), so report the predicate count rather than the
264 // opaque expressions.
265 f.debug_struct("QuerySet")
266 .field("query", &self.query)
267 .field("predicates", &format_args!("[{} predicate(s)]", self.predicates.len()))
268 .field("explicit_pool", &self.explicit_pool)
269 .field("select_related", &self.select_related)
270 .field("prefetch_related", &self.prefetch_related)
271 .field("default_ordering", &self.default_ordering)
272 .field("explicit_order", &self.explicit_order)
273 .field("atomic", &self.atomic)
274 .field("soft_delete_active", &self.soft_delete_active)
275 .field("with_deleted", &self.with_deleted)
276 .field("only_deleted", &self.only_deleted)
277 .field("hard_delete", &self.hard_delete)
278 .field("only_cols", &self.only_cols)
279 .field("join_related", &self.join_related)
280 .field("annotations", &self.annotations)
281 .finish()
282 }
283}
284
/// One related-aggregate annotation on a [`QuerySet`] —
/// `(alias, relation, aggregate)` resolved against the model's
/// `REVERSE_FK_RELATIONS` at builder time. A name that fails to
/// resolve is stored poisoned (`resolved: Err`) so the infallible
/// builder stays chainable while every fallible consumer
/// (`fetch_annotated`, `explain`) reports it loudly.
#[derive(Debug, Clone)]
pub(crate) struct RelatedAnnotation {
    /// Name the correlated subquery is aliased to in the SELECT list.
    pub(crate) alias: String,
    /// Aggregate whose `to_simple_expr()` becomes the subquery's
    /// selected expression.
    pub(crate) agg: crate::orm::Aggregate,
    /// `Ok((child_table, fk_column, parent_table, parent_pk))` or the
    /// loud error message for an unknown relation.
    pub(crate) resolved: Result<(String, String, String, String), String>,
    /// Child model is `#[umbral(soft_delete)]` — fold
    /// `AND <child>.deleted_at IS NULL` into the correlated subquery so
    /// a trashed child stops inflating the parent's count.
    pub(crate) child_soft_delete: bool,
    /// Optional child-side predicate (a filtered count),
    /// pre-rendered to a backend-default `SimpleExpr`. ANDed into the
    /// subquery WHERE. From `annotate_count_where`.
    pub(crate) child_filter: Option<sea_query::SimpleExpr>,
    /// `Some(junction_table)` when this annotation counts M2M junction
    /// rows instead of child rows (`annotate_count` over an `M2M<T>`).
    /// In that case `build_query_for` correlates on the junction's
    /// `parent_id` column and applies neither `child_soft_delete` nor
    /// `child_filter`.
    pub(crate) m2m_junction: Option<String>,
}
310
/// Outcome of auto-discovering a reverse-FK relation (gaps2 #45) when
/// the parent declares no matching `ReverseSet` field. The resolver
/// scans the registry for children whose FK targets the parent table
/// and matches `relation` against their conventional name forms.
enum AutoDiscovery {
    /// Exactly one (child, fk_column) candidate matched.
    Resolved {
        /// Table of the child model that owns the FK.
        child_table: String,
        /// Name of the FK column on the child that targets the parent.
        fk_column: String,
        /// The child model's `soft_delete` flag as recorded in the
        /// registry.
        soft_delete: bool,
    },
    /// Two or more candidates matched — the caller must declare a
    /// `#[umbral(reverse_fk = "...")]` field to disambiguate. Carries
    /// the candidate `child.fk` labels for the error message.
    Ambiguous(Vec<String>),
    /// No candidate matched. Carries the list of auto-discoverable
    /// child names so the error can teach the available relations
    /// (empty when the registry is not initialised yet).
    NotFound(Vec<String>),
}
330
331// `snake_case` replaced by `umbral_casing::to_snake_case` (imported above)
332// in the gaps2 #77 consolidation refactor.
333
334/// Scan the model registry for children whose FK targets `T::TABLE`,
335/// and match `relation` against each candidate's conventional name
336/// forms: the child's table name, the child's struct name in
337/// snake_case and bare-lowercase, and any of those with a `_set`
338/// suffix (the `<model>_set` form). Declared `REVERSE_FK_RELATIONS` /
339/// `M2M_RELATIONS` are resolved by the caller BEFORE this runs, so
340/// they always take precedence.
341fn discover_reverse_relation<T: crate::orm::Model>(relation: &str) -> AutoDiscovery {
342 if !crate::migrate::is_initialised() {
343 return AutoDiscovery::NotFound(Vec::new());
344 }
345 let parent_table = T::TABLE;
346 // Each candidate: (child_table, fk_column, child_soft_delete).
347 let mut candidates: Vec<(String, String, bool)> = Vec::new();
348 let mut discoverable: Vec<String> = Vec::new();
349 for meta in crate::migrate::registered_models() {
350 for col in &meta.fields {
351 if col.fk_target.as_deref() != Some(parent_table) {
352 continue;
353 }
354 // Conventional name forms this (child, fk_column) answers to.
355 let snake = to_snake_case(&meta.name);
356 let lower = meta.name.to_ascii_lowercase();
357 let mut forms = vec![
358 meta.table.clone(),
359 snake.clone(),
360 lower.clone(),
361 format!("{}_set", meta.table),
362 format!("{snake}_set"),
363 format!("{lower}_set"),
364 ];
365 forms.sort();
366 forms.dedup();
367 // Surface a friendly name for "available children" errors.
368 discoverable.push(format!("{}_set", meta.table));
369 if forms.iter().any(|f| f == relation) {
370 candidates.push((meta.table.clone(), col.name.clone(), meta.soft_delete));
371 }
372 }
373 }
374 match candidates.len() {
375 1 => {
376 let (child_table, fk_column, soft_delete) = candidates.pop().unwrap();
377 AutoDiscovery::Resolved {
378 child_table,
379 fk_column,
380 soft_delete,
381 }
382 }
383 0 => {
384 discoverable.sort();
385 discoverable.dedup();
386 AutoDiscovery::NotFound(discoverable)
387 }
388 _ => {
389 let labels = candidates
390 .into_iter()
391 .map(|(child, fk, _)| format!("{child}.{fk}"))
392 .collect();
393 AutoDiscovery::Ambiguous(labels)
394 }
395 }
396}
397
398impl<T> QuerySet<T> {
399 pub(crate) fn new(query: sea_query::SelectStatement) -> Self {
400 Self {
401 query,
402 default_ordering: Vec::new(),
403 explicit_order: false,
404 predicates: Vec::new(),
405 explicit_pool: None,
406 select_related: Vec::new(),
407 prefetch_related: Vec::new(),
408 atomic: None,
409 soft_delete_active: false,
410 with_deleted: false,
411 only_deleted: false,
412 hard_delete: false,
413 only_cols: None,
414 join_related: Vec::new(),
415 annotations: Vec::new(),
416 _phantom: PhantomData,
417 }
418 }
419
420 /// Feature #72 — include soft-deleted rows in this query. Skips
421 /// the auto `WHERE deleted_at IS NULL` injection. No-op on
422 /// models that aren't tagged `#[umbral(soft_delete)]`.
423 pub fn with_deleted(mut self) -> Self {
424 self.with_deleted = true;
425 self
426 }
427
428 /// Feature #72 — only soft-deleted rows. Useful for admin
429 /// trash views and undelete workflows. No-op on models that
430 /// aren't tagged `#[umbral(soft_delete)]`.
431 pub fn only_deleted(mut self) -> Self {
432 self.only_deleted = true;
433 self
434 }
435
436 /// Feature #72 — force a real DELETE for the next `.delete()`
437 /// terminal call. Soft-delete models normally rewrite delete()
438 /// as `UPDATE ... SET deleted_at = NOW()`; `.hard_delete()`
439 /// bypasses that for GDPR purges, test cleanup, or any other
440 /// case where the row truly should be gone. No-op on models
441 /// that aren't tagged `#[umbral(soft_delete)]` (their delete()
442 /// is already a hard DELETE).
443 pub fn hard_delete(mut self) -> Self {
444 self.hard_delete = true;
445 self
446 }
447
448 /// Gap #111 — restrict the SELECT to the named columns.
449 ///
450 /// Affects [`Self::to_sql`] / [`Self::to_sql_pg`] (the SELECT list
451 /// shrinks to just these columns) and propagates into
452 /// [`Self::values`] when that terminal is called without its own
453 /// explicit column slice.
454 ///
455 /// **The typed terminals (`fetch` / `first` / `get`) refuse to
456 /// run with `.only()` set** because a partial-column row can't
457 /// satisfy `T`'s `FromRow` impl. The error message points at
458 /// `.values(...)` (returns `Vec<serde_json::Value>`) as the
459 /// execution path. Use `.only()` for `.to_sql()` inspection and
460 /// `.values()` for actual reads.
461 ///
462 /// ```rust,ignore
463 /// // Inspect: "SELECT \"id\", \"name\" FROM \"brand\" WHERE \"id\" = ?"
464 /// let sql = Brand::objects()
465 /// .filter(brand::ID.eq(1))
466 /// .only(&["id", "name"])
467 /// .to_sql();
468 ///
469 /// // Execute (returns JSON rows):
470 /// let rows = Brand::objects()
471 /// .filter(brand::ID.eq(1))
472 /// .values(&["id", "name"])
473 /// .await?;
474 /// ```
475 ///
476 /// Unknown column names are not validated here — they surface at
477 /// terminal time the same way `.values()` reports them (the
478 /// rendered SQL contains the bad identifier and SQLite/Postgres
479 /// raises). This keeps the chainable surface return-type-stable.
480 pub fn only(mut self, cols: &[&str]) -> Self {
481 self.only_cols = Some(cols.iter().map(|s| s.to_string()).collect());
482 self
483 }
484
485 /// Wrap this QuerySet's write terminal in a transaction. Reads are
486 /// unaffected (read terminals are single statements and the DB
487 /// gives them a consistent snapshot). Mutually exclusive with
488 /// [`Self::on_tx`] — if both are set, `on_tx` wins (you're
489 /// already inside a transaction, so wrapping again would deadlock
490 /// or fail on backends without nested transactions).
491 pub fn atomic(mut self) -> Self {
492 self.atomic = Some(true);
493 self
494 }
495
496 /// Opt this QuerySet's write terminal out of the global
497 /// `App::builder().atomic_transactions(true)` default. Useful in
498 /// hot-path batches where the caller already owns the outer
499 /// transaction.
500 pub fn non_atomic(mut self) -> Self {
501 self.atomic = Some(false);
502 self
503 }
504
505 /// Resolve whether this QuerySet should auto-wrap its write
506 /// terminal in a transaction. Per-call override > builder global.
507 pub(crate) fn should_atomic_wrap(&self) -> bool {
508 self.atomic.unwrap_or_else(crate::db::atomic_default)
509 }
510
511 /// Clone the base query and weave in the accumulated predicates,
512 /// picking the dialect-appropriate `SimpleExpr` for each one. The
513 /// `backend_name` is `"sqlite"` or `"postgres"`; any other value
514 /// behaves like Postgres (the default).
515 pub(crate) fn build_query_for(&self, backend_name: &str) -> sea_query::SelectStatement {
516 let mut q = self.query.clone();
517 for p in &self.predicates {
518 q.and_where(p.cond_for(backend_name));
519 }
520 // Feature #72 — soft-delete auto-filter. When the model
521 // opted in via `#[umbral(soft_delete)]` AND the caller
522 // didn't switch the visibility via `.with_deleted()` /
523 // `.only_deleted()`, inject `WHERE deleted_at IS NULL`.
524 // `.with_deleted()` shows everything; `.only_deleted()`
525 // shows just the soft-deleted rows.
526 if self.soft_delete_active {
527 use sea_query::Expr;
528 if self.only_deleted {
529 q.and_where(Expr::col(Alias::new("deleted_at")).is_not_null());
530 } else if !self.with_deleted {
531 q.and_where(Expr::col(Alias::new("deleted_at")).is_null());
532 }
533 }
534 // Related-aggregate annotations: one correlated scalar
535 // subquery per entry, aliased onto the SELECT list. Living
536 // HERE is what makes `.annotate_*` compose with everything —
537 // explain(), to_sql(), fetch_annotated() all see the same
538 // query. Poisoned entries (unknown relation) are skipped in
539 // this infallible path; the fallible consumers call
540 // `check_annotations()` first and fail loudly instead.
541 for ann in &self.annotations {
542 // M2M-junction annotations count rows of the junction table
543 // (`<parent>_<field>`, columns parent_id / child_id),
544 // correlated on parent_id = <parent>.<pk>.
545 if let Some(junction) = &ann.m2m_junction {
546 if let Ok((_child_table, _fk_col, parent_table, parent_pk)) = &ann.resolved {
547 let mut sub = sea_query::Query::select();
548 sub.expr(ann.agg.to_simple_expr())
549 .from(crate::db::router::schema_qualified_table(junction.as_str()))
550 .and_where(
551 sea_query::Expr::col((
552 Alias::new(junction.as_str()),
553 Alias::new("parent_id"),
554 ))
555 .equals((
556 Alias::new(parent_table.as_str()),
557 Alias::new(parent_pk.as_str()),
558 )),
559 );
560 q.expr_as(
561 sea_query::SimpleExpr::SubQuery(
562 None,
563 Box::new(sea_query::SubQueryStatement::SelectStatement(
564 sub.to_owned(),
565 )),
566 ),
567 Alias::new(ann.alias.as_str()),
568 );
569 }
570 continue;
571 }
572 if let Ok((child_table, fk_col, parent_table, parent_pk)) = &ann.resolved {
573 let mut sub = sea_query::Query::select();
574 sub.expr(ann.agg.to_simple_expr())
575 .from(crate::db::router::schema_qualified_table(
576 child_table.as_str(),
577 ))
578 .and_where(
579 sea_query::Expr::col((
580 Alias::new(child_table.as_str()),
581 Alias::new(fk_col.as_str()),
582 ))
583 .equals((
584 Alias::new(parent_table.as_str()),
585 Alias::new(parent_pk.as_str()),
586 )),
587 );
588 // Auto-exclude soft-deleted children from the count when
589 // the child model is `#[umbral(soft_delete)]`.
590 if ann.child_soft_delete {
591 sub.and_where(
592 sea_query::Expr::col((
593 Alias::new(child_table.as_str()),
594 Alias::new("deleted_at"),
595 ))
596 .is_null(),
597 );
598 }
599 // Child-side predicate (annotate_count_where).
600 if let Some(filter) = &ann.child_filter {
601 sub.and_where(filter.clone());
602 }
603 q.expr_as(
604 sea_query::SimpleExpr::SubQuery(
605 None,
606 Box::new(sea_query::SubQueryStatement::SelectStatement(
607 sub.to_owned(),
608 )),
609 ),
610 Alias::new(ann.alias.as_str()),
611 );
612 }
613 }
614 // BUG-8: default ORDER BY applies only when the caller didn't
615 // supply an explicit `.order_by(...)`: the model-default
616 // ordering semantics.
617 if !self.explicit_order {
618 for (col, desc) in &self.default_ordering {
619 let order = if *desc { Order::Desc } else { Order::Asc };
620 q.order_by(Alias::new(*col), order);
621 }
622 }
623 q
624 }
625}
626
627/// Chainable methods on every `QuerySet<T>`.
628///
629/// These are model-agnostic: they only touch the sea-query
630/// `SelectStatement` and the pool-resolution slot, neither of which
631/// depends on `T`. Terminals (which need row mapping) live in the
632/// `impl<T: Model> QuerySet<T>` block below.
633impl<T> QuerySet<T> {
634 /// Add a WHERE condition. Multiple `.filter` calls AND together
635 /// (sea-query's `and_where` semantics — applied at terminal time
636 /// once the resolved pool's backend is known).
637 pub fn filter(mut self, p: Predicate<T>) -> Self {
638 self.predicates.push(p);
639 self
640 }
641
642 /// Add a negated WHERE condition. The negated predicate ANDs into
643 /// the chain alongside any `filter()` calls, so
644 /// `.filter(A).exclude(B).filter(C)` renders as `WHERE A AND NOT B
645 /// AND C`. Sugar for `filter(Q::not(p))`.
646 ///
647 /// The negated-filter terminal.
648 pub fn exclude(self, p: Predicate<T>) -> Self {
649 self.filter(crate::orm::Q::not(p))
650 }
651
652 /// Add an ORDER BY clause. Multiple `.order_by` calls append.
653 /// The first explicit call also opts out of the model's
654 /// `#[umbral(ordering = [...])]` default (BUG-8): explicit ordering
655 /// replaces the default rather than stacking on top of it.
656 pub fn order_by(mut self, o: OrderExpr<T>) -> Self {
657 let order = if o.descending {
658 Order::Desc
659 } else {
660 Order::Asc
661 };
662 self.query.order_by(Alias::new(o.column), order);
663 self.explicit_order = true;
664 self
665 }
666
667 /// Set LIMIT.
668 pub fn limit(mut self, n: u64) -> Self {
669 self.query.limit(n);
670 self
671 }
672
673 /// Set OFFSET.
674 pub fn offset(mut self, n: u64) -> Self {
675 self.query.offset(n);
676 self
677 }
678
679 /// Override the pool resolved at terminal time with a SQLite pool.
680 ///
681 /// Wins over the ambient default. Used by tests that drive the ORM
682 /// without going through `App::build()`. For a Postgres override
683 /// use [`Self::on_pg`].
684 pub fn on(mut self, pool: &sqlx::SqlitePool) -> Self {
685 self.explicit_pool = Some(DbPool::Sqlite(pool.clone()));
686 self
687 }
688
689 /// Override the pool resolved at terminal time with a Postgres pool.
690 ///
691 /// The Postgres counterpart of [`Self::on`]. Tests that want to
692 /// exercise the Postgres branch (or that drive against a real
693 /// Postgres instance) reach for this directly.
694 pub fn on_pg(mut self, pool: &sqlx::PgPool) -> Self {
695 self.explicit_pool = Some(DbPool::Postgres(pool.clone()));
696 self
697 }
698
699 /// Attach this `QuerySet` to an open transaction.
700 ///
701 /// Returns a [`QuerySetTx`] that holds both the query and a mutable
702 /// reference to the transaction. Every terminal on `QuerySetTx`
703 /// (`fetch`, `first`, `count`, `exists`, `get`, `delete`,
704 /// `update_values`) executes inside the open transaction so all
705 /// operations in the same closure commit or roll back as a unit.
706 ///
707 /// ```rust,ignore
708 /// umbral::db::transaction(|tx| async move {
709 /// let order = Order::objects().on_tx(tx).create(new_order).await?;
710 /// Stock::objects()
711 /// .on_tx(tx)
712 /// .filter(stock::SKU.eq(sku))
713 /// .update_values(delta)
714 /// .await?;
715 /// Ok::<_, MyError>(order)
716 /// }).await?;
717 /// ```
718 pub fn on_tx(self, tx: &mut crate::db::Transaction) -> QuerySetTx<'_, T> {
719 QuerySetTx { qs: self, tx }
720 }
721
722 /// Eagerly load a single FK field by name.
723 ///
724 /// After the main SELECT returns rows, a batch `SELECT ... FROM <related_table>
725 /// WHERE id IN (...)` fetches all referenced rows in one round-trip. Each
726 /// returned row is deserialised as the target model and stored in
727 /// `ForeignKey<U>.resolved` so template rendering (`{{ post.author.username }}`)
728 /// and `serde_json::to_value(&post)["author"]["username"]` both work without
729 /// additional queries.
730 ///
731 /// Calling `select_related` multiple times accumulates the names:
732 /// `.select_related("author").select_related("editor")` works the same as
733 /// `.select_related_many(&["author", "editor"])`.
734 ///
735 /// ## Nested traversal (post-#42)
736 ///
737 /// `.select_related("author__manager")` walks the FK chain through
738 /// the `__` separator. One batched `IN (...)` query per hop —
739 /// `1 + len(hops)` round-trips regardless of parent count. No
740 /// N+1. Each hop's related row is embedded into the prior level's
741 /// JSON, and recursive `ForeignKey<T>::Deserialize` unpacks the
742 /// chain into `resolved()` slots at every depth. Bonus: a
743 /// select_related'd model now round-trips through
744 /// `serde_json::to_value(&t)` / `from_value` without losing the
745 /// resolved relation.
746 ///
747 /// ## Companion shapes
748 ///
749 /// - **`join_related(name)`** — same goal (load related rows) via
750 /// a true `LEFT JOIN` in the main SELECT. One round-trip total
751 /// vs. `select_related`'s `1 + N` batched-IN approach. Wider
752 /// per-row payload; better when round-trip count dominates.
753 /// - **`prefetch_related(name)`** — M2M batched loading (one
754 /// query per declared M2M field). For reverse-FK collections
755 /// (`prefetch_related("comment_set")`-style) see gap #44 — not
756 /// yet implemented.
757 ///
758 /// ## Loud errors
759 ///
760 /// Unknown field names (typos, M2M names accidentally passed
761 /// here, fields without `fk_target`) return a clear
762 /// `sqlx::Error::Protocol` from the terminal naming the bad hop
763 /// and the table it was looked up against. Pre-#42 these
764 /// silently no-op'd.
765 pub fn select_related(mut self, field_name: impl Into<String>) -> Self {
766 self.select_related.push(field_name.into());
767 self
768 }
769
770 /// Eagerly load multiple FK fields in one call.
771 ///
772 /// Sugar for chained `.select_related(name)` calls.
773 pub fn select_related_many(mut self, field_names: &[&str]) -> Self {
774 for name in field_names {
775 self.select_related.push(name.to_string());
776 }
777 self
778 }
779
780 /// JOIN-based eager FK loading — emits `LEFT JOIN <related> ON ...`
781 /// in the main SELECT (with aliased child columns `<field>__<col>`)
782 /// so one round-trip pulls the parent + related rows together.
783 ///
784 /// Trade-off vs. [`Self::select_related`]:
785 /// - `select_related` runs ONE extra batched query after the
786 /// main fetch (`SELECT * FROM related WHERE id IN (...)`).
787 /// Two round-trips total; rows stay narrow.
788 /// - `join_related` runs the main query AS the join — one
789 /// round-trip total — at the cost of a wider per-row payload
790 /// (every related column rides along even for duplicated
791 /// parents).
792 ///
793 /// Both populate `ForeignKey<U>.resolved` the same way so
794 /// downstream code (templates, serde) doesn't care which path
795 /// was used. Pick `join_related` when round-trip count dominates
796 /// (hot listing pages, small related tables) and `select_related`
797 /// when the related row is wide or only loaded for a subset of
798 /// the parent rows.
799 ///
800 /// Composes with `.select_related(other_fk)` and
801 /// `.prefetch_related(m2m)` — different fields can take different
802 /// paths in the same query.
803 ///
804 /// Multi-hop FK chains are supported: a `"__"`-separated path like
805 /// `"author__manager"` resolves one JOIN per hop in a single query.
806 ///
807 /// **Constraints**: FK fields must live in `model.fields` (M2M links
808 /// route through `prefetch_related`), and every related model along the
809 /// chain must be registered with the framework
810 /// (`App::builder().model::<U>()` or contributed by a plugin) so we can
811 /// resolve its column layout for the aliased SELECT.
812 pub fn join_related(mut self, field_name: impl Into<String>) -> Self {
813 self.join_related.push(JoinReq {
814 path: field_name.into(),
815 kind: None,
816 });
817 self
818 }
819
820 /// Sugar for chained [`Self::join_related`] calls.
821 pub fn join_related_many(mut self, field_names: &[&str]) -> Self {
822 for name in field_names {
823 self.join_related.push(JoinReq {
824 path: (*name).to_string(),
825 kind: None,
826 });
827 }
828 self
829 }
830
831 /// `LEFT JOIN` the related path — keeps parent rows whose relation
832 /// is absent (the relation hydrates as unresolved/None). Accepts a
833 /// nested path (`"plugin__author"`); the join type applies to the
834 /// deepest hop.
835 pub fn left_join_related(mut self, path: impl Into<String>) -> Self {
836 self.join_related.push(JoinReq {
837 path: path.into(),
838 kind: Some(JoinKind::Left),
839 });
840 self
841 }
842
843 /// `INNER JOIN` the related path — drops parent rows whose relation
844 /// is absent. The default for a NOT NULL FK.
845 pub fn inner_join_related(mut self, path: impl Into<String>) -> Self {
846 self.join_related.push(JoinReq {
847 path: path.into(),
848 kind: Some(JoinKind::Inner),
849 });
850 self
851 }
852
853 /// `RIGHT JOIN` the related path. Postgres-unconditional; SQLite
854 /// needs >= 3.39 — a runtime warning fires on older SQLite (see the
855 /// boot/runtime note in the joins docs). The precise version gate
856 /// lives at execute time (the SQLite driver's own error); the warn
857 /// is the early nudge.
858 pub fn right_join_related(mut self, path: impl Into<String>) -> Self {
859 self.join_related.push(JoinReq {
860 path: path.into(),
861 kind: Some(JoinKind::Right),
862 });
863 self
864 }
865
866 /// Eagerly load an M2M relation via a single batched join.
867 ///
868 /// After the main SELECT returns rows, one query of the shape
869 /// `SELECT j.parent_id, child.* FROM <child_table> child INNER
870 /// JOIN <junction> j ON child.<pk> = j.child_id WHERE
871 /// j.parent_id IN (...)` fetches every related child for every
872 /// parent in one round-trip. Each parent's `M2M.resolved` slot
873 /// is populated with its matching children.
874 ///
875 /// The M2M counterpart of [`Self::select_related`] for FKs —
876 /// same goal of killing N+1: a batch-loaded `prefetch_related('tags')`.
877 ///
878 /// ## Reverse-FK collections (post-#44)
879 ///
880 /// `prefetch_related` also loads `ReverseSet<C>` fields — the
881 /// "for each Post, give me every Comment that points at it"
882 /// shape. Declare the field on the parent with
883 /// `#[sqlx(skip)] #[serde(skip)]
884 /// #[umbral(reverse_fk = "<fk_col>")] pub <name>: ReverseSet<C>`
885 /// where `<fk_col>` names the FK column on `C` pointing back.
886 /// One `SELECT * FROM <child> WHERE <fk_col> IN (parent_pks)`
887 /// regardless of parent count — no N+1.
888 ///
889 /// ## Scope (v1)
890 ///
891 /// - **M2M + reverse-FK only.** FK fields go through
892 /// [`Self::select_related`] (batched IN) or
893 /// [`Self::join_related`] (LEFT JOIN).
894 /// - **i64 parent PK only.** Same constraint as the rest of the
895 /// M2M plumbing; models with non-i64 PKs surface a clean
896 /// compile error.
897 /// - **Unknown field name → loud error** (post-#42). If the
898 /// name matches neither an M2M nor a `ReverseSet` field,
899 /// fetch returns a clear `sqlx::Error::Protocol` pointing at
900 /// the right method.
901 pub fn prefetch_related(mut self, field_name: impl Into<String>) -> Self {
902 self.prefetch_related.push(field_name.into());
903 self
904 }
905
906 /// Eagerly load multiple M2M relations. Sugar for chained
907 /// `.prefetch_related(name)` calls.
908 pub fn prefetch_related_many(mut self, field_names: &[&str]) -> Self {
909 for name in field_names {
910 self.prefetch_related.push(name.to_string());
911 }
912 self
913 }
914
915 /// Convert this QuerySet into a [`Subquery`] suitable for use in
916 /// an `IN (SELECT ...)` predicate. Projects only the named
917 /// column; the accumulated WHERE / ORDER BY survive.
918 ///
919 /// `Post::objects().filter(...).into_subquery("author_id")` →
920 /// `Subquery` you can hand to `user::ID.in_subquery(...)`.
921 pub fn into_subquery(self, col_name: &str) -> crate::orm::Subquery {
922 let mut q = self.build_query_for("sqlite");
923 q.clear_selects();
924 q.column(Alias::new(col_name));
925 crate::orm::Subquery::from_select(q)
926 }
927
928 /// Combine this QuerySet with `other` via SQL `UNION` (gap #28).
929 /// Both QuerySets must produce the same column shape — which
930 /// they always do here because both are typed `QuerySet<T>`.
931 /// Duplicates are removed (the de-duplicating UNION, not UNION
932 /// ALL).
933 pub fn union(self, other: QuerySet<T>) -> Self {
934 self.combine(other, sea_query::UnionType::Distinct)
935 }
936
937 /// Combine this QuerySet with `other` via SQL `INTERSECT`
938 /// (gap #28). Returns rows present in BOTH inputs.
939 pub fn intersect(self, other: QuerySet<T>) -> Self {
940 self.combine(other, sea_query::UnionType::Intersect)
941 }
942
943 /// Combine this QuerySet with `other` via SQL `EXCEPT`
944 /// (gap #28). Returns rows present in `self` but not in `other`.
945 pub fn except(self, other: QuerySet<T>) -> Self {
946 self.combine(other, sea_query::UnionType::Except)
947 }
948
949 /// Internal: attach `other`'s SelectStatement to `self`'s
950 /// SelectStatement with the given UnionType. Both sides apply
951 /// their accumulated predicates / ORDER BY before the union.
952 fn combine(mut self, other: QuerySet<T>, ty: sea_query::UnionType) -> Self {
953 let backend = "sqlite";
954 let other_select = other.build_query_for(backend);
955 // Fold our own predicates into the base query so the union
956 // sees them; further `.filter()` calls on the returned
957 // QuerySet would still apply to the OUTER (combined) query.
958 let mut base = self.build_query_for(backend);
959 self.predicates.clear();
960 base.union(ty, other_select);
961 self.query = base;
962 self
963 }
964
965 /// Emit `SELECT DISTINCT ...` for this query (gap #17). Most
966 /// useful when combined with [`Self::values`] to dedupe a
967 /// column-projected list (`distinct().values(&["tag"])`); the
968 /// full-row DISTINCT is rarely what you want.
969 ///
970 /// Postgres-specific `DISTINCT ON (cols)` is deferred until a
971 /// real consumer surfaces the need — the standard `DISTINCT`
972 /// covers most use cases.
973 pub fn distinct(mut self) -> Self {
974 self.query.distinct();
975 self
976 }
977}
978
979/// Resolve the pool to run a terminal against.
980///
981/// Precedence: explicit `.on(&pool)` / `.on_pg(&pool)` override wins;
982/// then the per-model database alias the Plugin contract published
983/// via `Plugin::database()` (FEATURES.md #6); then the `"default"`
984/// pool. Tests that skip the App builder pass an explicit pool and
985/// bypass the alias lookup entirely.
986/// Validate that every name passed to `.join_related(...)` resolves
987/// to a foreign-key field on `T`. Loud-error path that replaces the
988/// pre-#42 silent no-op (where an unknown field, an M2M field, or a
989/// non-FK column produced an empty JOIN list and a confusing
990/// "ForeignKey::resolved() returned None" downstream).
991fn validate_join_related_fields<T: Model>(fields: &[String]) -> Result<(), sqlx::Error> {
992 for field_name in fields {
993 // Nested path (`"plugin__author"` / `"tags__category"`):
994 // validate via the hop resolver. A leading M2M segment is a
995 // valid chain entry even though it isn't an FK on `T`, so
996 // accept it and let `apply_join_related` route it.
997 if field_name.contains("__") {
998 let first = field_name.split("__").next().unwrap_or(field_name);
999 let leads_m2m = T::M2M_RELATIONS.iter().any(|r| r.field_name == first);
1000 if leads_m2m || resolve_join_hops::<T>(field_name).is_some() {
1001 continue;
1002 }
1003 return Err(sqlx::Error::Protocol(format!(
1004 "umbral::orm::join_related: nested path `{field_name}` on `{}` has an \
1005 unresolvable hop (each segment must be a FK or M2M to a registered model)",
1006 T::NAME
1007 )));
1008 }
1009 // Try as a regular column first.
1010 let col = T::FIELDS.iter().find(|f| f.name == field_name.as_str());
1011 if let Some(col) = col {
1012 if col.fk_target.is_some() {
1013 continue; // FK column — OK.
1014 }
1015 return Err(sqlx::Error::Protocol(format!(
1016 "umbral::orm::join_related: field `{field_name}` on `{}` is not a foreign \
1017 key (it has no fk_target)",
1018 T::NAME
1019 )));
1020 }
1021 // M2M field name? Post-#113 these go through the double
1022 // LEFT JOIN path: apply_join_related emits
1023 // `LEFT JOIN <junction> LEFT JOIN <child>` with aliased
1024 // child cols, and fetch()'s dedup-aware path collects M2M
1025 // children per parent. Trade-off documented at the join_related
1026 // docstring — M2M JOINs multiply parent rows by avg
1027 // cardinality, so prefetch_related stays the default for
1028 // any list page where M2M cardinality isn't tiny.
1029 if T::M2M_RELATIONS
1030 .iter()
1031 .any(|r| r.field_name == field_name.as_str())
1032 {
1033 continue;
1034 }
1035 return Err(sqlx::Error::Protocol(format!(
1036 "umbral::orm::join_related: unknown field `{field_name}` on model `{}`",
1037 T::NAME
1038 )));
1039 }
1040 Ok(())
1041}
1042
/// One resolved hop of a `join_related` FK chain.
///
/// Produced by `resolve_join_hops` / `resolve_m2m_chain`; the join
/// builder turns each hop into one JOIN of the form
/// `<previous level>.<fk_col> = <child_table>.<child_pk>`.
#[derive(Debug, Clone)]
pub(crate) struct JoinHop {
    /// FK column name on the *previous* level's table (the path
    /// segment this hop was resolved from).
    pub(crate) fk_col: String,
    /// Table this hop targets (the FK column's `fk_target`).
    pub(crate) child_table: String,
    /// PK column on `child_table`.
    pub(crate) child_pk: String,
    /// Was the FK column nullable? Drives join-type inference when no
    /// explicit kind is requested: nullable → LEFT, otherwise INNER.
    pub(crate) nullable: bool,
}
1055
1056/// Resolve a dotted FK path (`"plugin__author"`) into ordered hops.
1057/// Hop 0 reads `T::FIELDS`; deeper hops read the migrate registry's
1058/// `Column`s for the prior hop's target table. Returns `None` (skip,
1059/// emit no JOIN) on any unresolved hop — same forgiving posture as the
1060/// pre-existing one-hop path's silent skip in `to_sql`.
1061///
1062/// FK-only: a path whose FIRST segment is an M2M field is NOT handled
1063/// here (M2M chains route through `apply_join_related`'s M2M branch);
1064/// this returns `None` for such a path.
1065pub(crate) fn resolve_join_hops<T: Model>(path: &str) -> Option<Vec<JoinHop>> {
1066 let registered = crate::migrate::registered_models();
1067 let segs: Vec<&str> = path.split("__").filter(|s| !s.is_empty()).collect();
1068 if segs.is_empty() {
1069 return None;
1070 }
1071 let mut hops = Vec::with_capacity(segs.len());
1072 // Hop 0 off the typed parent.
1073 let f0 = T::FIELDS.iter().find(|f| f.name == segs[0])?;
1074 let t0 = f0.fk_target?;
1075 let m0 = registered.iter().find(|m| m.table == t0)?;
1076 let pk0 = m0.fields.iter().find(|c| c.primary_key)?;
1077 hops.push(JoinHop {
1078 fk_col: segs[0].to_string(),
1079 child_table: t0.to_string(),
1080 child_pk: pk0.name.clone(),
1081 nullable: f0.nullable,
1082 });
1083 let mut current = t0;
1084 for seg in &segs[1..] {
1085 let meta = registered.iter().find(|m| m.table == current)?;
1086 let col = meta.fields.iter().find(|c| c.name == *seg)?;
1087 let tgt = col.fk_target.as_deref()?;
1088 let tmeta = registered.iter().find(|m| m.table == tgt)?;
1089 let pk = tmeta.fields.iter().find(|c| c.primary_key)?;
1090 hops.push(JoinHop {
1091 fk_col: (*seg).to_string(),
1092 child_table: tgt.to_string(),
1093 child_pk: pk.name.clone(),
1094 nullable: col.nullable,
1095 });
1096 current = tgt;
1097 }
1098 Some(hops)
1099}
1100
/// Thin wrapper around `resolve_join_hops` for the backend hydration
/// helpers (a sibling module), so they can resolve a chain through a
/// dedicated entry point. Forwards `path` unchanged and returns the
/// same `Option<Vec<JoinHop>>`.
pub(crate) fn resolve_join_hops_for<T: Model>(path: &str) -> Option<Vec<JoinHop>> {
    resolve_join_hops::<T>(path)
}
1106
1107/// Resolve a path whose FIRST segment is an M2M field on `T` into the
1108/// M2M child table + child PK + the onward FK chain hops off that
1109/// child. `onward` is empty for a bare `"tags"`; for `"tags__category"`
1110/// it carries the `category` FK hop off the child table. `None` when
1111/// `segs[0]` isn't an M2M field or any onward hop fails to resolve.
1112pub(crate) fn resolve_m2m_chain<T: Model>(path: &str) -> Option<(String, String, Vec<JoinHop>)> {
1113 let registered = crate::migrate::registered_models();
1114 let segs: Vec<&str> = path.split("__").filter(|s| !s.is_empty()).collect();
1115 let first = segs.first()?;
1116 let rel = T::M2M_RELATIONS.iter().find(|r| r.field_name == *first)?;
1117 let child_meta = registered.iter().find(|m| m.table == rel.target_table)?;
1118 let child_pk = child_meta.fields.iter().find(|c| c.primary_key)?;
1119 let mut onward = Vec::with_capacity(segs.len().saturating_sub(1));
1120 let mut current = rel.target_table;
1121 for seg in &segs[1..] {
1122 let meta = registered.iter().find(|m| m.table == current)?;
1123 let col = meta.fields.iter().find(|c| c.name == *seg)?;
1124 let tgt = col.fk_target.as_deref()?;
1125 let tmeta = registered.iter().find(|m| m.table == tgt)?;
1126 let pk = tmeta.fields.iter().find(|c| c.primary_key)?;
1127 onward.push(JoinHop {
1128 fk_col: (*seg).to_string(),
1129 child_table: tgt.to_string(),
1130 child_pk: pk.name.clone(),
1131 nullable: col.nullable,
1132 });
1133 current = tgt;
1134 }
1135 Some((rel.target_table.to_string(), child_pk.name.clone(), onward))
1136}
1137
1138/// Gap #111 — error returned when a typed terminal (`fetch` / `first`
1139/// / `get`) runs against a QuerySet that has `.only(...)` set. A
1140/// partial-column row can't satisfy `T`'s `FromRow` impl, so the
1141/// caller has to either drop the `.only(...)` (full SELECT, typed
1142/// rows back) or terminate via `.values(&[...])` (JSON rows with
1143/// just the requested columns). The message names the offending
1144/// terminal so the fix is one rename away.
1145fn only_with_typed_terminal_error(terminal: &'static str) -> sqlx::Error {
1146 sqlx::Error::Protocol(format!(
1147 "umbral::orm::{terminal}: cannot run a typed terminal on a QuerySet \
1148 with `.only(...)` set — a partial-column row can't hydrate `T` via \
1149 FromRow. Either drop `.only(...)` to fetch full typed rows, or \
1150 terminate via `.values(&[...])` to get JSON rows with just the \
1151 projected columns."
1152 ))
1153}
1154
1155fn resolve_pool<T: Model>(explicit: Option<DbPool>, op: crate::db::RouteOp) -> DbPool {
1156 if let Some(pool) = explicit {
1157 return pool;
1158 }
1159 // Route through the swappable router when the registry is up.
1160 if let Some(meta) = crate::migrate::model_meta_ref(T::NAME) {
1161 let ctx = crate::db::route_context::current();
1162 let r = crate::db::router::router();
1163 let alias = match op {
1164 crate::db::RouteOp::Read => r.db_for_read(meta, &ctx),
1165 crate::db::RouteOp::Write => r.db_for_write(meta, &ctx),
1166 };
1167 return crate::db::pool_for_dispatched(alias.as_str()).clone();
1168 }
1169 // Registry-less fallback (low-level tests): today's static behavior.
1170 if let Some(alias) = crate::migrate::model_alias(T::NAME) {
1171 return crate::db::pool_for_dispatched(&alias).clone();
1172 }
1173 crate::db::pool_dispatched().clone()
1174}
1175
1176/// Pin a QuerySet to an explicit pool, dispatching SQLite vs Postgres. Used by
1177/// the upsert paths (`get_or_create` / `update_or_create`) so their
1178/// existence-check reads run on the WRITE database — read-your-writes, so a
1179/// read/write-split router never probes a lagging replica and inserts a
1180/// duplicate (or reads a stale row back after the update).
1181fn pin_to_pool<T: Model>(qs: QuerySet<T>, pool: &DbPool) -> QuerySet<T> {
1182 match pool {
1183 DbPool::Sqlite(p) => qs.on(p),
1184 DbPool::Postgres(p) => qs.on_pg(p),
1185 }
1186}
1187
1188// GetError / TryForEachError moved to `errors`; re-exported above.
1189
1190/// Emit a one-shot advisory when a `right_join_related` is applied
1191/// against a SQLite pool.
1192///
1193/// RIGHT/FULL JOIN landed in SQLite 3.39 (June 2022); Postgres has
1194/// always supported it. The boot system check (`check.rs`) can't surface
1195/// this — it's synchronous, has no live pool, and whether a RIGHT join
1196/// is *reachable* is a runtime QuerySet fact rather than static model
1197/// metadata. So the spec's "boot warning" is realized here: the first
1198/// time a RIGHT join is built against a SQLite pool, we `tracing::warn!`
1199/// once per process. We do NOT probe the library version (that needs an
1200/// async round-trip the SQL builder doesn't have) — the precise gate is
1201/// the SQLite driver's own error at execute time on an engine < 3.39;
1202/// this warn is the early nudge, consistent with `check.rs`'s
1203/// `Severity::Warning` posture.
1204///
1205/// Postgres pools and the no-pool case (a pure `to_sql` build with no
1206/// app booted) are silent.
1207fn warn_right_join_on_sqlite() {
1208 use std::sync::Once;
1209 static ONCE: Once = Once::new();
1210 if matches!(
1211 crate::db::try_pool_dispatched(),
1212 Some(crate::db::DbPool::Sqlite(_))
1213 ) {
1214 ONCE.call_once(|| {
1215 tracing::warn!(
1216 "umbral::orm::right_join_related: RIGHT JOIN requires SQLite >= 3.39. \
1217 If your SQLite is older the query will error at execute time; \
1218 Postgres is unaffected. Prefer left_/inner_join_related on SQLite \
1219 unless you've confirmed the engine version."
1220 );
1221 });
1222 }
1223}
1224
1225/// Terminal methods for every `QuerySet<T>` where `T: Model`.
1226///
1227/// Each terminal that materializes `T` carries a FromRow bound on the
1228/// method (not the impl block) — the conjunction of both backends'
1229/// FromRow impls. `#[derive(sqlx::FromRow)]` emits a generic-over-`R`
1230/// impl, so any user struct with standard field types satisfies both
1231/// bounds automatically.
1232impl<T: Model> QuerySet<T> {
1233 /// Render the SQL the QuerySet would execute, without running it.
1234 ///
1235 /// Returns the prepared statement with `?` placeholders for the
1236 /// bound values, exactly the string sqlx would send. Useful for
1237 /// `eprintln!`-style debugging and for tests that want to pin
1238 /// the rendered query without round-tripping through a pool.
1239 ///
1240 /// The bound values are intentionally not surfaced (sqlx's binder
1241 /// types aren't part of umbral's public surface); a `(sql, values)`
1242 /// accessor lands when EXPLAIN-style integration needs it.
1243 ///
1244 /// The rendered placeholder dialect is SQLite's (`?`). When the
1245 /// dispatched pool is Postgres the actual at-execute rendering
1246 /// uses `$1`-style placeholders; the `to_sql` debug surface
1247 /// continues to emit SQLite-style for stability across calls
1248 /// regardless of which pool is registered.
1249 pub fn to_sql(&self) -> String {
1250 let mut q = self.build_query_for("sqlite");
1251 self.apply_join_related(&mut q);
1252 self.apply_only_projection(&mut q);
1253 let (sql, _values) = q.build_sqlx(SqliteQueryBuilder);
1254 sql
1255 }
1256
1257 /// Render the QuerySet's SQL against the **Postgres** dialect,
1258 /// without running it. Companion to [`Self::to_sql`].
1259 ///
1260 /// The two render slightly different placeholder syntax (`?` for
1261 /// SQLite, `$1..$N` for Postgres) and any Postgres-specific
1262 /// operators like the array `@>` / `<@` / `&&` family only render
1263 /// correctly through this entry point — `to_sql`'s SQLite path
1264 /// leaves `$N` tokens in the template untouched. Use this when
1265 /// debugging a Postgres query or asserting on the rendered shape
1266 /// in tests.
1267 pub fn to_sql_pg(&self) -> String {
1268 let mut q = self.build_query_for("postgres");
1269 self.apply_join_related(&mut q);
1270 self.apply_only_projection(&mut q);
1271 let (sql, _values) = q.build_sqlx(PostgresQueryBuilder);
1272 sql
1273 }
1274
1275 /// Internal helper — when `.only(...)` was set, swap the SELECT
1276 /// list for just those columns. Shared by `to_sql` / `to_sql_pg`
1277 /// so the inspection surface stays in sync with what `values()`
1278 /// would emit. No-op when `only_cols` is `None`.
1279 fn apply_only_projection(&self, q: &mut sea_query::SelectStatement) {
1280 if let Some(cols) = &self.only_cols {
1281 q.clear_selects();
1282 for c in cols {
1283 q.column(Alias::new(c.as_str()));
1284 }
1285 }
1286 }
1287
/// Internal helper — when any `join_related` request is queued, wrap
/// the current query as a subquery aliased `__p` and build an outer
/// SELECT that joins every requested related table onto it (child
/// columns aliased as `<path>__<col>`). On return `q` holds the outer
/// statement.
///
/// The subquery wrapper is the load-bearing trick: WHERE / ORDER BY /
/// LIMIT predicates inside the inner query reference parent columns
/// only — there's no JOIN in scope, so bare names like `id` resolve
/// unambiguously to the parent. Without this wrapper SQLite raises
/// "ambiguous column name: id" on any predicate sharing a column
/// name with a JOIN'd table.
///
/// Join type per request:
/// - FK chain: the deepest hop uses the request's explicit kind if
///   one was given; every other hop (and the deepest hop when no kind
///   was given) is LEFT for a nullable FK column and INNER otherwise.
/// - M2M: the junction join is always LEFT; the child join uses the
///   explicit kind, defaulting to LEFT; onward FK hops are inferred
///   from nullability.
///
/// No-op when `join_related` is empty. Unknown field names /
/// unregistered related models / FK columns missing `fk_target`
/// are silently skipped here — the SQL just won't carry the JOIN and
/// the caller notices when `ForeignKey::resolved()` stays empty.
/// (`fetch` validates the names before building, so this forgiving
/// path mainly matters for the `to_sql` / `to_sql_pg` debug surface.)
fn apply_join_related(&self, q: &mut sea_query::SelectStatement) {
    if self.join_related.is_empty() {
        return;
    }
    use sea_query::{Expr, Query};
    let registered = crate::migrate::registered_models();

    // Inner-subquery column trim. When `.only(...)` is also set,
    // the outer SELECT only references a subset of parent
    // columns (plus the JOIN'd child columns it gets through
    // its alias). The inner subquery only needs to expose:
    //   - parent columns named in `.only(...)` (intersected
    //     with T::FIELDS so the JOIN aliases like
    //     `category__name` don't leak in here — those live on
    //     the JOIN'd table, not on the parent),
    //   - PLUS the FK columns each `.join_related(name)` needs
    //     for its `ON __p.<name> = <child>.<pk>` clause.
    // WHERE / ORDER BY inside the inner subquery can still
    // reference any parent column (SQL doesn't require an
    // ORDER BY column to be in the SELECT list), so we don't
    // need to promote those. Postgres often skips this prune
    // through subquery boundaries; SQLite usually doesn't —
    // either way trimming here is a measurable win on wide
    // tables (think 30-column Product on a busy hot path).
    if let Some(only) = &self.only_cols {
        let parent_field_names: std::collections::HashSet<&str> =
            T::FIELDS.iter().map(|f| f.name).collect();
        let mut needed: std::collections::HashSet<String> = only
            .iter()
            .filter(|c| parent_field_names.contains(c.as_str()))
            .cloned()
            .collect();
        for jr in &self.join_related {
            // Nested paths only need the FIRST hop's FK column at
            // the parent level; deeper hops join off the prior
            // level's alias, not the parent subquery.
            let join_field = jr.path.split("__").next().unwrap_or(jr.path.as_str());
            if parent_field_names.contains(join_field) {
                needed.insert(join_field.to_string());
            }
        }
        // An empty `needed` set leaves the inner SELECT list as-is
        // rather than emitting a SELECT with no columns.
        if !needed.is_empty() {
            q.clear_selects();
            // Stable ordering so the SQL is deterministic
            // across runs (HashSet iteration is not).
            let mut ordered: Vec<String> = needed.into_iter().collect();
            ordered.sort();
            for col in &ordered {
                q.column(Alias::new(col.as_str()));
            }
        }
    }

    // Take ownership of the (possibly-trimmed) inner query and
    // re-mount it as the FROM clause of the new outer SELECT. `q` is
    // left holding an empty placeholder until it is overwritten with
    // `outer` at the end of this function.
    let inner = std::mem::replace(q, Query::select().take());
    let parent_alias = Alias::new("__p");
    let mut outer = Query::select();
    outer.from_subquery(inner, parent_alias.clone());
    // Re-project the parent's full column set so the outer SELECT
    // exposes them unaliased — FromRow on `T` reads parent
    // columns by their bare names (`id`, `name`, ...). When
    // `.only(...)` later clears this list in
    // `apply_only_projection`, the inner-subquery trim above
    // means we still didn't pay for columns we ended up
    // dropping anyway.
    for f in T::FIELDS {
        outer.expr(Expr::col((parent_alias.clone(), Alias::new(f.name))));
    }
    // Set when any emitted hop is a RIGHT JOIN — drives the
    // once-per-process old-SQLite advisory after the emit loop.
    let mut emitted_right = false;
    for jr in &self.join_related {
        let field_name = &jr.path;
        // FK chain branch first. A (possibly nested) FK path splits
        // on `__` into ordered hops; each hop joins onto the prior
        // level's alias, and every hop's child columns are aliased
        // by the cumulative `__`-joined path so hydration can rebuild
        // the nested relation graph. The single-hop case is
        // byte-identical in child-column aliases to the pre-nesting
        // path (`<field>__<col>`); only the internal join alias
        // gains an `_h{idx}` suffix, which no test asserts.
        if let Some(hops) = resolve_join_hops::<T>(field_name) {
            let mut prev_alias = parent_alias.clone();
            // `resolve_join_hops` always pushes the first hop before
            // returning `Some`, so `hops` is non-empty here.
            let last = hops.len() - 1;
            // Cumulative dotted prefix per hop so EVERY level's own
            // columns ride along, aliased by its path-so-far. Hop 0
            // of `plugin__author` is `plugin`, hop 1 is
            // `plugin__author`. Selecting every level (not just the
            // leaf) is what lets hydration rebuild a FULL nested
            // object — the intermediate `plugin` row needs its own
            // `id`/`name` to deserialise into `ForeignKey<Plugin>`
            // before `author` nests inside it.
            //
            // NOTE(review): this split keeps empty segments while
            // `resolve_join_hops` filters them out, so `segs[idx]`
            // lines up with `hops[idx]` only for paths without
            // empty segments (no leading/trailing/doubled `__`) —
            // TODO confirm such paths cannot reach this point.
            let segs: Vec<&str> = field_name.split("__").collect();
            for (idx, hop) in hops.iter().enumerate() {
                let hop_alias = Alias::new(format!("__j_{field_name}_h{idx}"));
                // Last hop: explicit request, else infer from THIS
                // hop's nullability. Intermediate hops always infer
                // per-hop (an INNER can nest inside an outer
                // LEFT etc.); an explicit kind only pins the leaf.
                let kind = if idx == last {
                    jr.kind.unwrap_or(if hop.nullable {
                        JoinKind::Left
                    } else {
                        JoinKind::Inner
                    })
                } else if hop.nullable {
                    JoinKind::Left
                } else {
                    JoinKind::Inner
                };
                emitted_right |= kind == JoinKind::Right;
                // ON <previous alias>.<fk_col> = <this alias>.<child_pk>
                outer.join_as(
                    kind.sea(),
                    crate::db::router::schema_qualified_table(hop.child_table.as_str()),
                    hop_alias.clone(),
                    Expr::col((prev_alias.clone(), Alias::new(hop.fk_col.as_str())))
                        .equals((hop_alias.clone(), Alias::new(hop.child_pk.as_str()))),
                );
                if let Some(meta) = registered.iter().find(|m| m.table == hop.child_table) {
                    // Cumulative dotted prefix for this hop's columns.
                    let prefix = segs[..=idx].join("__");
                    for col in &meta.fields {
                        let alias = format!("{}__{}", prefix, col.name);
                        outer.expr_as(
                            Expr::col((hop_alias.clone(), Alias::new(col.name.as_str()))),
                            Alias::new(alias),
                        );
                    }
                }
                prev_alias = hop_alias;
            }
            continue;
        }

        // M2M branch (post-#113). Emit the double LEFT JOIN
        // through the junction table:
        //   LEFT JOIN <junction> AS __jm_<field>
        //     ON __p.<parent_pk> = __jm_<field>.parent_id
        //   LEFT JOIN <child_table> AS __j_<field>
        //     ON __jm_<field>.child_id = __j_<field>.<child_pk>
        // Aliased child cols use the same `<field>__<col>` shape
        // as the FK branch so the decode helper can be reused.
        // The M2M field is the FIRST segment of the path; a nested
        // path like `"tags__category"` passes THROUGH the M2M hop
        // and continues with an onward FK chain off the child.
        //
        // If any of the lookups in the `if let` chain below fails
        // (not an M2M field, no parent PK, unregistered child, child
        // without a PK) the request is skipped without emitting a JOIN.
        let m2m_seg = field_name.split("__").next().unwrap_or(field_name.as_str());
        if let Some(m2m_rel) = T::M2M_RELATIONS.iter().find(|r| r.field_name == m2m_seg)
            && let Some(parent_pk) = T::FIELDS.iter().find(|f| f.primary_key)
            && let Some(child_meta) =
                registered.iter().find(|m| m.table == m2m_rel.target_table)
            && let Some(child_pk) = child_meta.fields.iter().find(|c| c.primary_key)
        {
            // Junction table + aliases key off the M2M field name
            // (segs[0]), NOT the full dotted path.
            // NOTE(review): two requests sharing the same M2M head
            // (e.g. `tags__a` and `tags__b`) would therefore emit
            // identical aliases — TODO confirm callers never do this.
            let junction_table = format!("{}_{}", T::TABLE, m2m_seg);
            let junction_alias = Alias::new(format!("__jm_{m2m_seg}"));
            let child_alias = Alias::new(format!("__j_{m2m_seg}"));
            // The junction hop stays LEFT so a parent with zero
            // junction rows isn't dropped by the join to the
            // junction table itself — the CHILD hop's kind is what
            // decides drop/keep. Plain `join_related` (kind None)
            // leaves the child LEFT too, preserving the shipped
            // double-LEFT-JOIN M2M behavior (a tag-less parent
            // survives with an empty M2M slot). An explicit
            // inner_join_related drops parents whose relation is
            // absent: the junction-LEFT miss yields a NULL child_id,
            // then the child INNER on NULL has no match -> the parent
            // is dropped, which is the INNER contract.
            let child_kind = jr.kind.unwrap_or(JoinKind::Left);
            emitted_right |= child_kind == JoinKind::Right;
            outer.join_as(
                sea_query::JoinType::LeftJoin,
                crate::db::router::schema_qualified_table(&junction_table),
                junction_alias.clone(),
                Expr::col((parent_alias.clone(), Alias::new(parent_pk.name)))
                    .equals((junction_alias.clone(), Alias::new("parent_id"))),
            );
            outer.join_as(
                child_kind.sea(),
                crate::db::router::schema_qualified_table(m2m_rel.target_table),
                child_alias.clone(),
                Expr::col((junction_alias.clone(), Alias::new("child_id")))
                    .equals((child_alias.clone(), Alias::new(child_pk.name.as_str()))),
            );
            // Child columns aliased by the M2M field name so the
            // M2M decode path (`<m2m_field>__<col>`) reads them.
            for col in &child_meta.fields {
                let alias = format!("{}__{}", m2m_seg, col.name);
                outer.expr_as(
                    Expr::col((child_alias.clone(), Alias::new(col.name.as_str()))),
                    Alias::new(alias),
                );
            }
            // Onward FK chain off the child (segs[1..]). Each hop
            // joins onto the prior level's alias and aliases its
            // columns by the cumulative dotted path
            // (`tags__category__name`) so the M2M decode path can
            // nest the onward object into each child row. When the
            // chain fails to resolve, only the M2M joins above are
            // emitted.
            if let Some((_child_table, _child_pk, onward)) = resolve_m2m_chain::<T>(field_name)
            {
                let segs: Vec<&str> = field_name.split("__").collect();
                let mut prev_alias = child_alias.clone();
                for (i, hop) in onward.iter().enumerate() {
                    // segs index for this hop: segs[0] is the M2M
                    // field, segs[1] is onward[0], etc.
                    let seg_idx = i + 1;
                    let hop_alias = Alias::new(format!("__j_{m2m_seg}_o{i}"));
                    // Onward hops never take the explicit kind; they
                    // are inferred from the FK column's nullability.
                    let kind = if hop.nullable {
                        JoinKind::Left
                    } else {
                        JoinKind::Inner
                    };
                    outer.join_as(
                        kind.sea(),
                        crate::db::router::schema_qualified_table(hop.child_table.as_str()),
                        hop_alias.clone(),
                        Expr::col((prev_alias.clone(), Alias::new(hop.fk_col.as_str())))
                            .equals((hop_alias.clone(), Alias::new(hop.child_pk.as_str()))),
                    );
                    if let Some(meta) = registered.iter().find(|m| m.table == hop.child_table) {
                        let prefix = segs[..=seg_idx].join("__");
                        for col in &meta.fields {
                            let alias = format!("{}__{}", prefix, col.name);
                            outer.expr_as(
                                Expr::col((hop_alias.clone(), Alias::new(col.name.as_str()))),
                                Alias::new(alias),
                            );
                        }
                    }
                    prev_alias = hop_alias;
                }
            }
            continue;
        }
    }
    // A RIGHT JOIN against SQLite needs >= 3.39; warn once per
    // process. Postgres / no-pool builds stay silent.
    if emitted_right {
        warn_right_join_on_sqlite();
    }
    // Hand the joined outer statement back to the caller.
    *q = outer;
}
1546
1547 /// Run the SELECT and return every matching row.
1548 ///
1549 /// If `.select_related(name)` was called, a follow-up batch query
1550 /// populates `ForeignKey<U>.resolved` for each named field before
1551 /// the rows are returned.
1552 pub async fn fetch(self) -> Result<Vec<T>, sqlx::Error>
1553 where
1554 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1555 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1556 + HydrateRelated,
1557 {
1558 if self.only_cols.is_some() {
1559 return Err(only_with_typed_terminal_error("fetch"));
1560 }
1561 let sr_fields = self.select_related.clone();
1562 let prefetch_fields = self.prefetch_related.clone();
1563 let join_reqs = self.join_related.clone();
1564 let join_fields: Vec<String> = join_reqs.iter().map(|j| j.path.clone()).collect();
1565 // Validate join_related field names up front so a typo or an
1566 // M2M field name doesn't silently no-op (it used to render a
1567 // SELECT with no JOIN). Pre-#42 the failure mode was
1568 // `ForeignKey::resolved()` stays None and the caller debugs
1569 // the wrong thing. Now they get a typed error.
1570 validate_join_related_fields::<T>(&join_fields)?;
1571 // The turbofish on `query_as_with::<DB, _, _>` is load-bearing:
1572 // with both `sqlx-sqlite` and `sqlx-postgres` features on
1573 // sea-query-binder, `SqlxValues` implements `IntoArguments` for
1574 // both backends, so the compiler can't infer DB from the values
1575 // alone. Naming DB explicitly pins which `FromRow` bound is
1576 // checked.
1577 // Split join_fields into FK vs M2M groups up front. The
1578 // M2M branch needs parent dedup (one parent row per JOIN
1579 // combo would surface duplicate Ts to the caller); the FK
1580 // branch is one-to-one with rows.
1581 // A path whose FIRST segment is an M2M field routes to the M2M
1582 // group even when it continues with an onward FK chain
1583 // (`"tags__category"`): the junction double-join + parent dedup
1584 // live on the M2M side, and the onward FK nests into each child.
1585 let (m2m_join_fields, fk_join_fields): (Vec<String>, Vec<String>) =
1586 join_fields.iter().cloned().partition(|f| {
1587 let first = f.split("__").next().unwrap_or(f.as_str());
1588 T::M2M_RELATIONS.iter().any(|r| r.field_name == first)
1589 });
1590 let has_m2m_join = !m2m_join_fields.is_empty();
1591
1592 let mut rows = match resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read)
1593 {
1594 DbPool::Sqlite(pool) => {
1595 let mut q = self.build_query_for("sqlite");
1596 self.apply_join_related(&mut q);
1597 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1598 if join_fields.is_empty() {
1599 sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
1600 .fetch_all(&pool)
1601 .await?
1602 } else if !has_m2m_join {
1603 // FK-only JOIN path: one row in → one T out.
1604 let raw_rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
1605 .fetch_all(&pool)
1606 .await?;
1607 let mut typed = Vec::with_capacity(raw_rows.len());
1608 for row in &raw_rows {
1609 let mut t = <T as sqlx::FromRow<_>>::from_row(row)?;
1610 backend_sqlite::hydrate_joined_rels::<T>(&mut t, row, &fk_join_fields)?;
1611 typed.push(t);
1612 }
1613 typed
1614 } else {
1615 // Mixed (FK + M2M) or pure M2M JOIN path:
1616 // dedup parents, collect M2M children per
1617 // (parent_pk, field).
1618 let raw_rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
1619 .fetch_all(&pool)
1620 .await?;
1621 dedup_decode_sqlite::<T>(&raw_rows, &fk_join_fields, &m2m_join_fields)?
1622 }
1623 }
1624 DbPool::Postgres(pool) => {
1625 let mut q = self.build_query_for("postgres");
1626 self.apply_join_related(&mut q);
1627 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1628 if join_fields.is_empty() {
1629 sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
1630 .fetch_all(&pool)
1631 .await?
1632 } else if !has_m2m_join {
1633 let raw_rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
1634 .fetch_all(&pool)
1635 .await?;
1636 let mut typed = Vec::with_capacity(raw_rows.len());
1637 for row in &raw_rows {
1638 let mut t = <T as sqlx::FromRow<_>>::from_row(row)?;
1639 backend_pg::hydrate_joined_rels::<T>(&mut t, row, &fk_join_fields)?;
1640 typed.push(t);
1641 }
1642 typed
1643 } else {
1644 let raw_rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
1645 .fetch_all(&pool)
1646 .await?;
1647 dedup_decode_pg::<T>(&raw_rows, &fk_join_fields, &m2m_join_fields)?
1648 }
1649 }
1650 };
1651 // BUG-16 step 2: wire each row's PK into its `M2M<U>` slots so
1652 // `add`/`remove`/`clear` know which parent they belong to.
1653 // No-op for models with no M2M fields.
1654 for r in &mut rows {
1655 r.set_m2m_parent_ids();
1656 }
1657 if !sr_fields.is_empty() {
1658 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1659 hydrate_select_related::<T>(&mut rows, &sr_fields, &pool).await?;
1660 }
1661 if !prefetch_fields.is_empty() {
1662 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1663 hydrate_prefetch_related::<T>(&mut rows, &prefetch_fields, &pool).await?;
1664 }
1665 Ok(rows)
1666 }
1667
1668 /// Feature 29 Phase 1 — chunked streaming via a callback.
1669 ///
1670 /// Runs the SELECT in pages of `chunk_size` rows and invokes
1671 /// `callback` once per row. Memory bound = `chunk_size *
1672 /// sizeof::<T>` instead of the full row count `fetch()` would
1673 /// buffer, so this is the right shape for million-row exports,
1674 /// migrations, and batch transforms.
1675 ///
1676 /// Deliberately NOT named `iterator()` — that name suggests a
1677 /// `Stream`-shaped return value, which would force a
1678 /// `futures-util` dep. The callback shape is idiomatic Rust,
1679 /// requires no new crates, and ships the same memory bound. A
1680 /// future `iterator()` returning `BoxStream<T>` can land later
1681 /// once `futures-util` is in the workspace for some other reason
1682 /// (likely SSE / WebSockets).
1683 ///
1684 /// Error contract: the callback may return any error type `E`.
1685 /// SQL failures become `TryForEachError::Sqlx`; callback errors
1686 /// become `TryForEachError::Callback(e)`. The first error stops
1687 /// the walk — subsequent rows are not fetched.
1688 ///
1689 /// Caveats: pages are stable only if the result set isn't being
1690 /// mutated concurrently. For consistent-snapshot iteration over
1691 /// a live table, wrap the call in a serialised-or-repeatable-read
1692 /// transaction. `select_related` and `prefetch_related` hooks
1693 /// are NOT applied on each row — `try_for_each` is intentionally
1694 /// the "raw column data, one row at a time" terminal.
1695 pub async fn try_for_each<F, E>(
1696 self,
1697 chunk_size: usize,
1698 mut callback: F,
1699 ) -> Result<(), TryForEachError<E>>
1700 where
1701 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1702 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1703 + HydrateRelated,
1704 F: FnMut(T) -> Result<(), E>,
1705 {
1706 let chunk_size = chunk_size.max(1);
1707 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1708 let mut offset: u64 = 0;
1709 loop {
1710 let mut rows: Vec<T> = match &pool {
1711 DbPool::Sqlite(pg) => {
1712 let mut q = self.build_query_for("sqlite");
1713 q.limit(chunk_size as u64).offset(offset);
1714 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1715 sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
1716 .fetch_all(pg)
1717 .await
1718 .map_err(TryForEachError::Sqlx)?
1719 }
1720 DbPool::Postgres(pg) => {
1721 let mut q = self.build_query_for("postgres");
1722 q.limit(chunk_size as u64).offset(offset);
1723 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1724 sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
1725 .fetch_all(pg)
1726 .await
1727 .map_err(TryForEachError::Sqlx)?
1728 }
1729 };
1730 let fetched = rows.len();
1731 if fetched == 0 {
1732 break;
1733 }
1734 for row in rows.drain(..) {
1735 callback(row).map_err(TryForEachError::Callback)?;
1736 }
1737 if fetched < chunk_size {
1738 break;
1739 }
1740 offset += fetched as u64;
1741 }
1742 Ok(())
1743 }
1744
1745 /// Run the SELECT with LIMIT 1 and return the first row, if any.
1746 pub async fn first(mut self) -> Result<Option<T>, sqlx::Error>
1747 where
1748 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1749 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1750 + HydrateRelated,
1751 {
1752 if self.only_cols.is_some() {
1753 return Err(only_with_typed_terminal_error("first"));
1754 }
1755 // Review #2: delegate to `fetch()` with LIMIT 1 so select_related,
1756 // prefetch_related, AND join_related are all hydrated. `first()`
1757 // used to build a plain query and hydrate only select_related, so
1758 // `.prefetch_related("tags").first()` returned an unprefetched row
1759 // and `.join_related("author").first()` an unresolved join — both
1760 // silently. (For a to-many `join_related`, LIMIT 1 truncates the
1761 // joined children the same way `fetch()` does with `.limit(1)`;
1762 // prefer `prefetch_related` there.)
1763 self.query.limit(1);
1764 let rows = self.fetch().await?;
1765 Ok(rows.into_iter().next())
1766 }
1767
1768 /// Return the row with the smallest value in `col_name`. Sugar
1769 /// for `order_by(col.asc()).first()`. The `earliest('created_at')`
1770 /// terminal.
1771 ///
1772 /// Takes a `&'static str` column name (same shape as
1773 /// `select_related`) so the call site stays terse:
1774 /// `.earliest("created_at")` reads naturally without spelling out
1775 /// `.asc()`.
1776 pub async fn earliest(self, col_name: &'static str) -> Result<Option<T>, sqlx::Error>
1777 where
1778 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1779 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1780 + HydrateRelated,
1781 {
1782 self.order_by(OrderExpr::new(col_name, false)).first().await
1783 }
1784
1785 /// Return the row with the largest value in `col_name`. Sugar
1786 /// for `order_by(col.desc()).first()`. The `latest('created_at')`
1787 /// terminal.
1788 pub async fn latest(self, col_name: &'static str) -> Result<Option<T>, sqlx::Error>
1789 where
1790 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1791 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1792 + HydrateRelated,
1793 {
1794 self.order_by(OrderExpr::new(col_name, true)).first().await
1795 }
1796
1797 /// Fetch many rows by their primary keys and return a
1798 /// `HashMap<T::PrimaryKey, T>` keyed by PK. The everyday companion to
1799 /// a cached list of ids — `User::objects().in_bulk(user_ids)` gives
1800 /// you direct lookup access without a second `.iter().find(...)` pass
1801 /// per id.
1802 ///
1803 /// Missing ids are silently absent from the map; callers that
1804 /// need the existence check can compare `map.len()` to
1805 /// `pks.len()`. Empty input is a no-op (returns the empty map).
1806 ///
1807 /// PK-agnostic (PK lift — was `Vec<i64>` / `HashMap<i64, T>`): the key
1808 /// is the model's `PrimaryKey` type, so i64-, String/slug-, and
1809 /// Uuid-keyed models all work. The map key requires `Hash + Eq`,
1810 /// which every standard PK type (integers, `String`, `Uuid`) satisfies.
1811 pub async fn in_bulk(
1812 self,
1813 pks: Vec<T::PrimaryKey>,
1814 ) -> Result<HashMap<T::PrimaryKey, T>, sqlx::Error>
1815 where
1816 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1817 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1818 + HydrateRelated,
1819 T::PrimaryKey: std::hash::Hash + Eq,
1820 {
1821 if pks.is_empty() {
1822 return Ok(HashMap::new());
1823 }
1824 let pk_name = pk_field::<T>().map(|f| f.name).unwrap_or("id");
1825 // Each PK converts to a `sea_query::Value` (the `PrimaryKey` trait
1826 // bounds `Into<sea_query::Value>`); wrap as a `SimpleExpr` for the
1827 // IN-list so any PK shape binds correctly.
1828 let pk_pred: Predicate<T> = Predicate::new(
1829 Expr::col(Alias::new(pk_name)).is_in(
1830 pks.into_iter()
1831 .map(|p| sea_query::SimpleExpr::Value(p.into())),
1832 ),
1833 );
1834 let rows = self.filter(pk_pred).fetch().await?;
1835 let mut out: HashMap<T::PrimaryKey, T> = HashMap::with_capacity(rows.len());
1836 for row in rows {
1837 out.insert(row.primary_key(), row);
1838 }
1839 Ok(out)
1840 }
1841
1842 /// Return the database's execution plan for this query as a
1843 /// plain-text string. Doesn't run the underlying query — just
1844 /// asks the DB how it would be executed.
1845 ///
1846 /// Backend dispatch:
1847 ///
1848 /// - SQLite: `EXPLAIN QUERY PLAN <sql>` — returns the planner's
1849 /// nested loop hierarchy, one row per access step.
1850 /// - Postgres: `EXPLAIN <sql>` — returns the default text plan.
1851 /// For machine-readable output use raw sqlx with
1852 /// `EXPLAIN (FORMAT JSON)`; the framework defaults to text
1853 /// because most callers want eyeball-able output.
1854 ///
1855 /// Lines are joined with newlines. The returned string is what a
1856 /// developer would paste into a debugger or a perf-review issue.
1857 pub async fn explain(self) -> Result<String, sqlx::Error> {
1858 // Annotations are part of the built query (they render inside
1859 // build_query_for), so the plan below includes them — but a
1860 // poisoned annotation (unknown relation) must fail loudly
1861 // here, not silently vanish from the plan.
1862 self.check_annotations()?;
1863 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1864 let backend = pool.backend_name();
1865 let q = self.build_query_for(backend);
1866 match pool {
1867 DbPool::Sqlite(pool) => {
1868 let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
1869 let explain_sql = format!("EXPLAIN QUERY PLAN {sql}");
1870 let rows = sqlx::query_with::<sqlx::Sqlite, _>(&explain_sql, vals)
1871 .fetch_all(&pool)
1872 .await?;
1873 let mut out = String::new();
1874 for row in &rows {
1875 use sqlx::Row;
1876 // SQLite returns: id, parent, notused, detail.
1877 // The `detail` column is the human-readable step.
1878 let detail: String = row.try_get("detail")?;
1879 if !out.is_empty() {
1880 out.push('\n');
1881 }
1882 out.push_str(&detail);
1883 }
1884 Ok(out)
1885 }
1886 DbPool::Postgres(pool) => {
1887 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1888 let explain_sql = format!("EXPLAIN {sql}");
1889 let rows = sqlx::query_with::<sqlx::Postgres, _>(&explain_sql, vals)
1890 .fetch_all(&pool)
1891 .await?;
1892 let mut out = String::new();
1893 for row in &rows {
1894 use sqlx::Row;
1895 // Postgres EXPLAIN returns one column named
1896 // "QUERY PLAN", one row per line of the plan.
1897 let line: String = row.try_get("QUERY PLAN")?;
1898 if !out.is_empty() {
1899 out.push('\n');
1900 }
1901 out.push_str(&line);
1902 }
1903 Ok(out)
1904 }
1905 }
1906 }
1907
1908 /// Run `SELECT COUNT(*)` against the same FROM + WHERE.
1909 ///
1910 /// Reshapes the query rather than wrapping the existing SELECT: the
1911 /// projection becomes `COUNT(*)` and LIMIT/OFFSET drop away. ORDER
1912 /// BY is harmless on a scalar aggregate and is left in place. The
1913 /// row type is `(i64,)` so the FromRow constraint comes from sqlx's
1914 /// tuple impl rather than the user struct — count() doesn't need
1915 /// T's FromRow bounds.
1916 pub async fn count(self) -> Result<i64, sqlx::Error> {
1917 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
1918 let backend = pool.backend_name();
1919 // Build the dialect-appropriate filtered query first, then
1920 // rebuild as COUNT. Doing it in this order keeps the predicate
1921 // walk pluggable per backend without duplicating the COUNT
1922 // rewrite logic across branches.
1923 let mut rebuilt = self.build_query_for(backend);
1924 rebuilt.clear_selects();
1925 // Postgres rejects `"*"` as a quoted identifier (SQLite tolerates
1926 // it); use sea_query's Asterisk token which renders bare `*`
1927 // on both backends.
1928 rebuilt.expr(Func::count(Expr::col(sea_query::Asterisk)));
1929 rebuilt.reset_limit();
1930 rebuilt.reset_offset();
1931
1932 match pool {
1933 DbPool::Sqlite(pool) => {
1934 let (sql, values) = rebuilt.build_sqlx(SqliteQueryBuilder);
1935 let (n,): (i64,) = sqlx::query_as_with::<sqlx::Sqlite, (i64,), _>(&sql, values)
1936 .fetch_one(&pool)
1937 .await?;
1938 Ok(n)
1939 }
1940 DbPool::Postgres(pool) => {
1941 let (sql, values) = rebuilt.build_sqlx(PostgresQueryBuilder);
1942 let (n,): (i64,) = sqlx::query_as_with::<sqlx::Postgres, (i64,), _>(&sql, values)
1943 .fetch_one(&pool)
1944 .await?;
1945 Ok(n)
1946 }
1947 }
1948 }
1949
1950 /// Return whether any row matches.
1951 ///
1952 /// M1 keeps the simple form: add LIMIT 1, fetch, check non-empty.
1953 /// A later milestone may swap the projection for `SELECT 1` to
1954 /// skip column materialisation.
1955 pub async fn exists(self) -> Result<bool, sqlx::Error>
1956 where
1957 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1958 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1959 + HydrateRelated,
1960 {
1961 let rows = self.limit(1).fetch().await?;
1962 Ok(!rows.is_empty())
1963 }
1964
1965 /// `.get()` — the exactly-one terminal.
1966 ///
1967 /// Returns `Ok(row)` when the filter chain matches exactly one
1968 /// row. The two not-exactly-one cases each get their own
1969 /// `GetError` variant so the caller can branch deliberately:
1970 ///
1971 /// - [`GetError::NotFound`] — zero rows matched. The right
1972 /// choice for "fetch the row this user just clicked on; 404
1973 /// if it's gone."
1974 /// - [`GetError::MultipleObjectsReturned`] — more than one row
1975 /// matched. The right choice for filters that should be
1976 /// uniquely-keyed (e.g. `.filter(user::EMAIL.eq("..."))`
1977 /// when email has a UNIQUE constraint); a result of 2+ is a
1978 /// data-integrity bug worth crashing on.
1979 /// - The underlying sqlx error wraps as [`GetError::Sqlx`].
1980 ///
1981 /// Internally this issues `SELECT ... LIMIT 2` — the cheapest
1982 /// way to distinguish "one row" from "many." The second row, if
1983 /// it exists, isn't materialised beyond the bare FromRow call.
1984 ///
1985 /// ```ignore
1986 /// match Post::objects().filter(post::ID.eq(42)).get().await {
1987 /// Ok(p) => /* render */,
1988 /// Err(GetError::NotFound) => /* 404 */,
1989 /// Err(GetError::MultipleObjectsReturned) => unreachable!("ID is unique"),
1990 /// Err(GetError::Sqlx(e)) => /* 500 */,
1991 /// }
1992 /// ```
1993 pub async fn get(self) -> Result<T, GetError>
1994 where
1995 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
1996 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
1997 + HydrateRelated,
1998 {
1999 let mut rows = self.limit(2).fetch().await.map_err(GetError::Sqlx)?;
2000 match rows.len() {
2001 0 => Err(GetError::NotFound),
2002 1 => Ok(rows.pop().unwrap()),
2003 _ => Err(GetError::MultipleObjectsReturned),
2004 }
2005 }
2006
2007 // =====================================================================
2008 // Postgres-only terminals (Phase 4.1).
2009 //
2010 // Models with Postgres-only field types (`Vec<T>` arrays, the future
2011 // Hstore / CIDR / FullTextSearch types) can't satisfy the dual
2012 // FromRow bound on `fetch` / `first` / `count` / `exists`. These
2013 // `_pg` variants bound on `FromRow<PgRow>` alone, take the pool as
2014 // an argument, and skip the dispatch — the call site explicitly
2015 // says "this model is Postgres-only."
2016 //
2017 // For models with portable fields, the existing `fetch` etc. stay
2018 // the recommended call: they pick up the ambient pool and route
2019 // through `.on(&pool)` / `.on_pg(&pool)` overrides exactly as
2020 // Phase 2.5 documented.
2021 // =====================================================================
2022
2023 // =====================================================================
2024 // Write terminals — DELETE and UPDATE.
2025 //
2026 // Both apply the accumulated filter predicates as the WHERE clause,
2027 // dispatch to the resolved pool's backend, and return the affected-
2028 // rows count from sqlx. No row materialisation — DELETE is keyless,
2029 // and UPDATE doesn't do a RETURNING read-back at v1 (use
2030 // `.filter(...).fetch()` after a write if you need the updated
2031 // rows back).
2032 //
2033 // **Without a `.filter(...)`, both terminals affect every row in
2034 // the table.** That mirrors raw SQL semantics; the type system
2035 // can't distinguish "I forgot the filter" from "I really meant to
2036 // truncate." Users protecting against accidental full-table writes
2037 // wrap their callers or assert a row count via `.count()` first.
2038 // =====================================================================
2039
2040 /// Project the query to only the named columns, returning a
2041 /// vector of `serde_json::Value::Object` rows instead of typed
2042 /// `T` instances. The columns-projection terminal: `values('id', 'title')`.
2043 ///
2044 /// Use when a list view only needs a few fields — skipping the
2045 /// 50KB body BLOB on every Post saves both memory and the
2046 /// FromRow hydration overhead. Each returned `Value` is an
2047 /// object keyed by the requested column names, with values
2048 /// typed per the column's declared SqlType (integers stay
2049 /// integers, booleans stay booleans, dates render as ISO
2050 /// strings).
2051 ///
2052 /// Unknown column names fail loudly with
2053 /// `sqlx::Error::Protocol` naming the offending column.
2054 /// Composes with `filter`, `exclude`, `order_by`, `limit`,
2055 /// `offset` exactly the way the typed terminals do.
2056 ///
2057 /// ```rust,ignore
2058 /// let rows = Post::objects()
2059 /// .filter(post::PUBLISHED.eq(true))
2060 /// .order_by(post::ID.desc())
2061 /// .values(&["id", "title"])
2062 /// .await?;
2063 /// // [ { "id": 3, "title": "c" }, ... ]
2064 /// ```
2065 pub async fn values(self, columns: &[&str]) -> Result<Vec<JsonValue>, sqlx::Error> {
2066 // Gap #46 follow-up: if any name uses `__` traversal
2067 // (`author__id`), route to the JOIN-aware path that builds
2068 // nested per-relation JSON objects. The unbranched path
2069 // below stays byte-for-byte identical for the common
2070 // parent-cols-only case.
2071 if columns.iter().any(|c| c.contains("__")) {
2072 return self.values_with_traversal(columns).await;
2073 }
2074 let meta = crate::migrate::ModelMeta::for_::<T>();
2075 // Resolve every requested name against the model's metadata
2076 // up front so an unknown column errors before any SQL runs.
2077 let mut chosen: Vec<&crate::migrate::Column> = Vec::with_capacity(columns.len());
2078 for name in columns {
2079 let col = meta
2080 .fields
2081 .iter()
2082 .find(|c| c.name == *name)
2083 .ok_or_else(|| {
2084 sqlx::Error::Protocol(format!(
2085 "umbral::orm::values: unknown column `{}` on model `{}`",
2086 name,
2087 T::NAME
2088 ))
2089 })?;
2090 chosen.push(col);
2091 }
2092 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2093 let backend = pool.backend_name();
2094 // Build the base query (predicates + ORDER BY) then swap its
2095 // SELECT list for only the requested columns.
2096 let mut q = self.build_query_for(backend);
2097 q.clear_selects();
2098 for col in &chosen {
2099 q.column(Alias::new(col.name.as_str()));
2100 }
2101 match pool {
2102 DbPool::Sqlite(pool) => {
2103 let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
2104 let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2105 .fetch_all(&pool)
2106 .await?;
2107 let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2108 for row in &rows {
2109 let mut obj = serde_json::Map::with_capacity(chosen.len());
2110 for col in &chosen {
2111 let v = crate::orm::dynamic::decode_to_json(row, col)?;
2112 obj.insert(col.name.clone(), v);
2113 }
2114 out.push(JsonValue::Object(obj));
2115 }
2116 Ok(out)
2117 }
2118 DbPool::Postgres(pool) => {
2119 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
2120 let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2121 .fetch_all(&pool)
2122 .await?;
2123 let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2124 for row in &rows {
2125 let mut obj = serde_json::Map::with_capacity(chosen.len());
2126 for col in &chosen {
2127 let v = crate::orm::dynamic::decode_pg_to_json(row, col)?;
2128 obj.insert(col.name.clone(), v);
2129 }
2130 out.push(JsonValue::Object(obj));
2131 }
2132 Ok(out)
2133 }
2134 }
2135 }
2136
2137 /// `.values("author__name")`-style traversal. One-hop only at
2138 /// v1 (`a__b`, not `a__b__c` — that fails loudly so the user
2139 /// doesn't get a silent partial result). Emits one LEFT JOIN
2140 /// per distinct relation referenced, aliases child columns as
2141 /// `<rel>__<col>`, and returns each row as a nested JSON
2142 /// object: `{id, title, author: {id, name}, editor: {id}}`.
2143 ///
2144 /// A LEFT JOIN miss (nullable FK pointing at nothing) maps the
2145 /// whole relation key to `Value::Null` rather than a nested
2146 /// object full of nulls — caller code that branches on
2147 /// `obj["author"].is_null()` works naturally.
2148 ///
2149 /// Validates every name up front so a typo errors before any
2150 /// SQL runs. Unknown parent col / non-FK relation name /
2151 /// unknown child col / deeper-than-one-hop path all surface
2152 /// distinct messages.
2153 async fn values_with_traversal(self, columns: &[&str]) -> Result<Vec<JsonValue>, sqlx::Error> {
2154 use sea_query::{Expr, Query};
2155 let meta = crate::migrate::ModelMeta::for_::<T>();
2156 let registered = crate::migrate::registered_models();
2157
2158 // Split each column name into (relation, child_col) or
2159 // (None, parent_col). Reject paths with more than one `__`
2160 // hop — nested traversal across two relation layers is a
2161 // separate piece of work (it'd need to chain JOINs and
2162 // build doubly-nested JSON).
2163 let mut parent_cols: Vec<String> = Vec::new();
2164 let mut per_rel: std::collections::BTreeMap<String, Vec<String>> =
2165 std::collections::BTreeMap::new();
2166 for raw in columns {
2167 let mut parts = raw.splitn(3, "__");
2168 let first = parts.next().unwrap_or("");
2169 let second = parts.next();
2170 let third = parts.next();
2171 if third.is_some() {
2172 return Err(sqlx::Error::Protocol(format!(
2173 "umbral::orm::values: nested `{raw}` is not supported in v1 \
2174 (one-hop only — `a__b`, not `a__b__c`)"
2175 )));
2176 }
2177 match second {
2178 Some(child) => {
2179 per_rel
2180 .entry(first.to_string())
2181 .or_default()
2182 .push(child.to_string());
2183 }
2184 None => parent_cols.push(first.to_string()),
2185 }
2186 }
2187
2188 // Validate every parent name against T::FIELDS.
2189 for name in &parent_cols {
2190 if !meta.fields.iter().any(|c| c.name == *name) {
2191 return Err(sqlx::Error::Protocol(format!(
2192 "umbral::orm::values: unknown column `{name}` on model `{}`",
2193 T::NAME
2194 )));
2195 }
2196 }
2197
2198 // Validate every relation + child trio. Build a struct per
2199 // relation that the SQL/decoder loops below need.
2200 struct RelInfo<'a> {
2201 rel_name: String,
2202 related_table: &'a str,
2203 related_pk: &'a crate::migrate::Column,
2204 child_cols: Vec<&'a crate::migrate::Column>,
2205 }
2206 let mut rel_infos: Vec<RelInfo<'_>> = Vec::with_capacity(per_rel.len());
2207 for (rel_name, child_names) in &per_rel {
2208 let fk_field = meta.fields.iter().find(|c| c.name == *rel_name);
2209 let Some(fk_field) = fk_field else {
2210 return Err(sqlx::Error::Protocol(format!(
2211 "umbral::orm::values: unknown relation `{rel_name}` on model `{}` \
2212 (used in `{rel_name}__...`)",
2213 T::NAME
2214 )));
2215 };
2216 let Some(related_table) = fk_field.fk_target.as_deref() else {
2217 return Err(sqlx::Error::Protocol(format!(
2218 "umbral::orm::values: field `{rel_name}` on `{}` is not a foreign \
2219 key — `__` traversal only works through FK fields",
2220 T::NAME
2221 )));
2222 };
2223 let Some(related_meta) = registered.iter().find(|m| m.table == related_table) else {
2224 return Err(sqlx::Error::Protocol(format!(
2225 "umbral::orm::values: related model for table `{related_table}` \
2226 is not registered"
2227 )));
2228 };
2229 let Some(related_pk) = related_meta.fields.iter().find(|c| c.primary_key) else {
2230 return Err(sqlx::Error::Protocol(format!(
2231 "umbral::orm::values: related model `{related_table}` has no \
2232 primary key column"
2233 )));
2234 };
2235 let mut child_cols: Vec<&crate::migrate::Column> =
2236 Vec::with_capacity(child_names.len());
2237 for name in child_names {
2238 let col = related_meta
2239 .fields
2240 .iter()
2241 .find(|c| c.name == *name)
2242 .ok_or_else(|| {
2243 sqlx::Error::Protocol(format!(
2244 "umbral::orm::values: unknown child column `{name}` on \
2245 related model `{related_table}` (full path `{rel_name}__{name}`)"
2246 ))
2247 })?;
2248 child_cols.push(col);
2249 }
2250 rel_infos.push(RelInfo {
2251 rel_name: rel_name.clone(),
2252 related_table,
2253 related_pk,
2254 child_cols,
2255 });
2256 }
2257
2258 // Build the SQL. Subquery-wrap the parent (WHERE / ORDER
2259 // BY / LIMIT all stay scoped to it) so JOIN'd tables can't
2260 // shadow bare-column predicates — same trick
2261 // apply_join_related uses.
2262 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2263 let backend = pool.backend_name();
2264 let inner = self.build_query_for(backend);
2265 let parent_alias = Alias::new("__p");
2266 let mut outer = Query::select();
2267 outer.from_subquery(inner, parent_alias.clone());
2268 // Outer SELECT: parent cols (bare), aliased child cols.
2269 for name in &parent_cols {
2270 outer.expr_as(
2271 Expr::col((parent_alias.clone(), Alias::new(name.as_str()))),
2272 Alias::new(name.as_str()),
2273 );
2274 }
2275 for info in &rel_infos {
2276 let join_alias = Alias::new(format!("__j_{}", info.rel_name));
2277 outer.join_as(
2278 sea_query::JoinType::LeftJoin,
2279 crate::db::router::schema_qualified_table(info.related_table),
2280 join_alias.clone(),
2281 Expr::col((parent_alias.clone(), Alias::new(info.rel_name.as_str()))).equals((
2282 join_alias.clone(),
2283 Alias::new(info.related_pk.name.as_str()),
2284 )),
2285 );
2286 // Always include the related PK alias so the decoder
2287 // can detect a LEFT JOIN miss → emit Value::Null for
2288 // the whole relation. Plus every requested child col.
2289 let pk_alias = format!("{}__{}", info.rel_name, info.related_pk.name);
2290 outer.expr_as(
2291 Expr::col((
2292 join_alias.clone(),
2293 Alias::new(info.related_pk.name.as_str()),
2294 )),
2295 Alias::new(pk_alias),
2296 );
2297 for col in &info.child_cols {
2298 let alias = format!("{}__{}", info.rel_name, col.name);
2299 outer.expr_as(
2300 Expr::col((join_alias.clone(), Alias::new(col.name.as_str()))),
2301 Alias::new(alias),
2302 );
2303 }
2304 }
2305
2306 // Execute + decode. Per-backend dispatch.
2307 match pool {
2308 DbPool::Sqlite(pool) => {
2309 let (sql, vals) = outer.build_sqlx(SqliteQueryBuilder);
2310 let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2311 .fetch_all(&pool)
2312 .await?;
2313 let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2314 for row in &rows {
2315 let mut obj = serde_json::Map::new();
2316 // Parent cols decode by their bare alias name.
2317 for name in &parent_cols {
2318 if let Some(col) = meta.fields.iter().find(|c| c.name == *name) {
2319 let v = crate::orm::dynamic::decode_to_json_aliased(row, col, name)?;
2320 obj.insert(name.clone(), v);
2321 }
2322 }
2323 // Per-relation nested object — `null` on LEFT JOIN miss.
2324 for info in &rel_infos {
2325 let pk_alias = format!("{}__{}", info.rel_name, info.related_pk.name);
2326 let pk_is_null =
2327 backend_sqlite::joined_pk_is_null(row, &info.related_pk, &pk_alias);
2328 if pk_is_null {
2329 obj.insert(info.rel_name.clone(), JsonValue::Null);
2330 continue;
2331 }
2332 let mut nested = serde_json::Map::with_capacity(info.child_cols.len());
2333 for col in &info.child_cols {
2334 let alias = format!("{}__{}", info.rel_name, col.name);
2335 let v = crate::orm::dynamic::decode_to_json_aliased(row, col, &alias)?;
2336 nested.insert(col.name.clone(), v);
2337 }
2338 obj.insert(info.rel_name.clone(), JsonValue::Object(nested));
2339 }
2340 out.push(JsonValue::Object(obj));
2341 }
2342 Ok(out)
2343 }
2344 DbPool::Postgres(pool) => {
2345 let (sql, vals) = outer.build_sqlx(PostgresQueryBuilder);
2346 let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2347 .fetch_all(&pool)
2348 .await?;
2349 let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2350 for row in &rows {
2351 let mut obj = serde_json::Map::new();
2352 for name in &parent_cols {
2353 if let Some(col) = meta.fields.iter().find(|c| c.name == *name) {
2354 let v = crate::orm::dynamic::decode_pg_to_json_aliased(row, col, name)?;
2355 obj.insert(name.clone(), v);
2356 }
2357 }
2358 for info in &rel_infos {
2359 let pk_alias = format!("{}__{}", info.rel_name, info.related_pk.name);
2360 let pk_is_null =
2361 backend_pg::joined_pk_is_null(row, &info.related_pk, &pk_alias);
2362 if pk_is_null {
2363 obj.insert(info.rel_name.clone(), JsonValue::Null);
2364 continue;
2365 }
2366 let mut nested = serde_json::Map::with_capacity(info.child_cols.len());
2367 for col in &info.child_cols {
2368 let alias = format!("{}__{}", info.rel_name, col.name);
2369 let v =
2370 crate::orm::dynamic::decode_pg_to_json_aliased(row, col, &alias)?;
2371 nested.insert(col.name.clone(), v);
2372 }
2373 obj.insert(info.rel_name.clone(), JsonValue::Object(nested));
2374 }
2375 out.push(JsonValue::Object(obj));
2376 }
2377 Ok(out)
2378 }
2379 }
2380 }
2381
2382 /// Single-row aggregate. Runs `SELECT AGG(col) AS name, ...` with
2383 /// the QuerySet's accumulated WHERE clause (ORDER BY / LIMIT /
2384 /// OFFSET are dropped — they make no sense over an aggregate
2385 /// without GROUP BY).
2386 ///
2387 /// Returns a `serde_json::Value::Object` keyed by the supplied
2388 /// names. COUNT comes back as an integer; AVG as a float; SUM /
2389 /// MAX / MIN inherit the source column's declared type.
2390 ///
2391 /// ```rust,ignore
2392 /// use umbral::orm::Aggregate;
2393 /// let summary = Post::objects()
2394 /// .filter(post::PUBLISHED.eq(true))
2395 /// .aggregate(&[
2396 /// ("count", Aggregate::count()),
2397 /// ("total", Aggregate::sum("view_count")),
2398 /// ])
2399 /// .await?;
2400 /// // { "count": 42, "total": 9999 }
2401 /// ```
2402 pub async fn aggregate(
2403 self,
2404 aggs: &[(&str, crate::orm::Aggregate)],
2405 ) -> Result<JsonValue, sqlx::Error> {
2406 let meta = crate::migrate::ModelMeta::for_::<T>();
2407 // Validate every aggregate's source column exists.
2408 for (name, agg) in aggs {
2409 if let Some(col) = agg.source_column()
2410 && !meta.fields.iter().any(|c| c.name == col)
2411 {
2412 return Err(sqlx::Error::Protocol(format!(
2413 "umbral::orm::aggregate: unknown column `{}` on model `{}` for aggregate `{}`",
2414 col,
2415 T::NAME,
2416 name
2417 )));
2418 }
2419 }
2420 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2421 let backend = pool.backend_name();
2422 let mut q = self.build_query_for(backend);
2423 q.clear_selects();
2424 q.reset_limit();
2425 q.reset_offset();
2426 for (name, agg) in aggs {
2427 q.expr_as(agg.to_simple_expr(), Alias::new(*name));
2428 }
2429 match pool {
2430 DbPool::Sqlite(pool) => {
2431 let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
2432 let row = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2433 .fetch_one(&pool)
2434 .await?;
2435 let mut obj = serde_json::Map::with_capacity(aggs.len());
2436 for (name, agg) in aggs {
2437 let source_ty = agg
2438 .source_column()
2439 .and_then(|c| meta.fields.iter().find(|f| f.name == c).map(|f| f.ty));
2440 obj.insert(
2441 name.to_string(),
2442 backend_sqlite::decode_agg(&row, name, agg, source_ty)?,
2443 );
2444 }
2445 Ok(JsonValue::Object(obj))
2446 }
2447 DbPool::Postgres(pool) => {
2448 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
2449 let row = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2450 .fetch_one(&pool)
2451 .await?;
2452 let mut obj = serde_json::Map::with_capacity(aggs.len());
2453 for (name, agg) in aggs {
2454 let source_ty = agg
2455 .source_column()
2456 .and_then(|c| meta.fields.iter().find(|f| f.name == c).map(|f| f.ty));
2457 obj.insert(
2458 name.to_string(),
2459 backend_pg::decode_agg(&row, name, agg, source_ty)?,
2460 );
2461 }
2462 Ok(JsonValue::Object(obj))
2463 }
2464 }
2465 }
2466
2467 /// Grouped aggregate. Runs `SELECT <group_cols>, AGG(col) AS name,
2468 /// ... GROUP BY <group_cols>` with the accumulated WHERE clause.
2469 ///
2470 /// Returns one `Value::Object` per group, with both the group
2471 /// columns and each named aggregate as fields. Group columns are
2472 /// decoded per their declared SqlType (so an integer
2473 /// `author_id` stays a JSON number).
2474 ///
2475 /// ```rust,ignore
2476 /// let by_author = Post::objects()
2477 /// .annotate(&["author_id"], &[("count", Aggregate::count())])
2478 /// .await?;
2479 /// // [ { "author_id": 1, "count": 3 }, { "author_id": 2, "count": 2 } ]
2480 /// ```
2481 pub async fn annotate(
2482 self,
2483 group_cols: &[&str],
2484 aggs: &[(&str, crate::orm::Aggregate)],
2485 ) -> Result<Vec<JsonValue>, sqlx::Error> {
2486 let meta = crate::migrate::ModelMeta::for_::<T>();
2487 // Resolve group columns up front so unknown names fail
2488 // before any SQL runs.
2489 let mut chosen_groups: Vec<&crate::migrate::Column> = Vec::with_capacity(group_cols.len());
2490 for name in group_cols {
2491 let col = meta
2492 .fields
2493 .iter()
2494 .find(|c| c.name == *name)
2495 .ok_or_else(|| {
2496 sqlx::Error::Protocol(format!(
2497 "umbral::orm::annotate: unknown group column `{}` on model `{}`",
2498 name,
2499 T::NAME
2500 ))
2501 })?;
2502 chosen_groups.push(col);
2503 }
2504 // Validate aggregate source columns.
2505 for (name, agg) in aggs {
2506 if let Some(col) = agg.source_column()
2507 && !meta.fields.iter().any(|c| c.name == col)
2508 {
2509 return Err(sqlx::Error::Protocol(format!(
2510 "umbral::orm::annotate: unknown column `{}` on model `{}` for aggregate `{}`",
2511 col,
2512 T::NAME,
2513 name
2514 )));
2515 }
2516 }
2517 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2518 let backend = pool.backend_name();
2519 let mut q = self.build_query_for(backend);
2520 q.clear_selects();
2521 // GROUP BY columns appear in the SELECT list AND the GROUP BY
2522 // clause. Aggregates only in the SELECT.
2523 for col in &chosen_groups {
2524 q.column(Alias::new(col.name.as_str()));
2525 q.add_group_by([sea_query::SimpleExpr::Column(sea_query::ColumnRef::Column(
2526 Alias::new(col.name.as_str()).into_iden(),
2527 ))]);
2528 }
2529 for (name, agg) in aggs {
2530 q.expr_as(agg.to_simple_expr(), Alias::new(*name));
2531 }
2532 match pool {
2533 DbPool::Sqlite(pool) => {
2534 let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
2535 let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2536 .fetch_all(&pool)
2537 .await?;
2538 let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2539 for row in &rows {
2540 let mut obj = serde_json::Map::with_capacity(chosen_groups.len() + aggs.len());
2541 for col in &chosen_groups {
2542 obj.insert(
2543 col.name.clone(),
2544 crate::orm::dynamic::decode_to_json(row, col)?,
2545 );
2546 }
2547 for (name, agg) in aggs {
2548 let source_ty = agg
2549 .source_column()
2550 .and_then(|c| meta.fields.iter().find(|f| f.name == c).map(|f| f.ty));
2551 obj.insert(
2552 name.to_string(),
2553 backend_sqlite::decode_agg(row, name, agg, source_ty)?,
2554 );
2555 }
2556 out.push(JsonValue::Object(obj));
2557 }
2558 Ok(out)
2559 }
2560 DbPool::Postgres(pool) => {
2561 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
2562 let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2563 .fetch_all(&pool)
2564 .await?;
2565 let mut out: Vec<JsonValue> = Vec::with_capacity(rows.len());
2566 for row in &rows {
2567 let mut obj = serde_json::Map::with_capacity(chosen_groups.len() + aggs.len());
2568 for col in &chosen_groups {
2569 obj.insert(
2570 col.name.clone(),
2571 crate::orm::dynamic::decode_pg_to_json(row, col)?,
2572 );
2573 }
2574 for (name, agg) in aggs {
2575 let source_ty = agg
2576 .source_column()
2577 .and_then(|c| meta.fields.iter().find(|f| f.name == c).map(|f| f.ty));
2578 obj.insert(
2579 name.to_string(),
2580 backend_pg::decode_agg(row, name, agg, source_ty)?,
2581 );
2582 }
2583 out.push(JsonValue::Object(obj));
2584 }
2585 Ok(out)
2586 }
2587 }
2588 }
2589
2590 /// The chainable `annotate(alias=Agg("relation"))`: attach a
2591 /// related-aggregate annotation to this QuerySet. The annotation
2592 /// is **query-builder state** — it renders as a correlated scalar
2593 /// subquery inside the one SELECT every terminal builds, so it
2594 /// composes with `.filter` / `.order_by` / `.limit`, stacks with
2595 /// further annotations, and shows up in [`Self::explain`] /
2596 /// [`Self::to_sql`] out of the box. Never a side query, never an
2597 /// N+1.
2598 ///
2599 /// `relation` names a `ReverseSet` relation on the model
2600 /// (`#[umbral(reverse_fk = "...")]`), the same names
2601 /// `prefetch_related` accepts. When no declared relation matches,
2602 /// the resolver AUTO-DISCOVERS the relation (gaps2 #45): it scans
2603 /// the model registry for any child whose FK targets this parent's
2604 /// table and matches `relation` against the child's conventional
2605 /// name forms (table name, `snake_case` / lowercase struct name,
2606 /// any of those with a `_set` suffix). Declared relations always
2607 /// take precedence; an ambiguous auto-match (two children, or a
2608 /// child with two FKs to this parent) poisons the annotation with
2609 /// an error that names the candidates and points at the
2610 /// `#[umbral(reverse_fk = "...")]` escape hatch. Any
2611 /// [`crate::orm::Aggregate`] works; non-count aggregates name a
2612 /// column on the CHILD model:
2613 ///
2614 /// ```rust,ignore
2615 /// let rows = Plugin::objects()
2616 /// .filter(plugin::MODERATION.eq("approved"))
2617 /// .annotate_count("comment_set") // COUNT(*)
2618 /// .annotate_related("rating_avg", "review_set", Aggregate::avg("rating"))
2619 /// .fetch_annotated()
2620 /// .await?; // Vec<(Plugin, Map<alias, value>)>
2621 /// ```
2622 ///
2623 /// An unknown relation name doesn't panic the (infallible)
2624 /// builder — it poisons the annotation, and every fallible
2625 /// consumer (`fetch_annotated`, `explain`) reports it loudly.
2626 /// v1 caveats: child rows aggregate unconditionally — a
2627 /// child-side predicate (a filtered count)
2628 /// and child soft-delete awareness are tracked follow-ups
2629 /// (gaps2 #39).
2630 pub fn annotate_related(
2631 mut self,
2632 alias: &str,
2633 relation: &str,
2634 agg: crate::orm::Aggregate,
2635 ) -> Self {
2636 let rev_spec = T::REVERSE_FK_RELATIONS
2637 .iter()
2638 .find(|r| r.field_name == relation);
2639 let m2m_spec = T::M2M_RELATIONS.iter().find(|r| r.field_name == relation);
2640
2641 let pk = T::FIELDS
2642 .iter()
2643 .find(|f| f.primary_key)
2644 .map(|f| f.name)
2645 .unwrap_or("id");
2646
2647 let mut child_soft_delete = false;
2648 let mut m2m_junction: Option<String> = None;
2649
2650 let resolved = if let Some(spec) = rev_spec {
2651 child_soft_delete = spec.soft_delete;
2652 Ok((
2653 spec.target_table.to_string(),
2654 spec.fk_column.to_string(),
2655 T::TABLE.to_string(),
2656 pk.to_string(),
2657 ))
2658 } else if let Some(spec) = m2m_spec {
2659 // M2M count: junction table = "<parent>_<field>", columns
2660 // parent_id / child_id. The subquery counts junction rows.
2661 m2m_junction = Some(format!("{}_{}", T::TABLE, spec.field_name));
2662 // child_table / fk_column are unused for the M2M shape, but
2663 // the tuple still carries parent_table + parent_pk for the
2664 // correlation in build_query_for.
2665 Ok((
2666 spec.target_table.to_string(),
2667 "child_id".to_string(),
2668 T::TABLE.to_string(),
2669 pk.to_string(),
2670 ))
2671 } else {
2672 // No DECLARED relation matched. Fall back to auto-discovery
2673 // (gaps2 #45): scan the model registry for any child whose
2674 // FK points back at this parent's table, and match `relation`
2675 // against the conventional name forms. Declared relations
2676 // always win above; this only runs as a fallback.
2677 match discover_reverse_relation::<T>(relation) {
2678 AutoDiscovery::Resolved {
2679 child_table,
2680 fk_column,
2681 soft_delete,
2682 } => {
2683 child_soft_delete = soft_delete;
2684 Ok((child_table, fk_column, T::TABLE.to_string(), pk.to_string()))
2685 }
2686 AutoDiscovery::Ambiguous(candidates) => Err(format!(
2687 "umbral::orm::annotate_related: ambiguous reverse relation `{relation}` on `{}` — candidates: [{}]; declare a `#[umbral(reverse_fk = \"<fk>\")] ReverseSet<Child>` field to disambiguate",
2688 T::NAME,
2689 candidates.join(", "),
2690 )),
2691 AutoDiscovery::NotFound(discoverable) => {
2692 let declared = T::REVERSE_FK_RELATIONS
2693 .iter()
2694 .map(|r| r.field_name)
2695 .collect::<Vec<_>>()
2696 .join(", ");
2697 let m2m = T::M2M_RELATIONS
2698 .iter()
2699 .map(|r| r.field_name)
2700 .collect::<Vec<_>>()
2701 .join(", ");
2702 Err(format!(
2703 "umbral::orm::annotate_related: `{relation}` is not a reverse-FK or M2M relation on `{}` — reverse-FK relations: [{declared}], M2M relations: [{m2m}], auto-discoverable children: [{}]",
2704 T::NAME,
2705 discoverable.join(", "),
2706 ))
2707 }
2708 }
2709 };
2710
2711 self.annotations.push(RelatedAnnotation {
2712 alias: alias.to_string(),
2713 agg,
2714 resolved,
2715 child_soft_delete,
2716 child_filter: None,
2717 m2m_junction,
2718 });
2719 self
2720 }
2721
2722 /// Sugar for the overwhelmingly common annotation:
2723 /// `annotate_related("<relation>_count", relation, Aggregate::count())`.
2724 /// `.annotate_count("comment_set")` exposes the value under the
2725 /// `comment_set_count` alias in [`Self::fetch_annotated`].
2726 pub fn annotate_count(self, relation: &str) -> Self {
2727 let alias = format!("{relation}_count");
2728 self.annotate_related(&alias, relation, crate::orm::Aggregate::count())
2729 }
2730
2731 /// Like [`Self::annotate_count`] but counts only the children
2732 /// matching `pred` — a filtered count over `"comments"`.
2733 /// `C` is the CHILD model, so the predicate is typed against the
2734 /// child's columns (`comment::MODERATION.eq("visible")`). The
2735 /// predicate renders into the correlated count subquery's WHERE
2736 /// alongside the FK correlation and the auto soft-delete filter.
2737 ///
2738 /// ```rust,ignore
2739 /// Plugin::objects()
2740 /// .annotate_count_where::<PluginComment>(
2741 /// "visible_comments",
2742 /// "comment_set",
2743 /// plugin_comment::MODERATION.eq("visible"),
2744 /// )
2745 /// ```
2746 pub fn annotate_count_where<C: crate::orm::Model>(
2747 self,
2748 alias: &str,
2749 relation: &str,
2750 pred: crate::orm::Predicate<C>,
2751 ) -> Self {
2752 // Render the child predicate to a backend-default SimpleExpr.
2753 // The count subquery embeds one expression; the equality /
2754 // comparison predicates used for child filters render the same
2755 // on both backends, so the default `cond` is correct.
2756 let child_filter = pred.cond_for("postgres");
2757 let mut queryset = self.annotate_related(alias, relation, crate::orm::Aggregate::count());
2758 // The just-pushed annotation is the last one; attach the filter.
2759 if let Some(last) = queryset.annotations.last_mut() {
2760 last.child_filter = Some(child_filter);
2761 }
2762 queryset
2763 }
2764
2765 /// Loud-failure check for poisoned annotations (unknown relation
2766 /// names recorded by the infallible builder). Called by every
2767 /// fallible consumer before SQL runs.
2768 fn check_annotations(&self) -> Result<(), sqlx::Error> {
2769 for ann in &self.annotations {
2770 if let Err(msg) = &ann.resolved {
2771 return Err(sqlx::Error::Protocol(msg.clone()));
2772 }
2773 }
2774 Ok(())
2775 }
2776
2777 /// Run the SELECT and return every matching row **with its
2778 /// annotation values** — the execution terminal for
2779 /// [`Self::annotate_related`] / [`Self::annotate_count`]. One
2780 /// query; each row's annotations arrive as an `alias → JSON
2781 /// value` map (count → integer, AVG → float/null, SUM/MAX/MIN →
2782 /// typed per the child column, NULL on empty sets for the
2783 /// non-count aggregates).
2784 pub async fn fetch_annotated(
2785 self,
2786 ) -> Result<Vec<(T, serde_json::Map<String, JsonValue>)>, sqlx::Error>
2787 where
2788 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
2789 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
2790 {
2791 self.check_annotations()?;
2792 // Child-column types for SUM/MAX/MIN decoding, resolved from
2793 // the runtime registry (the child model is known by table
2794 // name only). `None` falls back to decode_agg's string path.
2795 let source_types: Vec<(String, crate::orm::Aggregate, Option<crate::orm::SqlType>)> = {
2796 let registry_up = crate::migrate::is_initialised();
2797 self.annotations
2798 .iter()
2799 .map(|ann| {
2800 let ty = match (&ann.resolved, registry_up, ann.agg.source_column()) {
2801 (Ok((child_table, ..)), true, Some(col)) => {
2802 crate::migrate::registered_models()
2803 .into_iter()
2804 .find(|m| m.table == *child_table)
2805 .and_then(|m| m.fields.iter().find(|f| f.name == col).map(|f| f.ty))
2806 }
2807 _ => None,
2808 };
2809 (ann.alias.clone(), ann.agg.clone(), ty)
2810 })
2811 .collect()
2812 };
2813
2814 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Read);
2815 match pool {
2816 DbPool::Sqlite(pool) => {
2817 let q = self.build_query_for("sqlite");
2818 let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
2819 let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, vals)
2820 .fetch_all(&pool)
2821 .await?;
2822 let mut out = Vec::with_capacity(rows.len());
2823 for row in &rows {
2824 let t = <T as sqlx::FromRow<_>>::from_row(row)?;
2825 let mut anns = serde_json::Map::with_capacity(source_types.len());
2826 for (alias, agg, ty) in &source_types {
2827 anns.insert(
2828 alias.clone(),
2829 backend_sqlite::decode_agg(row, alias, agg, *ty)?,
2830 );
2831 }
2832 out.push((t, anns));
2833 }
2834 Ok(out)
2835 }
2836 DbPool::Postgres(pool) => {
2837 let q = self.build_query_for("postgres");
2838 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
2839 let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, vals)
2840 .fetch_all(&pool)
2841 .await?;
2842 let mut out = Vec::with_capacity(rows.len());
2843 for row in &rows {
2844 let t = <T as sqlx::FromRow<_>>::from_row(row)?;
2845 let mut anns = serde_json::Map::with_capacity(source_types.len());
2846 for (alias, agg, ty) in &source_types {
2847 anns.insert(alias.clone(), backend_pg::decode_agg(row, alias, agg, *ty)?);
2848 }
2849 out.push((t, anns));
2850 }
2851 Ok(out)
2852 }
2853 }
2854 }
2855
2856 /// `DELETE FROM table WHERE <predicates>`. Returns the number of
2857 /// rows deleted. With no `.filter` calls, deletes every row.
2858 ///
2859 /// Fires `bulk_post_delete:<table>` once with the list of removed
2860 /// PKs when at least one row was deleted. Per-row `pre_delete` /
2861 /// `post_delete` are NOT fired by this path — use
2862 /// [`Manager::delete_instance`] when per-row signal semantics are
2863 /// required.
2864 ///
2865 /// Feature #72: for `#[umbral(soft_delete)]` models this rewrites
2866 /// to `UPDATE ... SET deleted_at = NOW() WHERE ...` so rows
2867 /// survive in the DB (filtered out of subsequent queries by the
2868 /// auto `WHERE deleted_at IS NULL`). Call `.hard_delete()`
2869 /// beforehand for a real DELETE (GDPR purge, test cleanup).
2870 pub async fn delete(self) -> Result<u64, sqlx::Error> {
2871 // Feature #72 — soft-delete redirect. The whole `delete()`
2872 // contract collapses to an UPDATE setting `deleted_at`. We
2873 // keep the bulk_post_delete signal so subscribers see the
2874 // same event shape regardless of the underlying SQL.
2875 if self.soft_delete_active && !self.hard_delete {
2876 return self.soft_delete_update().await;
2877 }
2878 let atomic = self.should_atomic_wrap();
2879 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Write);
2880 let backend = pool.backend_name();
2881 let mut stmt = self.build_delete_for(backend);
2882 let pk = pk_field::<T>();
2883 if let Some(field) = pk {
2884 stmt.returning_col(Alias::new(field.name));
2885 }
2886 let ids: Vec<JsonValue> = match pool {
2887 DbPool::Sqlite(pool) => {
2888 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
2889 let rows = if atomic {
2890 let mut tx = pool.begin().await?;
2891 let r = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
2892 .fetch_all(&mut *tx)
2893 .await;
2894 match r {
2895 Ok(rows) => {
2896 tx.commit().await?;
2897 rows
2898 }
2899 Err(e) => {
2900 let _ = tx.rollback().await;
2901 return Err(e);
2902 }
2903 }
2904 } else {
2905 sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
2906 .fetch_all(&pool)
2907 .await?
2908 };
2909 match pk {
2910 Some(field) => rows
2911 .iter()
2912 .map(|r| backend_sqlite::pk_to_json(r, field.name, field.ty))
2913 .collect::<Result<_, _>>()?,
2914 None => Vec::new(),
2915 }
2916 }
2917 DbPool::Postgres(pool) => {
2918 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
2919 let rows = if atomic {
2920 let mut tx = pool.begin().await?;
2921 let r = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
2922 .fetch_all(&mut *tx)
2923 .await;
2924 match r {
2925 Ok(rows) => {
2926 tx.commit().await?;
2927 rows
2928 }
2929 Err(e) => {
2930 let _ = tx.rollback().await;
2931 return Err(e);
2932 }
2933 }
2934 } else {
2935 sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
2936 .fetch_all(&pool)
2937 .await?
2938 };
2939 match pk {
2940 Some(field) => rows
2941 .iter()
2942 .map(|r| backend_pg::pk_to_json(r, field.name, field.ty))
2943 .collect::<Result<_, _>>()?,
2944 None => Vec::new(),
2945 }
2946 }
2947 };
2948 let count = ids.len() as u64;
2949 if !ids.is_empty() {
2950 crate::signals::emit_bulk_post_delete::<T>(ids).await;
2951 }
2952 Ok(count)
2953 }
2954
2955 /// `UPDATE table SET col = <expr> WHERE <predicates>` using an
2956 /// F-expression for the new value.
2957 ///
2958 /// This is the companion to `update_values` for atomic column
2959 /// arithmetic. `F::col("views").add(1)` produces an [`FExpr`] that
2960 /// renders as `SET views = views + 1` — the database computes the
2961 /// increment atomically on the server side rather than needing a
2962 /// read-modify-write round-trip in application code.
2963 ///
2964 /// ```rust,ignore
2965 /// use umbral::orm::F;
2966 ///
2967 /// Post::objects()
2968 /// .filter(post::ID.eq(42))
2969 /// .update_expr("views", F::col("views").add(1))
2970 /// .await?;
2971 /// ```
2972 ///
2973 /// Mixing `update_values` and `update_expr` for different columns in
2974 /// one statement requires two separate calls. A combined API (a map
2975 /// where values can be either JSON or FExpr) would require a new sum
2976 /// type; deferred until a consumer surfaces the need.
2977 pub async fn update_expr(
2978 self,
2979 col_name: &str,
2980 expr: FExpr,
2981 ) -> Result<u64, crate::orm::write::WriteError> {
2982 use crate::orm::write::WriteError;
2983 // Validate the column exists on the model.
2984 let field = T::FIELDS
2985 .iter()
2986 .find(|f| f.name == col_name)
2987 .ok_or_else(|| WriteError::UnknownColumn {
2988 field: col_name.to_string(),
2989 })?;
2990 if field.primary_key {
2991 // Silently skip PK rewrites, same as update_values.
2992 return Ok(0);
2993 }
2994 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Write);
2995 let backend = pool.backend_name();
2996
2997 let mut stmt = sea_query::Query::update();
2998 stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
2999 stmt.value(Alias::new(field.name), expr.to_simple_expr());
3000 for p in &self.predicates {
3001 stmt.and_where(p.cond_for(backend));
3002 }
3003 if self.soft_delete_active {
3004 if self.only_deleted {
3005 stmt.and_where(Expr::col(Alias::new("deleted_at")).is_not_null());
3006 } else if !self.with_deleted {
3007 stmt.and_where(Expr::col(Alias::new("deleted_at")).is_null());
3008 }
3009 }
3010 let pk = pk_field::<T>();
3011 if let Some(pkf) = pk {
3012 stmt.returning_col(Alias::new(pkf.name));
3013 }
3014
3015 let ids: Vec<JsonValue> = match pool {
3016 DbPool::Sqlite(pool) => {
3017 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3018 let rows = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3019 .fetch_all(&pool)
3020 .await?;
3021 match pk {
3022 Some(pkf) => rows
3023 .iter()
3024 .map(|r| backend_sqlite::pk_to_json(r, pkf.name, pkf.ty))
3025 .collect::<Result<_, _>>()
3026 .map_err(crate::orm::write::WriteError::Sqlx)?,
3027 None => Vec::new(),
3028 }
3029 }
3030 DbPool::Postgres(pool) => {
3031 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3032 let rows = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3033 .fetch_all(&pool)
3034 .await?;
3035 match pk {
3036 Some(pkf) => rows
3037 .iter()
3038 .map(|r| backend_pg::pk_to_json(r, pkf.name, pkf.ty))
3039 .collect::<Result<_, _>>()
3040 .map_err(crate::orm::write::WriteError::Sqlx)?,
3041 None => Vec::new(),
3042 }
3043 }
3044 };
3045 let count = ids.len() as u64;
3046 if !ids.is_empty() {
3047 crate::signals::emit_bulk_post_save::<T>(ids, false).await;
3048 }
3049 Ok(count)
3050 }
3051
3052 /// `UPDATE table SET k=v[, ...] WHERE <predicates>`. The values
3053 /// map provides `column_name → JSON value` pairs; each is
3054 /// converted to a `sea_query::Value` per the column's declared
3055 /// `SqlType` via [`crate::orm::write::json_to_sea_value`]. Returns
3056 /// the number of rows affected.
3057 ///
3058 /// Unknown columns in the map fail loudly with
3059 /// `WriteError::UnknownColumn`. JSON `null` is rejected for
3060 /// non-nullable columns; supplying a column that exists but is
3061 /// absent from the map is silently a no-op (the column keeps its
3062 /// current value — PATCH semantics, not PUT).
3063 ///
3064 /// Fires `bulk_post_save:<table>` once with `{ ids, created:
3065 /// false, actor }` when at least one row matched. Per-row
3066 /// `pre_save` / `post_save` are NOT fired — use [`Manager::save`]
3067 /// when per-row signal semantics are required.
3068 pub async fn update_values(
3069 self,
3070 values: serde_json::Map<String, serde_json::Value>,
3071 ) -> Result<u64, crate::orm::write::WriteError> {
3072 let atomic = self.should_atomic_wrap();
3073 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Write);
3074 let backend = pool.backend_name();
3075 let mut stmt = self.build_update_for(backend, &values)?;
3076 // RETURNING <pk> so bulk_post_save can include the matched ids.
3077 let pk = pk_field::<T>();
3078 if let Some(field) = pk {
3079 stmt.returning_col(Alias::new(field.name));
3080 }
3081 let ids: Vec<JsonValue> = match pool {
3082 DbPool::Sqlite(pool) => {
3083 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3084 let rows = if atomic {
3085 let mut tx = pool
3086 .begin()
3087 .await
3088 .map_err(crate::orm::write::WriteError::Sqlx)?;
3089 let r = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3090 .fetch_all(&mut *tx)
3091 .await;
3092 match r {
3093 Ok(rows) => {
3094 tx.commit()
3095 .await
3096 .map_err(crate::orm::write::WriteError::Sqlx)?;
3097 rows
3098 }
3099 Err(e) => {
3100 let _ = tx.rollback().await;
3101 return Err(crate::orm::write::WriteError::Sqlx(e));
3102 }
3103 }
3104 } else {
3105 sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3106 .fetch_all(&pool)
3107 .await?
3108 };
3109 match pk {
3110 Some(field) => rows
3111 .iter()
3112 .map(|r| backend_sqlite::pk_to_json(r, field.name, field.ty))
3113 .collect::<Result<_, _>>()
3114 .map_err(crate::orm::write::WriteError::Sqlx)?,
3115 None => Vec::new(),
3116 }
3117 }
3118 DbPool::Postgres(pool) => {
3119 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3120 let rows = if atomic {
3121 let mut tx = pool
3122 .begin()
3123 .await
3124 .map_err(crate::orm::write::WriteError::Sqlx)?;
3125 let r = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3126 .fetch_all(&mut *tx)
3127 .await;
3128 match r {
3129 Ok(rows) => {
3130 tx.commit()
3131 .await
3132 .map_err(crate::orm::write::WriteError::Sqlx)?;
3133 rows
3134 }
3135 Err(e) => {
3136 let _ = tx.rollback().await;
3137 return Err(crate::orm::write::WriteError::Sqlx(e));
3138 }
3139 }
3140 } else {
3141 sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3142 .fetch_all(&pool)
3143 .await?
3144 };
3145 match pk {
3146 Some(field) => rows
3147 .iter()
3148 .map(|r| backend_pg::pk_to_json(r, field.name, field.ty))
3149 .collect::<Result<_, _>>()
3150 .map_err(crate::orm::write::WriteError::Sqlx)?,
3151 None => Vec::new(),
3152 }
3153 }
3154 };
3155 let count = ids.len() as u64;
3156 if !ids.is_empty() {
3157 crate::signals::emit_bulk_post_save::<T>(ids, false).await;
3158 }
3159 Ok(count)
3160 }
3161
3162 /// Helper: build the DELETE statement for the active backend.
3163 /// Public-by-virtue-of-being-pub(crate) so the `_pg` and (future)
3164 /// `_sqlite` explicit-pool variants can share the SQL builder.
3165 fn build_delete_for(&self, backend_name: &str) -> sea_query::DeleteStatement {
3166 let mut stmt = Query::delete();
3167 stmt.from_table(crate::db::router::schema_qualified_table(T::TABLE));
3168 for p in &self.predicates {
3169 stmt.and_where(p.cond_for(backend_name));
3170 }
3171 stmt
3172 }
3173
3174 /// Feature #72 — soft-delete rewrite: turn `DELETE FROM table
3175 /// WHERE ...` into `UPDATE table SET deleted_at = NOW() WHERE
3176 /// ... AND deleted_at IS NULL`. The trailing `IS NULL` guard
3177 /// makes the operation idempotent: re-soft-deleting an already-
3178 /// soft-deleted row doesn't bump its timestamp. Fires
3179 /// `bulk_post_delete:<table>` so subscribers see the same event
3180 /// shape as a hard delete.
3181 async fn soft_delete_update(self) -> Result<u64, sqlx::Error> {
3182 let atomic = self.should_atomic_wrap();
3183 let pool = resolve_pool::<T>(self.explicit_pool.clone(), crate::db::RouteOp::Write);
3184 let backend = pool.backend_name();
3185 let now = chrono::Utc::now();
3186 let mut stmt = sea_query::Query::update();
3187 stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
3188 stmt.value(
3189 Alias::new("deleted_at"),
3190 sea_query::Value::ChronoDateTimeUtc(Some(Box::new(now))),
3191 );
3192 for p in &self.predicates {
3193 stmt.and_where(p.cond_for(backend));
3194 }
3195 // Idempotency guard — never bump an already-set deleted_at.
3196 stmt.and_where(sea_query::Expr::col(Alias::new("deleted_at")).is_null());
3197 let pk = pk_field::<T>();
3198 if let Some(pkf) = pk {
3199 stmt.returning_col(Alias::new(pkf.name));
3200 }
3201 let ids: Vec<JsonValue> = match pool {
3202 DbPool::Sqlite(pool) => {
3203 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3204 let rows = if atomic {
3205 let mut tx = pool.begin().await?;
3206 let r = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3207 .fetch_all(&mut *tx)
3208 .await;
3209 match r {
3210 Ok(rows) => {
3211 tx.commit().await?;
3212 rows
3213 }
3214 Err(e) => {
3215 let _ = tx.rollback().await;
3216 return Err(e);
3217 }
3218 }
3219 } else {
3220 sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3221 .fetch_all(&pool)
3222 .await?
3223 };
3224 match pk {
3225 Some(field) => rows
3226 .iter()
3227 .map(|r| backend_sqlite::pk_to_json(r, field.name, field.ty))
3228 .collect::<Result<_, _>>()?,
3229 None => Vec::new(),
3230 }
3231 }
3232 DbPool::Postgres(pool) => {
3233 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3234 let rows = if atomic {
3235 let mut tx = pool.begin().await?;
3236 let r = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3237 .fetch_all(&mut *tx)
3238 .await;
3239 match r {
3240 Ok(rows) => {
3241 tx.commit().await?;
3242 rows
3243 }
3244 Err(e) => {
3245 let _ = tx.rollback().await;
3246 return Err(e);
3247 }
3248 }
3249 } else {
3250 sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3251 .fetch_all(&pool)
3252 .await?
3253 };
3254 match pk {
3255 Some(field) => rows
3256 .iter()
3257 .map(|r| backend_pg::pk_to_json(r, field.name, field.ty))
3258 .collect::<Result<_, _>>()?,
3259 None => Vec::new(),
3260 }
3261 }
3262 };
3263 let count = ids.len() as u64;
3264 if !ids.is_empty() {
3265 crate::signals::emit_bulk_post_delete::<T>(ids).await;
3266 }
3267 Ok(count)
3268 }
3269
3270 /// Helper: build the UPDATE statement for the active backend.
3271 /// Walks the `values` map, validates each column against the
3272 /// model's `FIELDS` metadata, converts the JSON value via
3273 /// `write::json_to_sea_value`, and threads the accumulated
3274 /// predicates into the WHERE clause.
3275 fn build_update_for(
3276 &self,
3277 backend_name: &str,
3278 values: &serde_json::Map<String, serde_json::Value>,
3279 ) -> Result<sea_query::UpdateStatement, crate::orm::write::WriteError> {
3280 use crate::orm::write::{WriteError, json_to_sea_value};
3281 let mut stmt = Query::update();
3282 stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
3283 for (col_name, val) in values {
3284 // Look up the column on the model. Unknown column names
3285 // fail loudly here rather than producing a bad UPDATE.
3286 let field = T::FIELDS
3287 .iter()
3288 .find(|f| f.name == col_name.as_str())
3289 .ok_or_else(|| WriteError::UnknownColumn {
3290 field: col_name.clone(),
3291 })?;
3292 // Reject attempts to overwrite the PK via update_values.
3293 // The QuerySet's WHERE clause is the only way to identify
3294 // rows; rewriting the PK while filtering on the old one
3295 // is a footgun.
3296 if field.primary_key {
3297 continue;
3298 }
3299 let sea_value =
3300 json_to_sea_value(field.ty, val, field.nullable, field.name, fk_pk_hint(field))?;
3301 stmt.value(Alias::new(field.name), sea_value);
3302 }
3303 for p in &self.predicates {
3304 stmt.and_where(p.cond_for(backend_name));
3305 }
3306 if self.soft_delete_active {
3307 if self.only_deleted {
3308 stmt.and_where(Expr::col(Alias::new("deleted_at")).is_not_null());
3309 } else if !self.with_deleted {
3310 stmt.and_where(Expr::col(Alias::new("deleted_at")).is_null());
3311 }
3312 }
3313 Ok(stmt)
3314 }
3315
3316 /// Run the SELECT against an explicit `PgPool` and return every
3317 /// matching row. Bound by `FromRow<PgRow>` alone so models with
3318 /// Postgres-only field types compile.
3319 pub async fn fetch_pg(self, pool: &sqlx::PgPool) -> Result<Vec<T>, sqlx::Error>
3320 where
3321 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3322 {
3323 let q = self.build_query_for("postgres");
3324 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
3325 sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
3326 .fetch_all(pool)
3327 .await
3328 }
3329
3330 /// Run the SELECT against an explicit `PgPool` with LIMIT 1.
3331 pub async fn first_pg(mut self, pool: &sqlx::PgPool) -> Result<Option<T>, sqlx::Error>
3332 where
3333 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3334 {
3335 self.query.limit(1);
3336 let q = self.build_query_for("postgres");
3337 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
3338 sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
3339 .fetch_optional(pool)
3340 .await
3341 }
3342
3343 /// Run `SELECT COUNT(*)` against an explicit `PgPool`. No FromRow
3344 /// bound on `T` — the count tuple type is `(i64,)`.
3345 pub async fn count_pg(self, pool: &sqlx::PgPool) -> Result<i64, sqlx::Error> {
3346 let mut rebuilt = self.build_query_for("postgres");
3347 rebuilt.clear_selects();
3348 rebuilt.expr(Func::count(Expr::col(sea_query::Asterisk)));
3349 rebuilt.reset_limit();
3350 rebuilt.reset_offset();
3351 let (sql, values) = rebuilt.build_sqlx(PostgresQueryBuilder);
3352 let (n,): (i64,) = sqlx::query_as_with::<sqlx::Postgres, (i64,), _>(&sql, values)
3353 .fetch_one(pool)
3354 .await?;
3355 Ok(n)
3356 }
3357
3358 /// Return whether any row matches, against an explicit `PgPool`.
3359 pub async fn exists_pg(self, pool: &sqlx::PgPool) -> Result<bool, sqlx::Error>
3360 where
3361 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3362 {
3363 let rows = self.limit(1).fetch_pg(pool).await?;
3364 Ok(!rows.is_empty())
3365 }
3366
3367 /// Exactly-one terminal against an explicit `PgPool`.
3368 /// See [`QuerySet::get`] for the error-variant semantics.
3369 pub async fn get_pg(self, pool: &sqlx::PgPool) -> Result<T, GetError>
3370 where
3371 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3372 {
3373 let mut rows = self.limit(2).fetch_pg(pool).await.map_err(GetError::Sqlx)?;
3374 match rows.len() {
3375 0 => Err(GetError::NotFound),
3376 1 => Ok(rows.pop().unwrap()),
3377 _ => Err(GetError::MultipleObjectsReturned),
3378 }
3379 }
3380}
3381
3382/// Delegating chainable + terminal surface on `Manager<T>`.
3383///
3384/// Lets users write `Post::objects().filter(...).fetch().await` without
3385/// a separate `.query()` hop. Each method constructs the initial
3386/// `SelectStatement` against `T::TABLE` with one column per
3387/// `T::FIELDS` entry, wraps it in a fresh `QuerySet<T>`, and forwards.
3388impl<T: Model> Manager<T> {
3389 fn queryset(&self) -> QuerySet<T> {
3390 let columns: Vec<Alias> = T::FIELDS.iter().map(|f| Alias::new(f.name)).collect();
3391 let query = Query::select()
3392 .columns(columns)
3393 .from(crate::db::router::schema_qualified_table(T::TABLE))
3394 .take();
3395 let mut qs = QuerySet::new(query);
3396 // BUG-8: seed the default ORDER BY from `Model::ORDERING` so
3397 // terminals that don't see an explicit `.order_by(...)` still
3398 // get a deterministic row order.
3399 qs.default_ordering = T::ORDERING.to_vec();
3400 // Propagate the Manager's atomic override so QuerySet
3401 // terminals inherit it without the caller re-specifying.
3402 qs.atomic = self.atomic;
3403 // Feature #72 — snapshot the model's soft-delete opt-in into
3404 // the QuerySet so the build_query_for path knows whether to
3405 // auto-inject `WHERE deleted_at IS NULL`. Without this
3406 // snapshot the `impl<T> QuerySet<T>` path can't read
3407 // `T::SOFT_DELETE` (T is unbounded there).
3408 qs.soft_delete_active = T::SOFT_DELETE;
3409 qs
3410 }
3411
3412 /// Resolve whether write terminals on this Manager should auto-wrap
3413 /// in a transaction. Per-call override > builder global.
3414 fn should_atomic_wrap(&self) -> bool {
3415 self.atomic.unwrap_or_else(crate::db::atomic_default)
3416 }
3417
3418 /// See `QuerySet::filter`.
3419 pub fn filter(&self, p: Predicate<T>) -> QuerySet<T> {
3420 self.queryset().filter(p)
3421 }
3422
3423 /// See `QuerySet::exclude`.
3424 pub fn exclude(&self, p: Predicate<T>) -> QuerySet<T> {
3425 self.queryset().exclude(p)
3426 }
3427
3428 /// A bare [`QuerySet`] over every row — the `Model::objects().all()` form.
3429 ///
3430 /// The entry point when you need a `QuerySet` terminal without a
3431 /// filter: a grouped aggregate over the whole table, an unfiltered
3432 /// `aggregate`, or just an explicit "all rows" for readability.
3433 pub fn all(&self) -> QuerySet<T> {
3434 self.queryset()
3435 }
3436
3437 /// See [`QuerySet::aggregate`] — single-row aggregate over every row.
3438 ///
3439 /// Forwards from the manager so `Model::objects().aggregate(...)`
3440 /// works without an intervening `.filter(...)` / `.on(...)`.
3441 pub async fn aggregate(
3442 &self,
3443 aggs: &[(&str, crate::orm::Aggregate)],
3444 ) -> Result<JsonValue, sqlx::Error> {
3445 self.queryset().aggregate(aggs).await
3446 }
3447
3448 /// See [`QuerySet::annotate`] — grouped aggregate (`GROUP BY <group_cols>`).
3449 ///
3450 /// Forwards from the manager so the documented
3451 /// `Model::objects().annotate(&["status"], &[("count", Aggregate::count())])`
3452 /// (a grouped count over `"status"`) compiles
3453 /// directly, without a filter first.
3454 pub async fn annotate(
3455 &self,
3456 group_cols: &[&str],
3457 aggs: &[(&str, crate::orm::Aggregate)],
3458 ) -> Result<Vec<JsonValue>, sqlx::Error> {
3459 self.queryset().annotate(group_cols, aggs).await
3460 }
3461
3462 /// Feature #72 — see `QuerySet::with_deleted`.
3463 pub fn with_deleted(&self) -> QuerySet<T> {
3464 self.queryset().with_deleted()
3465 }
3466
3467 /// Feature #72 — see `QuerySet::only_deleted`.
3468 pub fn only_deleted(&self) -> QuerySet<T> {
3469 self.queryset().only_deleted()
3470 }
3471
3472 /// Gap #111 — see [`QuerySet::only`].
3473 pub fn only(&self, cols: &[&str]) -> QuerySet<T> {
3474 self.queryset().only(cols)
3475 }
3476
3477 /// See [`QuerySet::join_related`].
3478 pub fn join_related(&self, field_name: impl Into<String>) -> QuerySet<T> {
3479 self.queryset().join_related(field_name)
3480 }
3481
3482 /// See [`QuerySet::join_related_many`].
3483 pub fn join_related_many(&self, field_names: &[&str]) -> QuerySet<T> {
3484 self.queryset().join_related_many(field_names)
3485 }
3486
3487 /// See [`QuerySet::left_join_related`].
3488 pub fn left_join_related(&self, path: impl Into<String>) -> QuerySet<T> {
3489 self.queryset().left_join_related(path)
3490 }
3491
3492 /// See [`QuerySet::inner_join_related`].
3493 pub fn inner_join_related(&self, path: impl Into<String>) -> QuerySet<T> {
3494 self.queryset().inner_join_related(path)
3495 }
3496
3497 /// See [`QuerySet::right_join_related`].
3498 pub fn right_join_related(&self, path: impl Into<String>) -> QuerySet<T> {
3499 self.queryset().right_join_related(path)
3500 }
3501
3502 /// See [`QuerySet::select_related`].
3503 pub fn select_related(&self, field_name: impl Into<String>) -> QuerySet<T> {
3504 self.queryset().select_related(field_name)
3505 }
3506
3507 /// See [`QuerySet::select_related_many`].
3508 pub fn select_related_many(&self, field_names: &[&str]) -> QuerySet<T> {
3509 self.queryset().select_related_many(field_names)
3510 }
3511
3512 /// See [`QuerySet::prefetch_related`].
3513 pub fn prefetch_related(&self, field_name: impl Into<String>) -> QuerySet<T> {
3514 self.queryset().prefetch_related(field_name)
3515 }
3516
3517 /// See [`QuerySet::prefetch_related_many`].
3518 pub fn prefetch_related_many(&self, field_names: &[&str]) -> QuerySet<T> {
3519 self.queryset().prefetch_related_many(field_names)
3520 }
3521
3522 /// Feature #72 — see `QuerySet::hard_delete`.
3523 pub fn hard_delete(&self) -> QuerySet<T> {
3524 self.queryset().hard_delete()
3525 }
3526
3527 /// See `QuerySet::into_subquery`.
3528 pub fn into_subquery(&self, col_name: &str) -> crate::orm::Subquery {
3529 self.queryset().into_subquery(col_name)
3530 }
3531
3532 /// See `QuerySet::order_by`.
3533 pub fn order_by(&self, o: OrderExpr<T>) -> QuerySet<T> {
3534 self.queryset().order_by(o)
3535 }
3536
3537 /// See `QuerySet::limit`.
3538 pub fn limit(&self, n: u64) -> QuerySet<T> {
3539 self.queryset().limit(n)
3540 }
3541
3542 /// See `QuerySet::offset`.
3543 pub fn offset(&self, n: u64) -> QuerySet<T> {
3544 self.queryset().offset(n)
3545 }
3546
3547 /// See `QuerySet::on`.
3548 pub fn on(&self, pool: &sqlx::SqlitePool) -> QuerySet<T> {
3549 self.queryset().on(pool)
3550 }
3551
3552 /// See `QuerySet::on_pg`.
3553 pub fn on_pg(&self, pool: &sqlx::PgPool) -> QuerySet<T> {
3554 self.queryset().on_pg(pool)
3555 }
3556
3557 /// See `QuerySet::fetch`.
3558 pub async fn fetch(&self) -> Result<Vec<T>, sqlx::Error>
3559 where
3560 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3561 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3562 + HydrateRelated,
3563 {
3564 self.queryset().fetch().await
3565 }
3566
3567 /// See `QuerySet::first`.
3568 pub async fn first(&self) -> Result<Option<T>, sqlx::Error>
3569 where
3570 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3571 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3572 + HydrateRelated,
3573 {
3574 self.queryset().first().await
3575 }
3576
3577 /// See `QuerySet::count`.
3578 pub async fn count(&self) -> Result<i64, sqlx::Error> {
3579 self.queryset().count().await
3580 }
3581
3582 /// See [`QuerySet::annotate_related`] — starts an annotated chain
3583 /// from the manager, like `filter` does.
3584 pub fn annotate_related(
3585 &self,
3586 alias: &str,
3587 relation: &str,
3588 agg: crate::orm::Aggregate,
3589 ) -> QuerySet<T> {
3590 self.queryset().annotate_related(alias, relation, agg)
3591 }
3592
3593 /// See [`QuerySet::annotate_count`].
3594 pub fn annotate_count(&self, relation: &str) -> QuerySet<T> {
3595 self.queryset().annotate_count(relation)
3596 }
3597
3598 /// See [`QuerySet::annotate_count_where`] — starts a filtered
3599 /// annotated chain from the manager.
3600 pub fn annotate_count_where<C: crate::orm::Model>(
3601 &self,
3602 alias: &str,
3603 relation: &str,
3604 pred: crate::orm::Predicate<C>,
3605 ) -> QuerySet<T> {
3606 self.queryset()
3607 .annotate_count_where::<C>(alias, relation, pred)
3608 }
3609
3610 /// See [`QuerySet::fetch_annotated`].
3611 pub async fn fetch_annotated(
3612 &self,
3613 ) -> Result<Vec<(T, serde_json::Map<String, JsonValue>)>, sqlx::Error>
3614 where
3615 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3616 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3617 {
3618 self.queryset().fetch_annotated().await
3619 }
3620
3621 /// See [`QuerySet::values`].
3622 pub async fn values(&self, columns: &[&str]) -> Result<Vec<JsonValue>, sqlx::Error> {
3623 self.queryset().values(columns).await
3624 }
3625
3626 /// See `QuerySet::exists`.
3627 pub async fn exists(&self) -> Result<bool, sqlx::Error>
3628 where
3629 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3630 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3631 + HydrateRelated,
3632 {
3633 self.queryset().exists().await
3634 }
3635
3636 /// `.get(predicate)` — sugar for `.filter(predicate).get()`.
3637 ///
3638 /// The one-liner: `User::objects().get(user::ID.eq(1))`.
3639 /// See [`QuerySet::get`] for error-variant semantics.
3640 pub async fn get(&self, p: Predicate<T>) -> Result<T, GetError>
3641 where
3642 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3643 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3644 + HydrateRelated,
3645 {
3646 self.queryset().filter(p).get().await
3647 }
3648
3649 /// See [`QuerySet::fetch_pg`].
3650 pub async fn fetch_pg(&self, pool: &sqlx::PgPool) -> Result<Vec<T>, sqlx::Error>
3651 where
3652 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3653 {
3654 self.queryset().fetch_pg(pool).await
3655 }
3656
3657 /// See [`QuerySet::first_pg`].
3658 pub async fn first_pg(&self, pool: &sqlx::PgPool) -> Result<Option<T>, sqlx::Error>
3659 where
3660 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3661 {
3662 self.queryset().first_pg(pool).await
3663 }
3664
3665 /// See [`QuerySet::count_pg`].
3666 pub async fn count_pg(&self, pool: &sqlx::PgPool) -> Result<i64, sqlx::Error> {
3667 self.queryset().count_pg(pool).await
3668 }
3669
3670 /// See [`QuerySet::exists_pg`].
3671 pub async fn exists_pg(&self, pool: &sqlx::PgPool) -> Result<bool, sqlx::Error>
3672 where
3673 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3674 {
3675 self.queryset().exists_pg(pool).await
3676 }
3677
3678 /// Postgres-only sugar for `.filter(predicate).get_pg(pool)`.
3679 pub async fn get_pg(&self, pool: &sqlx::PgPool, p: Predicate<T>) -> Result<T, GetError>
3680 where
3681 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
3682 {
3683 self.queryset().filter(p).get_pg(pool).await
3684 }
3685
3686 // =====================================================================
3687 // Write methods — INSERT.
3688 //
3689 // `create(instance)` does one row; `bulk_create([...])` does many in
3690 // a single multi-VALUES INSERT. Both serialise the instance(s) to a
3691 // JSON map via `serde::Serialize`, look up each field in the model's
3692 // `FIELDS` metadata, and bind values through
3693 // [`crate::orm::write::json_to_sea_value`].
3694 //
3695 // PK handling:
3696 // - Default value (0 for ints, nil for UUIDs, empty for String):
3697 // omitted from the INSERT column list so the DB autoincrement /
3698 // default kicks in.
3699 // - Explicit non-default value: included in the INSERT so the
3700 // caller can supply UUIDs / slug PKs themselves.
3701 // =====================================================================
3702
3703 /// INSERT one row, return the row as it now exists in the
3704 /// database (with any autoincrement PK populated). Uses the
3705 /// ambient pool via `Manager::queryset().resolve_pool`.
3706 pub async fn create(&self, mut instance: T) -> Result<T, crate::orm::write::WriteError>
3707 where
3708 T: serde::Serialize
3709 + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3710 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3711 + HydrateRelated,
3712 {
3713 use crate::orm::write::WriteError;
3714 let map = serialize_to_map(&instance)?;
3715
3716 // Same pre-DB validation pipeline the dynamic
3717 // `insert_json` path runs — choices + FK existence +
3718 // M2M shape. Empty-string + required-field checks are
3719 // intentionally relaxed on the typed path: a Rust
3720 // `pub title: String` field set to `""` is the caller's
3721 // deliberate choice, not a form-default leak, and
3722 // missing-required can't happen because the struct
3723 // forced the caller to supply every column at compile
3724 // time. We only validate the things the typed path
3725 // can't catch at compile time.
3726 let meta = crate::migrate::ModelMeta::for_::<T>();
3727 let validation_errors = crate::orm::validation::validate_on_typed_create(&meta, &map).await;
3728 if !validation_errors.is_empty() {
3729 return Err(WriteError::Multiple {
3730 errors: validation_errors,
3731 });
3732 }
3733
3734 let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
3735 let backend = pool.backend_name();
3736 let stmt = build_insert_one_for::<T>(backend, &map)?;
3737 let atomic = self.should_atomic_wrap();
3738 // Post-execution SQL classification: turns the DB's
3739 // UNIQUE / FK / NOT NULL / CHECK violations into the
3740 // structured `WriteError` variants instead of a raw
3741 // `Sqlx(_)` 500. Symmetric with `DynQuerySet::insert_json`.
3742 match pool {
3743 DbPool::Sqlite(pool) => {
3744 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3745 let row_result = if atomic {
3746 let mut tx = pool.begin().await.map_err(WriteError::Sqlx)?;
3747 let r = sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
3748 .fetch_one(&mut *tx)
3749 .await;
3750 match r {
3751 Ok(row) => {
3752 tx.commit().await.map_err(WriteError::Sqlx)?;
3753 Ok(row)
3754 }
3755 Err(e) => {
3756 let _ = tx.rollback().await;
3757 Err(e)
3758 }
3759 }
3760 } else {
3761 sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
3762 .fetch_one(&pool)
3763 .await
3764 };
3765 let mut row = row_result.map_err(|e| {
3766 crate::orm::validation::classify_sql_error(&e, &map)
3767 .unwrap_or(WriteError::Sqlx(e))
3768 })?;
3769 // BUG-16 step 2: every materialised row, including the
3770 // post-INSERT readback, needs `parent_id` +
3771 // `junction_table` seeded on its M2M slots — otherwise
3772 // `row.tags.add(...)` is a silent no-op.
3773 row.set_m2m_parent_ids();
3774 // Carry form-staged M2M pending ids from the caller's
3775 // instance onto the readback row, then flush them to
3776 // junction rows now that parent_id + junction_table are
3777 // seeded on the readback row.
3778 instance.take_pending_m2m_into(&mut row);
3779 row.write_pending_m2m().await?;
3780 Ok(row)
3781 }
3782 DbPool::Postgres(pool) => {
3783 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3784 let row_result = if atomic {
3785 let mut tx = pool.begin().await.map_err(WriteError::Sqlx)?;
3786 let r = sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
3787 .fetch_one(&mut *tx)
3788 .await;
3789 match r {
3790 Ok(row) => {
3791 tx.commit().await.map_err(WriteError::Sqlx)?;
3792 Ok(row)
3793 }
3794 Err(e) => {
3795 let _ = tx.rollback().await;
3796 Err(e)
3797 }
3798 }
3799 } else {
3800 sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
3801 .fetch_one(&pool)
3802 .await
3803 };
3804 let mut row = row_result.map_err(|e| {
3805 crate::orm::validation::classify_sql_error(&e, &map)
3806 .unwrap_or(WriteError::Sqlx(e))
3807 })?;
3808 row.set_m2m_parent_ids();
3809 instance.take_pending_m2m_into(&mut row);
3810 row.write_pending_m2m().await?;
3811 Ok(row)
3812 }
3813 }
3814 }
3815
3816 /// INSERT many rows in a single statement. Returns the number of
3817 /// rows inserted. The full populated rows aren't materialised —
3818 /// use a follow-up `Model::objects().filter(...).fetch()` if you
3819 /// need them.
3820 ///
3821 /// Empty input is a no-op (returns Ok(0)) — the alternative
3822 /// (building an `INSERT INTO t () VALUES ()` and failing at the
3823 /// DB) doesn't help anyone.
3824 ///
3825 /// Fires `bulk_post_save:<table>` once with `{ ids, created: true,
3826 /// actor }` when at least one row was inserted. Per-row
3827 /// `pre_save` / `post_save` are NOT fired — use [`Self::save`]
3828 /// when per-row signal semantics are required.
3829 pub async fn bulk_create(&self, instances: Vec<T>) -> Result<u64, crate::orm::write::WriteError>
3830 where
3831 T: serde::Serialize,
3832 {
3833 use crate::orm::write::WriteError;
3834 if instances.is_empty() {
3835 return Ok(0);
3836 }
3837 let maps: Result<Vec<_>, _> = instances.iter().map(serialize_to_map).collect();
3838 let maps = maps?;
3839 // Validate every instance through the typed-create
3840 // pipeline. Collected into one `Multiple` so a caller
3841 // that submitted ten rows and got two bad ones can fix
3842 // both in one pass.
3843 let meta = crate::migrate::ModelMeta::for_::<T>();
3844 let mut all_errors: Vec<WriteError> = Vec::new();
3845 for map in &maps {
3846 let errs = crate::orm::validation::validate_on_typed_create(&meta, map).await;
3847 all_errors.extend(errs);
3848 }
3849 if !all_errors.is_empty() {
3850 return Err(WriteError::Multiple { errors: all_errors });
3851 }
3852 let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
3853 let backend = pool.backend_name();
3854 let mut stmt = build_insert_many_for::<T>(backend, &maps)?;
3855 // First row's map is used to enrich UNIQUE / FK
3856 // messages with the offending value when the engine
3857 // doesn't name it. Imperfect for bulk (the failing row
3858 // could be later in the batch) but better than the raw
3859 // sqlx error.
3860 let first_map = maps.first().cloned().unwrap_or_default();
3861 // Add `RETURNING <pk>` so the bulk_post_save signal payload can
3862 // carry the inserted PKs. Both backends support this — SQLite
3863 // since 3.35, Postgres natively. Replaces the previous
3864 // `execute()` + rows_affected path; count comes from the
3865 // returned row vector instead.
3866 let pk = pk_field::<T>();
3867 if let Some(field) = pk {
3868 stmt.returning_col(Alias::new(field.name));
3869 }
3870 let atomic = self.should_atomic_wrap();
3871 let ids: Vec<JsonValue> = match pool {
3872 DbPool::Sqlite(pool) => {
3873 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
3874 let rows = if atomic {
3875 let mut tx = pool.begin().await.map_err(WriteError::Sqlx)?;
3876 let r = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3877 .fetch_all(&mut *tx)
3878 .await;
3879 match r {
3880 Ok(rows) => {
3881 tx.commit().await.map_err(WriteError::Sqlx)?;
3882 rows
3883 }
3884 Err(e) => {
3885 let _ = tx.rollback().await;
3886 return Err(crate::orm::validation::classify_sql_error(&e, &first_map)
3887 .unwrap_or(WriteError::Sqlx(e)));
3888 }
3889 }
3890 } else {
3891 sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
3892 .fetch_all(&pool)
3893 .await
3894 .map_err(|e| {
3895 crate::orm::validation::classify_sql_error(&e, &first_map)
3896 .unwrap_or(WriteError::Sqlx(e))
3897 })?
3898 };
3899 match pk {
3900 Some(field) => rows
3901 .iter()
3902 .map(|r| backend_sqlite::pk_to_json(r, field.name, field.ty))
3903 .collect::<Result<_, _>>()
3904 .map_err(WriteError::Sqlx)?,
3905 None => Vec::new(),
3906 }
3907 }
3908 DbPool::Postgres(pool) => {
3909 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
3910 let rows = if atomic {
3911 let mut tx = pool.begin().await.map_err(WriteError::Sqlx)?;
3912 let r = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3913 .fetch_all(&mut *tx)
3914 .await;
3915 match r {
3916 Ok(rows) => {
3917 tx.commit().await.map_err(WriteError::Sqlx)?;
3918 rows
3919 }
3920 Err(e) => {
3921 let _ = tx.rollback().await;
3922 return Err(crate::orm::validation::classify_sql_error(&e, &first_map)
3923 .unwrap_or(WriteError::Sqlx(e)));
3924 }
3925 }
3926 } else {
3927 sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
3928 .fetch_all(&pool)
3929 .await
3930 .map_err(|e| {
3931 crate::orm::validation::classify_sql_error(&e, &first_map)
3932 .unwrap_or(WriteError::Sqlx(e))
3933 })?
3934 };
3935 match pk {
3936 Some(field) => rows
3937 .iter()
3938 .map(|r| backend_pg::pk_to_json(r, field.name, field.ty))
3939 .collect::<Result<_, _>>()
3940 .map_err(WriteError::Sqlx)?,
3941 None => Vec::new(),
3942 }
3943 }
3944 };
3945 let count = ids.len() as u64;
3946 if !ids.is_empty() {
3947 crate::signals::emit_bulk_post_save::<T>(ids, true).await;
3948 }
3949 Ok(count)
3950 }
3951
3952 /// The `get_or_create` terminal: fetch the first row matching `predicate`;
3953 /// if none exists, insert `defaults` and return it. Returns
3954 /// `(row, created)` so the caller can branch on whether the write
3955 /// happened. Two queries on the miss path (filter+first then create),
3956 /// one query on the hit path.
3957 ///
3958 /// ## Concurrency
3959 ///
3960 /// Convergent under concurrent callers: if two callers both miss the
3961 /// SELECT and race to INSERT, the one that loses gets a
3962 /// `UniqueViolation`; that error is caught here and the existing row
3963 /// is re-fetched, so both callers return the same row with
3964 /// `created = false` for the loser. A UNIQUE constraint on the
3965 /// predicate columns is required for true at-most-one semantics — the
3966 /// constraint is what makes the convergence deterministic.
3967 pub async fn get_or_create(
3968 &self,
3969 predicate: Predicate<T>,
3970 defaults: T,
3971 ) -> Result<(T, bool), crate::orm::write::WriteError>
3972 where
3973 T: serde::Serialize
3974 + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
3975 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
3976 + HydrateRelated,
3977 {
3978 use crate::orm::write::WriteError;
3979
3980 // Read-your-writes: probe for the existing row on the WRITE database,
3981 // not a (possibly lagging) read replica — otherwise a read/write-split
3982 // router could miss a just-written row and insert a duplicate. The
3983 // following `create()` already resolves the same write target.
3984 let write_pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
3985 if let Some(existing) = pin_to_pool(self.filter(predicate.clone()), &write_pool)
3986 .first()
3987 .await
3988 .map_err(WriteError::Sqlx)?
3989 {
3990 return Ok((existing, false));
3991 }
3992
3993 // Attempt the INSERT. On a UNIQUE violation (concurrent writer won the
3994 // race between our SELECT and this INSERT), catch the error and
3995 // re-SELECT to return the now-existing row with created=false.
3996 // This is the standard try-insert-then-fetch convergence pattern.
3997 //
3998 // Note on transaction semantics: a plain INSERT is atomic at the
3999 // statement level. Wrapping SELECT+INSERT in a serialisable transaction
4000 // would be stricter but requires SAVEPOINT support to recover from the
4001 // Postgres "aborted transaction" state after a constraint violation —
4002 // a per-operation SAVEPOINT would add two extra round-trips on every
4003 // write for marginal gain. The UNIQUE-constraint backstop plus this
4004 // re-fetch gives the same observable guarantee: callers always converge
4005 // on the same row and never see a spurious UniqueViolation.
4006 match self.create(defaults).await {
4007 Ok(created) => Ok((created, true)),
4008 Err(WriteError::UniqueViolation { .. }) => {
4009 // A concurrent writer inserted the row between our SELECT and
4010 // our INSERT. Re-fetch the now-existing row.
4011 let existing = pin_to_pool(self.filter(predicate), &write_pool)
4012 .first()
4013 .await
4014 .map_err(WriteError::Sqlx)?
4015 .ok_or_else(|| {
4016 WriteError::Sqlx(sqlx::Error::Protocol(
4017 "get_or_create: row vanished after UniqueViolation re-fetch"
4018 .to_string(),
4019 ))
4020 })?;
4021 Ok((existing, false))
4022 }
4023 Err(e) => Err(e),
4024 }
4025 }
4026
4027 /// The `update_or_create` terminal: fetch the first row matching
4028 /// `predicate`; if found, update its non-PK columns with the
4029 /// `defaults` instance's values and return the fresh row;
4030 /// otherwise insert `defaults` and return it. Returns
4031 /// `(row, created)` so the caller can branch on the path taken.
4032 ///
4033 /// The defaults' PK is intentionally ignored on the update path —
4034 /// the matched row keeps its original PK. On the insert path the
4035 /// defaults' PK is honoured (autoincrement sentinel `0` → DB
4036 /// assigns; explicit value → DB uses it).
4037 ///
4038 /// ## Concurrency
4039 ///
4040 /// Convergent under concurrent callers: if two callers both miss the
4041 /// SELECT and race to INSERT, the loser gets a `UniqueViolation`; that
4042 /// error is caught here and the existing row is re-fetched, then the
4043 /// update is applied to it. Both callers converge on the same row, with
4044 /// `created = false` for the loser. A UNIQUE constraint on the predicate
4045 /// columns is required for deterministic convergence.
4046 ///
4047 /// Implementation: 2 queries on the hit path (`first` + UPDATE + re-fetch),
4048 /// 2 queries on the miss+create path (`first` + `create`), or
4049 /// 4 queries on the miss+race path (`first` + failed-INSERT + `first`
4050 /// + UPDATE + re-fetch).
4051 pub async fn update_or_create(
4052 &self,
4053 predicate: Predicate<T>,
4054 defaults: T,
4055 ) -> Result<(T, bool), crate::orm::write::WriteError>
4056 where
4057 T: serde::Serialize
4058 + Clone
4059 + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4060 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
4061 + HydrateRelated,
4062 {
4063 use crate::orm::write::WriteError;
4064 let pk = pk_field::<T>().ok_or_else(|| {
4065 WriteError::Sqlx(sqlx::Error::Protocol(
4066 "update_or_create: model has no primary key".to_string(),
4067 ))
4068 })?;
4069 let pk_name = pk.name;
4070
4071 // Read-your-writes: the existence probe and the post-update re-fetch
4072 // run on the WRITE database, so a read/write-split router doesn't miss
4073 // a just-written row (duplicate insert) or read a stale row back. The
4074 // intervening UPDATE already routes to the same write target.
4075 let write_pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4076
4077 // Shared helper: given the existing row (already fetched) and the
4078 // defaults instance, apply the non-PK column update and return the
4079 // re-fetched row. Used by both the direct-hit path and the
4080 // UniqueViolation-convergence path so the update logic is in one place.
4081 macro_rules! do_update {
4082 ($existing:expr, $defaults:expr) => {{
4083 let existing: T = $existing;
4084 let defaults: T = $defaults;
4085
4086 // Serialize defaults, drop the PK so the matched row's PK is
4087 // preserved, then UPDATE WHERE <pk_col> = <existing_pk>.
4088 let mut update_map = serialize_to_map(&defaults)?;
4089 update_map.remove(pk_name);
4090
4091 // Build a PK predicate from the existing row's serialized PK
4092 // value. Goes through serde_json so any PK type (i64, String,
4093 // Uuid) round-trips correctly through sea-query.
4094 let existing_json =
4095 serde_json::to_value(&existing).map_err(WriteError::SerializeFailed)?;
4096 let pk_value_json = existing_json
4097 .get(pk_name)
4098 .cloned()
4099 .unwrap_or(serde_json::Value::Null);
4100 let pk_sea = crate::orm::write::json_to_sea_value(
4101 pk.ty,
4102 &pk_value_json,
4103 false,
4104 pk_name,
4105 None,
4106 )?;
4107 let pk_pred: Predicate<T> = Predicate::new(
4108 sea_query::Expr::col(sea_query::Alias::new(pk_name)).eq(pk_sea),
4109 );
4110
4111 // Run the UPDATE.
4112 self.filter(pk_pred).update_values(update_map).await?;
4113
4114 // Re-fetch to return the populated row.
4115 let pk_sea2 = crate::orm::write::json_to_sea_value(
4116 pk.ty,
4117 &pk_value_json,
4118 false,
4119 pk_name,
4120 None,
4121 )?;
4122 let refetch_pred: Predicate<T> = Predicate::new(
4123 sea_query::Expr::col(sea_query::Alias::new(pk_name)).eq(pk_sea2),
4124 );
4125 pin_to_pool(self.filter(refetch_pred), &write_pool)
4126 .first()
4127 .await
4128 .map_err(WriteError::Sqlx)?
4129 .ok_or_else(|| {
4130 WriteError::Sqlx(sqlx::Error::Protocol(
4131 "update_or_create: row vanished between UPDATE and re-fetch"
4132 .to_string(),
4133 ))
4134 })?
4135 }};
4136 }
4137
4138 if let Some(existing) = pin_to_pool(self.filter(predicate.clone()), &write_pool)
4139 .first()
4140 .await
4141 .map_err(WriteError::Sqlx)?
4142 {
4143 let updated = do_update!(existing, defaults);
4144 return Ok((updated, false));
4145 }
4146
4147 // Attempt the INSERT. On a UNIQUE violation (concurrent writer won the
4148 // race between our SELECT and this INSERT), catch the error, re-fetch
4149 // the now-existing row, and apply the update to it — same convergence
4150 // as get_or_create but with an extra UPDATE step.
4151 match self.create(defaults.clone()).await {
4152 Ok(created) => Ok((created, true)),
4153 Err(WriteError::UniqueViolation { .. }) => {
4154 // A concurrent writer inserted the row between our SELECT and
4155 // our INSERT. Re-fetch then update, same as the direct-hit path.
4156 let existing = pin_to_pool(self.filter(predicate), &write_pool)
4157 .first()
4158 .await
4159 .map_err(WriteError::Sqlx)?
4160 .ok_or_else(|| {
4161 WriteError::Sqlx(sqlx::Error::Protocol(
4162 "update_or_create: row vanished after UniqueViolation re-fetch"
4163 .to_string(),
4164 ))
4165 })?;
4166 let updated = do_update!(existing, defaults);
4167 Ok((updated, false))
4168 }
4169 Err(e) => Err(e),
4170 }
4171 }
4172
4173 /// INSERT-or-UPDATE keyed on the primary key. The row's PK column
4174 /// is the conflict target; on a hit, every non-PK column is
4175 /// overwritten with the supplied value. Returns the row as the DB
4176 /// stored it (post-upsert).
4177 ///
4178 /// Both backends use `INSERT ... ON CONFLICT(<pk>) DO UPDATE SET
4179 /// col = excluded.col, ...`. The SQLite and Postgres syntax happens
4180 /// to match exactly here so a single sea-query `OnConflict` builder
4181 /// covers both.
4182 pub async fn upsert(&self, instance: T) -> Result<T, crate::orm::write::WriteError>
4183 where
4184 T: serde::Serialize
4185 + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4186 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
4187 {
4188 let map = serialize_to_map(&instance)?;
4189 let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4190 let backend = pool.backend_name();
4191 let mut stmt = build_insert_one_for::<T>(backend, &map)?;
4192
4193 // Conflict target = PK column. update_columns = every non-PK
4194 // column the body included. sea-query renders `DO UPDATE SET
4195 // col = excluded.col` (SQLite) / `DO UPDATE SET col =
4196 // EXCLUDED.col` (PG) — both forms work cross-dialect.
4197 let pk_name = T::FIELDS
4198 .iter()
4199 .find(|f| f.primary_key)
4200 .map(|f| f.name)
4201 .ok_or_else(|| {
4202 crate::orm::write::WriteError::Sqlx(sqlx::Error::Protocol(
4203 "upsert: model has no primary key — use get_or_create or create instead"
4204 .to_string(),
4205 ))
4206 })?;
4207 let update_cols: Vec<Alias> = T::FIELDS
4208 .iter()
4209 .filter(|f| !f.primary_key && map.contains_key(f.name))
4210 .map(|f| Alias::new(f.name))
4211 .collect();
4212 let mut on_conflict = sea_query::OnConflict::column(Alias::new(pk_name));
4213 if !update_cols.is_empty() {
4214 on_conflict.update_columns(update_cols);
4215 } else {
4216 // No non-PK columns to overwrite — this is a "INSERT OR
4217 // IGNORE" shape. sea-query encodes that as `DO NOTHING`.
4218 on_conflict.do_nothing();
4219 }
4220 stmt.on_conflict(on_conflict);
4221
4222 match pool {
4223 DbPool::Sqlite(pool) => {
4224 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4225 let row = sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4226 .fetch_one(&pool)
4227 .await?;
4228 Ok(row)
4229 }
4230 DbPool::Postgres(pool) => {
4231 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4232 let row = sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4233 .fetch_one(&pool)
4234 .await?;
4235 Ok(row)
4236 }
4237 }
4238 }
4239
4240 /// `create` against an explicit Postgres pool. The Postgres
4241 /// counterpart of [`Self::create`] for models with Postgres-only
4242 /// field types (Array, Inet, MacAddr, FullText), whose `FromRow`
4243 /// impl exists only for `PgRow`.
4244 pub async fn create_pg(
4245 &self,
4246 instance: T,
4247 pool: &sqlx::PgPool,
4248 ) -> Result<T, crate::orm::write::WriteError>
4249 where
4250 T: serde::Serialize + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
4251 {
4252 let map = serialize_to_map(&instance)?;
4253 let stmt = build_insert_one_for::<T>("postgres", &map)?;
4254 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4255 let row = sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4256 .fetch_one(pool)
4257 .await?;
4258 Ok(row)
4259 }
4260
4261 /// Apply per-row differing values to a list of instances in one
4262 /// statement. Each instance carries its own PK and its own
4263 /// column values; the generated SQL uses one `CASE pk WHEN ...
4264 /// THEN ... END` per non-PK column, plus a `WHERE pk IN (...)`
4265 /// to scope the update.
4266 ///
4267 /// A `bulk_update(objs, fields)` that
4268 /// updates every non-PK column rather than asking the caller to
4269 /// list them. Returns the number of rows affected.
4270 ///
4271 /// Empty input is a no-op (returns 0). The pattern works on both
4272 /// SQLite and Postgres — the `CASE` expression is SQL-standard.
4273 ///
4274 /// Limitations:
4275 /// - All instances must have a non-default PK (the caller has
4276 /// already loaded the rows). Default-PK instances are skipped.
4277 /// - Bulk-write signals are NOT fired by this path — it's the
4278 /// `Manager::bulk_create` analogue for UPDATE, deliberately
4279 /// silent for speed.
4280 pub async fn bulk_update(&self, instances: Vec<T>) -> Result<u64, crate::orm::write::WriteError>
4281 where
4282 T: serde::Serialize,
4283 {
4284 use crate::orm::write::{WriteError, is_default_pk, json_to_sea_value};
4285 if instances.is_empty() {
4286 return Ok(0);
4287 }
4288 let pk = pk_field::<T>().ok_or_else(|| {
4289 WriteError::Sqlx(sqlx::Error::Protocol(
4290 "bulk_update: model has no primary key".to_string(),
4291 ))
4292 })?;
4293 let pk_name = pk.name;
4294 let pk_ty = pk.ty;
4295
4296 // Serialize every instance, collecting (pk_value, full_map)
4297 // for the CASE branches and the IN clause. Skip rows whose
4298 // PK is still the default sentinel — they were never
4299 // persisted and a bulk UPDATE on them is a no-op anyway.
4300 let mut serialized: Vec<(
4301 serde_json::Value,
4302 serde_json::Map<String, serde_json::Value>,
4303 )> = Vec::with_capacity(instances.len());
4304 for instance in &instances {
4305 let map = serialize_to_map(instance)?;
4306 let pk_val = map.get(pk_name).cloned().unwrap_or(serde_json::Value::Null);
4307 if is_default_pk(pk_ty, &pk_val) {
4308 continue;
4309 }
4310 serialized.push((pk_val, map));
4311 }
4312 if serialized.is_empty() {
4313 return Ok(0);
4314 }
4315
4316 // Collect the list of non-PK columns to update from the
4317 // first row (every row contributes the same column set
4318 // because they're typed instances of T).
4319 let update_cols: Vec<&crate::orm::FieldSpec> =
4320 T::FIELDS.iter().filter(|f| !f.primary_key).collect();
4321
4322 // Build the UPDATE: one CASE per column, IN clause for the
4323 // WHERE. Goes through sea-query's update statement for
4324 // backend portability.
4325 let mut stmt = sea_query::Query::update();
4326 stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
4327
4328 for field in &update_cols {
4329 // CASE pk_col
4330 // WHEN <pk1> THEN <val1>
4331 // WHEN <pk2> THEN <val2>
4332 // ...
4333 // END
4334 let mut case = sea_query::CaseStatement::new();
4335 for (pk_val, map) in &serialized {
4336 let val = map
4337 .get(field.name)
4338 .cloned()
4339 .unwrap_or(serde_json::Value::Null);
4340 let cell = json_to_sea_value(
4341 field.ty,
4342 &val,
4343 field.nullable,
4344 field.name,
4345 fk_pk_hint(field),
4346 )?;
4347 let pk_sea = json_to_sea_value(pk_ty, pk_val, false, pk_name, None)?;
4348 case = case.case(sea_query::Expr::col(Alias::new(pk_name)).eq(pk_sea), cell);
4349 }
4350 stmt.value(Alias::new(field.name), case);
4351 }
4352
4353 // WHERE pk IN (<pk1>, <pk2>, ...)
4354 let pk_seas: Vec<sea_query::Value> = serialized
4355 .iter()
4356 .map(|(pk_val, _)| json_to_sea_value(pk_ty, pk_val, false, pk_name, None))
4357 .collect::<Result<_, _>>()?;
4358 stmt.and_where(sea_query::Expr::col(Alias::new(pk_name)).is_in(pk_seas));
4359
4360 let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4361 let affected = match pool {
4362 DbPool::Sqlite(pool) => {
4363 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4364 sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
4365 .execute(&pool)
4366 .await
4367 .map_err(WriteError::Sqlx)?
4368 .rows_affected()
4369 }
4370 DbPool::Postgres(pool) => {
4371 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4372 sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4373 .execute(&pool)
4374 .await
4375 .map_err(WriteError::Sqlx)?
4376 .rows_affected()
4377 }
4378 };
4379 Ok(affected)
4380 }
4381
4382 /// Run a hand-written SQL query and return typed `Vec<T>` rows.
4383 ///
4384 /// The escape hatch for queries the QuerySet builder can't (or
4385 /// shouldn't) model — CTEs, vendor-specific functions, ad-hoc
4386 /// reporting. Delegates to `sqlx::query_as` against the ambient
4387 /// pool and dispatches on backend, so user code stays portable.
4388 /// The string is sent verbatim; no parameter binding (use
4389 /// `Predicate` / the typed query path for parameterised
4390 /// queries). Inject input only after manual sanitisation.
4391 ///
4392 /// Skips the `select_related` / `prefetch_related` chain — those
4393 /// only apply to the typed QuerySet build path.
4394 pub async fn raw(&self, sql: &str) -> Result<Vec<T>, sqlx::Error>
4395 where
4396 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4397 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
4398 {
4399 // Routing: `raw` resolves to the READ database (most raw statements
4400 // are SELECTs, which this returns as `Vec<T>`). Under a read/write-
4401 // split router, a raw statement that WRITES must pin the write pool
4402 // explicitly via `.on(&pool)` / `.on_pg(&pool)`, since the router
4403 // cannot inspect arbitrary SQL to know it mutates.
4404 let pool = resolve_pool::<T>(None, crate::db::RouteOp::Read);
4405 match pool {
4406 DbPool::Sqlite(pool) => {
4407 sqlx::query_as::<sqlx::Sqlite, T>(sql)
4408 .fetch_all(&pool)
4409 .await
4410 }
4411 DbPool::Postgres(pool) => {
4412 sqlx::query_as::<sqlx::Postgres, T>(sql)
4413 .fetch_all(&pool)
4414 .await
4415 }
4416 }
4417 }
4418
4419 /// `bulk_create` against an explicit Postgres pool.
4420 pub async fn bulk_create_pg(
4421 &self,
4422 instances: Vec<T>,
4423 pool: &sqlx::PgPool,
4424 ) -> Result<u64, crate::orm::write::WriteError>
4425 where
4426 T: serde::Serialize,
4427 {
4428 if instances.is_empty() {
4429 return Ok(0);
4430 }
4431 let maps: Result<Vec<_>, _> = instances.iter().map(serialize_to_map).collect();
4432 let maps = maps?;
4433 let stmt = build_insert_many_for::<T>("postgres", &maps)?;
4434 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4435 let result = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4436 .execute(pool)
4437 .await?;
4438 Ok(result.rows_affected())
4439 }
4440}
4441
4442// QuerySetTx (struct + impl) moved to `super::tx`; re-exported above.
4443
4444impl<T: Model> Manager<T> {
4445 /// Begin a new query on this manager attached to the given open transaction.
4446 ///
4447 /// Sugar for `T::objects().on_tx(tx)` — lets callers skip the intermediate
4448 /// `QuerySet` construction when they want to go straight to a terminal:
4449 ///
4450 /// ```rust,ignore
4451 /// umbral::db::transaction(|tx| async move {
4452 /// let post = Post::objects().on_tx(tx).create(new_post).await?;
4453 /// Ok::<_, MyError>(post)
4454 /// }).await?;
4455 /// ```
4456 pub fn on_tx<'a>(&self, tx: &'a mut crate::db::Transaction) -> QuerySetTx<'a, T> {
4457 self.queryset().on_tx(tx)
4458 }
4459
4460 /// INSERT one row inside `tx` and return the populated row.
4461 ///
4462 /// This is the primary Manager-level entry point for transactional writes.
4463 /// Equivalent to `Post::objects().on_tx(tx).create(instance)` but more
4464 /// ergonomic when you only need the one INSERT (no filter chain needed).
4465 ///
4466 /// ```rust,ignore
4467 /// umbral::db::transaction(|tx| async move {
4468 /// let post = Post::objects().create_in_tx(new_post, tx).await?;
4469 /// Ok::<_, MyError>(post)
4470 /// }).await?;
4471 /// ```
4472 pub async fn create_in_tx(
4473 &self,
4474 instance: T,
4475 tx: &mut crate::db::Transaction,
4476 ) -> Result<T, crate::orm::write::WriteError>
4477 where
4478 T: serde::Serialize
4479 + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4480 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
4481 + HydrateRelated,
4482 {
4483 let map = serialize_to_map(&instance)?;
4484 let stmt = build_insert_one_for::<T>(tx.backend_name(), &map)?;
4485 match tx.backend_name() {
4486 "sqlite" => {
4487 let inner = tx.as_sqlite_mut().unwrap();
4488 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4489 let mut row = sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4490 .fetch_one(&mut **inner)
4491 .await?;
4492 row.set_m2m_parent_ids();
4493 Ok(row)
4494 }
4495 _ => {
4496 let inner = tx.as_pg_mut().unwrap();
4497 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4498 let mut row = sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4499 .fetch_one(&mut **inner)
4500 .await?;
4501 row.set_m2m_parent_ids();
4502 Ok(row)
4503 }
4504 }
4505 }
4506
4507 /// INSERT many rows inside `tx`.
4508 ///
4509 /// Returns the number of rows inserted. Empty input is a no-op.
4510 pub async fn bulk_create_in_tx(
4511 &self,
4512 instances: Vec<T>,
4513 tx: &mut crate::db::Transaction,
4514 ) -> Result<u64, crate::orm::write::WriteError>
4515 where
4516 T: serde::Serialize,
4517 {
4518 if instances.is_empty() {
4519 return Ok(0);
4520 }
4521 let maps: Result<Vec<_>, _> = instances.iter().map(serialize_to_map).collect();
4522 let maps = maps?;
4523 let stmt = build_insert_many_for::<T>(tx.backend_name(), &maps)?;
4524 match tx.backend_name() {
4525 "sqlite" => {
4526 let inner = tx.as_sqlite_mut().unwrap();
4527 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4528 let result = sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
4529 .execute(&mut **inner)
4530 .await?;
4531 Ok(result.rows_affected())
4532 }
4533 _ => {
4534 let inner = tx.as_pg_mut().unwrap();
4535 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4536 let result = sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4537 .execute(&mut **inner)
4538 .await?;
4539 Ok(result.rows_affected())
4540 }
4541 }
4542 }
4543
4544 // =====================================================================
4545 // Per-instance signal-firing write methods.
4546 //
4547 // `save(instance)` and `delete_instance(instance)` are the methods that
4548 // fire the ORM lifecycle signals (`pre_save` / `post_save` /
4549 // `pre_delete` / `post_delete`). The existing bulk methods
4550 // (`create`, `bulk_create`, `QuerySet::update_values`,
4551 // `QuerySet::delete`) remain signal-free by design:
4552 // bulk operations bypass signals for performance.
4553 //
4554 // Signal name format: `<event>:<table>` — e.g. `post_save:post`.
4555 // Payload shapes:
4556 // save: `{ "instance": <M as JSON>, "created": bool }`
4557 // delete: `{ "instance": <M as JSON> }`
4558 //
4559 // The `created` flag on save follows the convention:
4560 // `true` when the PK is the default sentinel → INSERT path.
4561 // `false` when the PK is non-default → UPDATE path.
4562 // =====================================================================
4563
4564 /// Save one instance, firing `pre_save` + `post_save` signals.
4565 ///
4566 /// Determines INSERT vs UPDATE by checking whether the primary key
4567 /// is the autoincrement sentinel (`0` for integers, nil UUID, empty
4568 /// string). If it is, an INSERT is performed (`created = true`);
4569 /// otherwise an `UPDATE ... WHERE pk = <value>` is run (`created = false`).
4570 ///
4571 /// Returns the row as it exists in the database after the write
4572 /// (populated PK for inserts, same row for updates).
4573 ///
4574 /// ## Signal contract
4575 ///
4576 /// - `pre_save:<table>` fires before the database write with
4577 /// `{ "instance": ..., "created": bool, "actor": ... }`.
4578 /// - `post_save:<table>` fires after the database write with the
4579 /// DB-read-back row and the same envelope keys.
4580 ///
4581 /// The `"actor"` value is set by the nearest enclosing
4582 /// [`crate::signals::with_actor`] scope; `Value::Null` when no
4583 /// scope is active.
4584 ///
4585 /// ## Bulk paths fire bulk signals, not per-row signals
4586 ///
4587 /// `Manager::create`, `Manager::bulk_create`, and
4588 /// `QuerySet::update_values` / `QuerySet::delete` do NOT fire
4589 /// per-row `post_save` / `post_delete`. They fire
4590 /// `bulk_post_save:<table>` / `bulk_post_delete:<table>` once per
4591 /// statement with the affected PKs in the payload. Use `save` /
4592 /// `delete_instance` when per-row signal semantics are needed.
4593 pub async fn save(&self, instance: T) -> Result<T, crate::orm::write::SaveError>
4594 where
4595 T: serde::Serialize
4596 + for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>
4597 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
4598 {
4599 use crate::orm::write::{SaveError, is_default_pk};
4600 // Determine INSERT vs UPDATE by inspecting the PK field.
4601 let pk_field = T::FIELDS
4602 .iter()
4603 .find(|f| f.primary_key)
4604 .ok_or(SaveError::NoPrimaryKey)?;
4605 let map = serialize_to_map(&instance).map_err(SaveError::Write)?;
4606 let pk_val = map
4607 .get(pk_field.name)
4608 .cloned()
4609 .unwrap_or(serde_json::Value::Null);
4610 let created = is_default_pk(pk_field.ty, &pk_val);
4611
4612 // Fire pre_save before the write.
4613 crate::signals::emit_pre_save::<T>(&instance, created).await;
4614
4615 let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4616 let backend = pool.backend_name();
4617
4618 if created {
4619 // INSERT path.
4620 let stmt = build_insert_one_for::<T>(backend, &map).map_err(SaveError::Write)?;
4621 let row = match pool {
4622 DbPool::Sqlite(pool) => {
4623 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4624 sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4625 .fetch_one(&pool)
4626 .await
4627 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4628 }
4629 DbPool::Postgres(pool) => {
4630 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4631 sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4632 .fetch_one(&pool)
4633 .await
4634 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4635 }
4636 };
4637 // Fire post_save with the DB-populated row.
4638 crate::signals::emit_post_save::<T>(&row, true).await;
4639 Ok(row)
4640 } else {
4641 // UPDATE path: UPDATE ... WHERE <pk> = <value> RETURNING *.
4642 use sea_query::{Alias, Expr, Query};
4643
4644 // gaps2 #92 — snapshot the pre-UPDATE row for `pre_update` /
4645 // `post_update` subscribers, but ONLY when one exists. The
4646 // extra SELECT-by-PK is gated on `has_subscribers` so the
4647 // common UPDATE path (no `*_update` listener) pays nothing.
4648 // Best-effort TOCTOU: the snapshot reads the row before the
4649 // UPDATE; a concurrent writer between the two is accepted.
4650 let pre_table = T::TABLE;
4651 let want_pre = crate::signals::has_subscribers(&format!("pre_update:{pre_table}"));
4652 let want_post = crate::signals::has_subscribers(&format!("post_update:{pre_table}"));
4653 let previous: Option<T> = if want_pre || want_post {
4654 let mut sel = Query::select();
4655 sel.from(crate::db::router::schema_qualified_table(T::TABLE));
4656 for field in T::FIELDS {
4657 sel.column(Alias::new(field.name));
4658 }
4659 let pk_sea_sel = crate::orm::write::json_to_sea_value(
4660 pk_field.ty,
4661 &pk_val,
4662 false,
4663 pk_field.name,
4664 None,
4665 )
4666 .map_err(SaveError::Write)?;
4667 sel.and_where(Expr::col(Alias::new(pk_field.name)).eq(pk_sea_sel));
4668 sel.limit(1);
4669 match pool {
4670 DbPool::Sqlite(ref pool) => {
4671 let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
4672 sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4673 .fetch_optional(pool)
4674 .await
4675 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4676 }
4677 DbPool::Postgres(ref pool) => {
4678 let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
4679 sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4680 .fetch_optional(pool)
4681 .await
4682 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4683 }
4684 }
4685 } else {
4686 None
4687 };
4688 // Fire pre_update before the UPDATE when both a snapshot exists
4689 // and a subscriber wants it.
4690 if want_pre {
4691 if let Some(prev) = &previous {
4692 crate::signals::emit_pre_update::<T>(prev, &instance).await;
4693 }
4694 }
4695
4696 let mut stmt = Query::update();
4697 stmt.table(crate::db::router::schema_qualified_table(T::TABLE));
4698 for field in T::FIELDS {
4699 if field.primary_key {
4700 continue;
4701 }
4702 let val = map
4703 .get(field.name)
4704 .cloned()
4705 .unwrap_or(serde_json::Value::Null);
4706 let sea_val = crate::orm::write::json_to_sea_value(
4707 field.ty,
4708 &val,
4709 field.nullable,
4710 field.name,
4711 fk_pk_hint(field),
4712 )
4713 .map_err(SaveError::Write)?;
4714 stmt.value(Alias::new(field.name), sea_val);
4715 }
4716 // WHERE pk = <value>
4717 let pk_sea = crate::orm::write::json_to_sea_value(
4718 pk_field.ty,
4719 &pk_val,
4720 false,
4721 pk_field.name,
4722 None,
4723 )
4724 .map_err(SaveError::Write)?;
4725 stmt.and_where(Expr::col(Alias::new(pk_field.name)).eq(pk_sea));
4726 // RETURNING * so we can return the updated row.
4727 stmt.returning_all();
4728
4729 let row = match pool {
4730 DbPool::Sqlite(pool) => {
4731 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4732 sqlx::query_as_with::<sqlx::Sqlite, T, _>(&sql, values)
4733 .fetch_one(&pool)
4734 .await
4735 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4736 }
4737 DbPool::Postgres(pool) => {
4738 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4739 sqlx::query_as_with::<sqlx::Postgres, T, _>(&sql, values)
4740 .fetch_one(&pool)
4741 .await
4742 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4743 }
4744 };
4745 // Fire post_save with created=false.
4746 crate::signals::emit_post_save::<T>(&row, false).await;
4747 // gaps2 #92 — post_update carries the pre-UPDATE snapshot (old)
4748 // and the freshly-written row (new). Only when a subscriber
4749 // exists and the snapshot was captured.
4750 if want_post {
4751 if let Some(prev) = &previous {
4752 crate::signals::emit_post_update::<T>(prev, &row).await;
4753 }
4754 }
4755 Ok(row)
4756 }
4757 }
4758
4759 /// Delete one instance by primary key, firing `pre_delete` +
4760 /// `post_delete` signals.
4761 ///
4762 /// Issues `DELETE FROM <table> WHERE <pk> = <value>`. Returns the
4763 /// number of rows affected (0 if the row was already gone, 1 otherwise).
4764 ///
4765 /// ## Signal contract
4766 ///
4767 /// - `pre_delete:<table>` fires before the DELETE with
4768 /// `{ "instance": ..., "actor": ... }`.
4769 /// - `post_delete:<table>` fires after the DELETE with the same
4770 /// payload shape.
4771 ///
4772 /// The `"actor"` value is set by the nearest enclosing
4773 /// [`crate::signals::with_actor`] scope; `Value::Null` when no
4774 /// scope is active. The instance value passed to both signals is
4775 /// the value supplied by the caller — not a DB read-back. If you
4776 /// need the freshest DB state before deletion, fetch it first with
4777 /// `.get(...)` then pass to this method.
4778 ///
4779 /// ## Bulk paths fire bulk signals
4780 ///
4781 /// `QuerySet::delete()` (the filter-chain DELETE) fires
4782 /// `bulk_post_delete:<table>` with the list of affected PKs, not
4783 /// per-row `pre_delete` / `post_delete`. Use `delete_instance` for
4784 /// per-row signal semantics.
4785 pub async fn delete_instance(&self, instance: &T) -> Result<u64, crate::orm::write::SaveError>
4786 where
4787 T: serde::Serialize,
4788 {
4789 use crate::orm::write::SaveError;
4790 let pk_field = T::FIELDS
4791 .iter()
4792 .find(|f| f.primary_key)
4793 .ok_or(SaveError::NoPrimaryKey)?;
4794 let map = serialize_to_map(instance).map_err(SaveError::Write)?;
4795 let pk_val = map
4796 .get(pk_field.name)
4797 .cloned()
4798 .unwrap_or(serde_json::Value::Null);
4799
4800 // Fire pre_delete before the write.
4801 crate::signals::emit_pre_delete::<T>(instance).await;
4802
4803 let pk_sea =
4804 crate::orm::write::json_to_sea_value(pk_field.ty, &pk_val, false, pk_field.name, None)
4805 .map_err(SaveError::Write)?;
4806
4807 use sea_query::{Alias, Expr, Query};
4808 // Feature #72 — soft-delete redirect. For models tagged
4809 // `#[umbral(soft_delete)]`, set deleted_at instead of issuing
4810 // DELETE. Pre/post_delete signals still fire because the
4811 // logical contract ("this row is gone from the visible
4812 // table") is preserved — only the physical SQL changed.
4813 // Hard-delete is not exposed through delete_instance (it's
4814 // a typed per-row helper); call `QuerySet::filter(pk =
4815 // instance.id).hard_delete().delete()` when you need it.
4816 let stmt_sql = if T::SOFT_DELETE {
4817 let now = chrono::Utc::now();
4818 let mut up = Query::update();
4819 up.table(crate::db::router::schema_qualified_table(T::TABLE));
4820 up.value(
4821 Alias::new("deleted_at"),
4822 sea_query::Value::ChronoDateTimeUtc(Some(Box::new(now))),
4823 );
4824 up.and_where(Expr::col(Alias::new(pk_field.name)).eq(pk_sea));
4825 // Idempotency guard — don't bump an already-set timestamp.
4826 up.and_where(Expr::col(Alias::new("deleted_at")).is_null());
4827 SoftOrHardStatement::Update(up)
4828 } else {
4829 let mut stmt = Query::delete();
4830 stmt.from_table(crate::db::router::schema_qualified_table(T::TABLE));
4831 stmt.and_where(Expr::col(Alias::new(pk_field.name)).eq(pk_sea));
4832 SoftOrHardStatement::Delete(stmt)
4833 };
4834
4835 let pool = resolve_pool::<T>(None, crate::db::RouteOp::Write);
4836 let affected = match (&pool, stmt_sql) {
4837 (DbPool::Sqlite(pool), SoftOrHardStatement::Delete(stmt)) => {
4838 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4839 sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
4840 .execute(pool)
4841 .await
4842 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4843 .rows_affected()
4844 }
4845 (DbPool::Postgres(pool), SoftOrHardStatement::Delete(stmt)) => {
4846 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4847 sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4848 .execute(pool)
4849 .await
4850 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4851 .rows_affected()
4852 }
4853 (DbPool::Sqlite(pool), SoftOrHardStatement::Update(stmt)) => {
4854 let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
4855 sqlx::query_with::<sqlx::Sqlite, _>(&sql, values)
4856 .execute(pool)
4857 .await
4858 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4859 .rows_affected()
4860 }
4861 (DbPool::Postgres(pool), SoftOrHardStatement::Update(stmt)) => {
4862 let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
4863 sqlx::query_with::<sqlx::Postgres, _>(&sql, values)
4864 .execute(pool)
4865 .await
4866 .map_err(|e| SaveError::Write(crate::orm::write::WriteError::Sqlx(e)))?
4867 .rows_affected()
4868 }
4869 };
4870
4871 // Fire post_delete after the write.
4872 crate::signals::emit_post_delete::<T>(instance).await;
4873
4874 Ok(affected)
4875 }
4876}
4877
/// Internal carrier for the statement `Manager::delete_instance` is about
/// to run. The method first decides the SQL shape (based on
/// `T::SOFT_DELETE`) and wraps it in this enum, then dispatches on the
/// pool backend together with the variant when executing. One variant per
/// SQL shape.
enum SoftOrHardStatement {
    /// Physical removal: `DELETE FROM <table> WHERE <pk> = <value>`.
    Delete(sea_query::DeleteStatement),
    /// Soft delete: an `UPDATE` that sets `deleted_at` on the matching row.
    Update(sea_query::UpdateStatement),
}
4885
4886// Hydration helpers (hydrate_select_related, hydrate_select_related_nested,
4887// hydrate_prefetch_related, hydrate_reverse_fk_for_field, fetch_related_as_json,
4888// fetch_reverse_fk_children) moved to `super::hydration`.
4889
4890// Insert builders (serialize_to_map, build_insert_one_for, build_insert_many_for)
4891// and pk_field moved to `super::write_helpers`.
4892
4893// `decode_agg_sqlite` / `decode_agg_pg` moved to
4894// `backend_sqlite::decode_agg` / `backend_pg::decode_agg`.
4895
4896// =========================================================================
4897// #113: M2M-via-JOIN dedup decoders
4898//
4899// When .join_related() includes one or more M2M fields, the result
4900// set has one row per (parent, child) combo, so a parent with N
4901// matching children appears N times. The caller wants ONE T per
4902// parent with the M2M slot populated.
4903//
4904// The algorithm:
4905// - First time we see a parent PK: decode T via FromRow, hydrate
4906// any FK joins from this row.
4907// - Every subsequent row for the same parent: extract the M2M
4908// child JsonValue (or skip on LEFT JOIN miss).
4909// - Dedup children by (parent_pk, field, child_pk) so that
4910// joining TWO M2Ms doesn't multiply the child sets (the JOIN
4911// produces parent × m2m1 × m2m2 rows).
4912// - Hand each parent its M2M buckets via set_m2m_resolved_json.
4913// =========================================================================
4914
4915fn dedup_decode_sqlite<T: Model + HydrateRelated>(
4916 raw_rows: &[sqlx::sqlite::SqliteRow],
4917 fk_join_fields: &[String],
4918 m2m_join_fields: &[String],
4919) -> Result<Vec<T>, sqlx::Error>
4920where
4921 T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow>,
4922{
4923 // PK-agnostic dedup: read the parent PK back through the shape-aware
4924 // decoder (using the parent model's PK SqlType) and key by `pk_key`,
4925 // so i64 / String / Uuid parents all dedup correctly.
4926 let parent_pk_col = crate::migrate::ModelMeta::for_::<T>()
4927 .fields
4928 .into_iter()
4929 .find(|c| c.primary_key)
4930 .ok_or_else(|| {
4931 sqlx::Error::Protocol(format!(
4932 "umbral::orm::join_related: model `{}` has no primary key, M2M JOIN \
4933 dedup requires one",
4934 T::NAME
4935 ))
4936 })?;
4937 let registered = crate::migrate::registered_models();
4938 let mut typed: Vec<T> = Vec::new();
4939 let mut idx_by_pk: HashMap<String, usize> = HashMap::new();
4940 // (parent_pk_key, field) → Vec<JsonValue> + a Set of seen child PK keys.
4941 let mut buckets: HashMap<(String, String), Vec<JsonValue>> = HashMap::new();
4942 let mut seen_children: HashMap<(String, String), std::collections::HashSet<String>> =
4943 HashMap::new();
4944 for row in raw_rows {
4945 let Ok(parent_json) = crate::orm::dynamic::decode_to_json(row, &parent_pk_col) else {
4946 continue;
4947 };
4948 let parent_key = crate::orm::pk_key(&parent_json);
4949 if let std::collections::hash_map::Entry::Vacant(e) = idx_by_pk.entry(parent_key.clone()) {
4950 let mut t = <T as sqlx::FromRow<_>>::from_row(row)?;
4951 backend_sqlite::hydrate_joined_rels::<T>(&mut t, row, fk_join_fields)?;
4952 e.insert(typed.len());
4953 typed.push(t);
4954 }
4955 for m2m_field in m2m_join_fields {
4956 // The M2M slot is keyed by the FIRST segment (the M2M field
4957 // name); the full `m2m_field` path may carry an onward FK
4958 // chain (`"tags__category"`) the child decoder nests.
4959 let m2m_seg = m2m_field.split("__").next().unwrap_or(m2m_field.as_str());
4960 let Some(rel) = T::M2M_RELATIONS.iter().find(|r| r.field_name == m2m_seg) else {
4961 continue;
4962 };
4963 let Some(child_meta) = registered.iter().find(|m| m.table == rel.target_table) else {
4964 continue;
4965 };
4966 let Some(child_json) =
4967 backend_sqlite::extract_m2m_child_json::<T>(row, m2m_field, child_meta)?
4968 else {
4969 continue;
4970 };
4971 // Dedup by child PK (PK-agnostic) so multi-M2M cartesian
4972 // doesn't duplicate this field's children.
4973 let child_key = child_json
4974 .as_object()
4975 .and_then(|m| {
4976 let pk_col = child_meta.fields.iter().find(|c| c.primary_key)?;
4977 m.get(&pk_col.name).map(crate::orm::pk_key)
4978 })
4979 .unwrap_or_default();
4980 let key = (parent_key.clone(), m2m_seg.to_string());
4981 let seen = seen_children.entry(key.clone()).or_default();
4982 if seen.insert(child_key) {
4983 buckets.entry(key).or_default().push(child_json);
4984 }
4985 }
4986 }
4987 for ((parent_key, field), children) in buckets {
4988 if let Some(&idx) = idx_by_pk.get(&parent_key) {
4989 typed[idx].set_m2m_resolved_json(&field, children);
4990 }
4991 }
4992 // LEFT JOIN miss handling: walk every (parent, field) pair
4993 // we expected to populate and zero-init any slot that never
4994 // got a hit. Without this a parent with no matching M2M
4995 // children would leave its slot None — distinguishable from
4996 // "loaded, empty" only by callers checking the absence.
4997 for (parent_key, &idx) in idx_by_pk.iter() {
4998 for field in m2m_join_fields {
4999 let seg = field.split("__").next().unwrap_or(field.as_str());
5000 if !seen_children.contains_key(&(parent_key.clone(), seg.to_string())) {
5001 typed[idx].set_m2m_resolved_json(seg, Vec::new());
5002 }
5003 }
5004 }
5005 Ok(typed)
5006}
5007
5008fn dedup_decode_pg<T: Model + HydrateRelated>(
5009 raw_rows: &[sqlx::postgres::PgRow],
5010 fk_join_fields: &[String],
5011 m2m_join_fields: &[String],
5012) -> Result<Vec<T>, sqlx::Error>
5013where
5014 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
5015{
5016 // PK-agnostic dedup — see the SQLite variant.
5017 let parent_pk_col = crate::migrate::ModelMeta::for_::<T>()
5018 .fields
5019 .into_iter()
5020 .find(|c| c.primary_key)
5021 .ok_or_else(|| {
5022 sqlx::Error::Protocol(format!(
5023 "umbral::orm::join_related: model `{}` has no primary key, M2M JOIN \
5024 dedup requires one",
5025 T::NAME
5026 ))
5027 })?;
5028 let registered = crate::migrate::registered_models();
5029 let mut typed: Vec<T> = Vec::new();
5030 let mut idx_by_pk: HashMap<String, usize> = HashMap::new();
5031 let mut buckets: HashMap<(String, String), Vec<JsonValue>> = HashMap::new();
5032 let mut seen_children: HashMap<(String, String), std::collections::HashSet<String>> =
5033 HashMap::new();
5034 for row in raw_rows {
5035 let Ok(parent_json) = crate::orm::dynamic::decode_pg_to_json(row, &parent_pk_col) else {
5036 continue;
5037 };
5038 let parent_key = crate::orm::pk_key(&parent_json);
5039 if let std::collections::hash_map::Entry::Vacant(e) = idx_by_pk.entry(parent_key.clone()) {
5040 let mut t = <T as sqlx::FromRow<_>>::from_row(row)?;
5041 backend_pg::hydrate_joined_rels::<T>(&mut t, row, fk_join_fields)?;
5042 e.insert(typed.len());
5043 typed.push(t);
5044 }
5045 for m2m_field in m2m_join_fields {
5046 let m2m_seg = m2m_field.split("__").next().unwrap_or(m2m_field.as_str());
5047 let Some(rel) = T::M2M_RELATIONS.iter().find(|r| r.field_name == m2m_seg) else {
5048 continue;
5049 };
5050 let Some(child_meta) = registered.iter().find(|m| m.table == rel.target_table) else {
5051 continue;
5052 };
5053 let Some(child_json) =
5054 backend_pg::extract_m2m_child_json::<T>(row, m2m_field, child_meta)?
5055 else {
5056 continue;
5057 };
5058 let child_key = child_json
5059 .as_object()
5060 .and_then(|m| {
5061 let pk_col = child_meta.fields.iter().find(|c| c.primary_key)?;
5062 m.get(&pk_col.name).map(crate::orm::pk_key)
5063 })
5064 .unwrap_or_default();
5065 let key = (parent_key.clone(), m2m_seg.to_string());
5066 let seen = seen_children.entry(key.clone()).or_default();
5067 if seen.insert(child_key) {
5068 buckets.entry(key).or_default().push(child_json);
5069 }
5070 }
5071 }
5072 for ((parent_key, field), children) in buckets {
5073 if let Some(&idx) = idx_by_pk.get(&parent_key) {
5074 typed[idx].set_m2m_resolved_json(&field, children);
5075 }
5076 }
5077 // Same LEFT JOIN miss zero-init as the SQLite path.
5078 for (parent_key, &idx) in idx_by_pk.iter() {
5079 for field in m2m_join_fields {
5080 let seg = field.split("__").next().unwrap_or(field.as_str());
5081 if !seen_children.contains_key(&(parent_key.clone(), seg.to_string())) {
5082 typed[idx].set_m2m_resolved_json(seg, Vec::new());
5083 }
5084 }
5085 }
5086 Ok(typed)
5087}