1use smallvec::SmallVec;
2use std::marker::PhantomData;
3
4use sqlx::{
5 postgres::PgRow,
6 Executor,
7 FromRow,
8 Postgres,
9};
10use sqlxo_traits::{
11 AliasedColumn,
12 FullTextSearchConfig,
13 FullTextSearchable,
14 GetDeleteMarker,
15 JoinIdentifiable,
16 JoinKind,
17 JoinNavigationModel,
18 JoinPath,
19 PrimaryKey,
20 QueryContext,
21 SqlWrite,
22};
23
24use crate::{
25 and,
26 blocks::{
27 BuildableFilter,
28 BuildableJoin,
29 BuildablePage,
30 BuildableSort,
31 Expression,
32 Page,
33 Pagination,
34 QualifiedColumn,
35 ReadHead,
36 SelectProjection,
37 SelectType,
38 SortOrder,
39 SqlWriter,
40 },
41 order_by,
42 select::{
43 AggregateFunction,
44 AggregateSelection,
45 GroupByList,
46 HavingList,
47 HavingPredicate,
48 SelectionColumn,
49 SelectionEntry,
50 SelectionList,
51 },
52 Buildable,
53 ExecutablePlan,
54 FetchablePlan,
55 Planable,
56 PrimaryKeyExpression,
57 QueryBuilder,
58};
59
60#[allow(dead_code)]
62pub trait BuildableReadQuery<C, Row = <C as QueryContext>::Model>:
63 Buildable<C, Row = Row, Plan: Planable<C, Row>>
64 + BuildableFilter<C>
65 + BuildableJoin<C>
66 + BuildableSort<C>
67 + BuildablePage<C>
68where
69 C: QueryContext,
70 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
71{
72}
73
74pub(crate) trait DynFullTextSearchPlan: Send + Sync {
75 fn write_condition(
76 &self,
77 w: &mut SqlWriter,
78 base_alias: &str,
79 joins: Option<&[JoinPath]>,
80 );
81
82 fn write_rank_expr(
83 &self,
84 w: &mut SqlWriter,
85 base_alias: &str,
86 joins: Option<&[JoinPath]>,
87 );
88
89 fn include_rank(&self) -> bool;
90}
91
92struct ModelFullTextSearchPlan<M>
93where
94 M: FullTextSearchable,
95{
96 config: M::FullTextSearchConfig,
97 _marker: PhantomData<M>,
98}
99
100impl<M> ModelFullTextSearchPlan<M>
101where
102 M: FullTextSearchable,
103{
104 fn new(config: M::FullTextSearchConfig) -> Self {
105 Self {
106 config,
107 _marker: PhantomData,
108 }
109 }
110}
111
112impl<M> DynFullTextSearchPlan for ModelFullTextSearchPlan<M>
113where
114 M: FullTextSearchable + Send + Sync + 'static,
115 M::FullTextSearchConfig: Send + Sync,
116{
117 fn write_condition(
118 &self,
119 w: &mut SqlWriter,
120 base_alias: &str,
121 joins: Option<&[JoinPath]>,
122 ) {
123 M::write_search_predicate(w, base_alias, joins, &self.config);
124 }
125
126 fn write_rank_expr(
127 &self,
128 w: &mut SqlWriter,
129 base_alias: &str,
130 joins: Option<&[JoinPath]>,
131 ) {
132 M::write_search_score(w, base_alias, joins, &self.config);
133 }
134
135 fn include_rank(&self) -> bool {
136 self.config.include_rank()
137 }
138}
139
140pub struct ReadQueryPlan<'a, C: QueryContext, Row = <C as QueryContext>::Model>
141{
142 pub(crate) joins: Option<Vec<JoinPath>>,
143 pub(crate) where_expr: Option<Expression<C::Query>>,
144 pub(crate) sort_expr: Option<SortOrder<C::Sort>>,
145 pub(crate) pagination: Option<Pagination>,
146 pub(crate) table: &'a str,
147 pub(crate) include_deleted: bool,
148 pub(crate) delete_marker_field: Option<&'a str>,
149 pub(crate) selection: Option<SelectionList<Row, SelectionEntry>>,
150 pub(crate) group_by: Option<Vec<SelectionColumn>>,
151 pub(crate) having: Option<Vec<HavingPredicate>>,
152 pub(crate) full_text_search: Option<Box<dyn DynFullTextSearchPlan>>,
153 pub(crate) aggregate_filter: Option<AggregateFilter>,
154 row: PhantomData<Row>,
155}
156
157#[derive(Clone)]
158pub struct AggregateFilter {
159 pub columns: SmallVec<[&'static str; 2]>,
160 pub predicates: Vec<HavingPredicate>,
161}
162
163fn build_alias_lookup(
164 joins: Option<&[JoinPath]>,
165) -> Vec<(&'static str, String)> {
166 let mut aliases = Vec::new();
167
168 if let Some(paths) = joins {
169 for path in paths {
170 let mut alias_prefix = String::new();
171 for segment in path.segments() {
172 if let Some(through) = segment.descriptor.through {
173 let mut through_alias = alias_prefix.clone();
174 through_alias.push_str(through.alias_segment);
175 aliases.push((through.table, through_alias));
176 }
177 alias_prefix.push_str(segment.descriptor.alias_segment);
178 aliases.push((
179 segment.descriptor.right_table,
180 alias_prefix.clone(),
181 ));
182 }
183 }
184 }
185
186 aliases
187}
188
189fn path_is_prefix(prefix: &JoinPath, candidate: &JoinPath) -> bool {
190 let prefix_segments = prefix.segments();
191 let candidate_segments = candidate.segments();
192
193 if prefix_segments.len() > candidate_segments.len() {
194 return false;
195 }
196
197 prefix_segments
198 .iter()
199 .zip(candidate_segments.iter())
200 .all(|(left, right)| left == right)
201}
202
203fn merge_unique_join_paths(mut paths: Vec<JoinPath>) -> Vec<JoinPath> {
204 let mut unique: Vec<JoinPath> = Vec::new();
205 for path in paths.drain(..) {
206 if unique.iter().any(|existing| existing == &path) {
207 continue;
208 }
209 unique.push(path);
210 }
211
212 let mut merged: Vec<JoinPath> = Vec::new();
213 for idx in 0..unique.len() {
214 let current = &unique[idx];
215 let shadowed = unique.iter().enumerate().any(|(other_idx, other)| {
216 other_idx != idx &&
217 path_is_prefix(current, other) &&
218 current.len() < other.len()
219 });
220
221 if !shadowed {
222 merged.push(current.clone());
223 }
224 }
225
226 merged
227}
228
229fn resolve_alias_for_table(
230 table: &'static str,
231 column: &'static str,
232 base_table: &str,
233 aliases: &[(&'static str, String)],
234) -> String {
235 if table == base_table {
236 return base_table.to_string();
237 }
238
239 let mut matches =
240 aliases.iter().filter(|(tbl, _)| *tbl == table).peekable();
241
242 let Some((_, alias)) = matches.next() else {
243 panic!(
244 "`take!` requested column `{table}.{column}` but `{table}` is not \
245 part of the join set"
246 );
247 };
248
249 if matches.peek().is_some() {
250 panic!(
251 "`take!` requested column `{table}.{column}` but `{table}` is \
252 joined multiple times; disambiguation is not implemented yet"
253 );
254 }
255
256 alias.clone()
257}
258
259fn resolve_selection_columns(
260 selection: &[SelectionColumn],
261 base_table: &str,
262 joins: Option<&[JoinPath]>,
263) -> SmallVec<[QualifiedColumn; 4]> {
264 let aliases = build_alias_lookup(joins);
265 let mut resolved = SmallVec::new();
266
267 for col in selection {
268 resolved.push(resolve_selection_column(col, base_table, &aliases));
269 }
270
271 resolved
272}
273
274fn resolve_selection_column(
275 column: &SelectionColumn,
276 base_table: &str,
277 aliases: &[(&'static str, String)],
278) -> QualifiedColumn {
279 let table_alias = resolve_alias_for_table(
280 column.table,
281 column.column,
282 base_table,
283 aliases,
284 );
285 QualifiedColumn {
286 table_alias,
287 column: column.column,
288 }
289}
290
291fn format_aggregate_expression(
292 selection: &AggregateSelection,
293 base_table: &str,
294 aliases: &[(&'static str, String)],
295) -> String {
296 match selection.column {
297 Some(col) => {
298 let qualified = resolve_selection_column(&col, base_table, aliases);
299 match selection.function {
300 AggregateFunction::CountDistinct => format!(
301 r#"COUNT(DISTINCT "{}"."{}")"#,
302 qualified.table_alias, qualified.column
303 ),
304 _ => format!(
305 r#"{}("{}"."{}")"#,
306 selection.function.sql_name(),
307 qualified.table_alias,
308 qualified.column
309 ),
310 }
311 }
312 None => format!("{}(*)", selection.function.sql_name()),
313 }
314}
315
316fn write_having_predicate(
317 predicate: &HavingPredicate,
318 writer: &mut SqlWriter,
319 base_table: &str,
320 aliases: &[(&'static str, String)],
321) {
322 let expr =
323 format_aggregate_expression(&predicate.selection, base_table, aliases);
324 writer.push(&expr);
325 writer.push(" ");
326 writer.push(predicate.comparator.as_str());
327 writer.push(" ");
328 predicate.bind_value(writer);
329}
330
331impl<'a, C, Row> ReadQueryPlan<'a, C, Row>
332where
333 C: QueryContext,
334 C::Model: JoinNavigationModel,
335{
336 fn compute_aggregate_filter(&mut self) {
337 if self.having.is_none() ||
338 self.selection.is_some() ||
339 self.group_by.is_some()
340 {
341 return;
342 }
343
344 let pk_columns = <C::Model as PrimaryKey>::PRIMARY_KEY;
345 if pk_columns.is_empty() {
346 return;
347 }
348
349 let Some(predicates) = self.having.take() else {
350 return;
351 };
352 if predicates.is_empty() {
353 return;
354 }
355
356 let columns = SmallVec::<[&'static str; 2]>::from_slice(pk_columns);
357 self.aggregate_filter = Some(AggregateFilter {
358 columns,
359 predicates,
360 });
361 }
362
363 fn push_aggregate_filter_clause(
364 &self,
365 writer: &mut SqlWriter,
366 filter: &AggregateFilter,
367 ) {
368 writer.push_where_raw(|w| {
369 if filter.columns.len() == 1 {
370 let col = filter.columns[0];
371 w.push(&format!(r#""{}"."{}""#, self.table, col));
372 } else {
373 w.push("(");
374 for (idx, col) in filter.columns.iter().enumerate() {
375 if idx > 0 {
376 w.push(", ");
377 }
378 w.push(&format!(r#""{}"."{}""#, self.table, col));
379 }
380 w.push(")");
381 }
382 w.push(" IN (");
383 self.write_aggregate_subquery(w, filter);
384 w.push(")");
385 });
386 }
387
388 fn write_aggregate_subquery(
389 &self,
390 writer: &mut SqlWriter,
391 filter: &AggregateFilter,
392 ) {
393 writer.push("SELECT ");
394 for (idx, col) in filter.columns.iter().enumerate() {
395 if idx > 0 {
396 writer.push(", ");
397 }
398 writer.push(&format!(r#""{}"."{}""#, self.table, col));
399 }
400 writer.push(" FROM ");
401 writer.push(self.table);
402
403 if let Some(js) = &self.joins {
404 for path in js {
405 push_join_path_inline(
406 writer.query_builder_mut(),
407 path,
408 self.table,
409 );
410 }
411 }
412
413 self.write_subquery_filters(writer);
414 self.write_subquery_group_by(writer, filter);
415 self.write_subquery_having(writer, filter);
416 }
417
418 fn write_subquery_filters(&self, writer: &mut SqlWriter) {
419 let mut has_clause = false;
420
421 if !self.include_deleted {
422 if let Some(delete_field) = self.delete_marker_field {
423 writer.push(" WHERE ");
424 writer.push(&format!(
425 r#""{}"."{}" IS NULL"#,
426 self.table, delete_field
427 ));
428 has_clause = true;
429 }
430 }
431
432 if let Some(expr) = &self.where_expr {
433 if has_clause {
434 writer.push(" AND (");
435 } else {
436 writer.push(" WHERE (");
437 }
438 expr.write(writer);
439 writer.push(")");
440 has_clause = true;
441 }
442
443 if let Some(fts) = &self.full_text_search {
444 if has_clause {
445 writer.push(" AND (");
446 } else {
447 writer.push(" WHERE (");
448 }
449 fts.write_condition(writer, self.table, self.joins.as_deref());
450 writer.push(")");
451 }
452 }
453
454 fn write_subquery_group_by(
455 &self,
456 writer: &mut SqlWriter,
457 filter: &AggregateFilter,
458 ) {
459 writer.push(" GROUP BY ");
460 for (idx, col) in filter.columns.iter().enumerate() {
461 if idx > 0 {
462 writer.push(", ");
463 }
464 writer.push(&format!(r#""{}"."{}""#, self.table, col));
465 }
466 }
467
468 fn write_subquery_having(
469 &self,
470 writer: &mut SqlWriter,
471 filter: &AggregateFilter,
472 ) {
473 if filter.predicates.is_empty() {
474 return;
475 }
476
477 let aliases = build_alias_lookup(self.joins.as_deref());
478 writer.push(" HAVING ");
479 for (idx, predicate) in filter.predicates.iter().enumerate() {
480 if idx > 0 {
481 writer.push(" AND ");
482 }
483 write_having_predicate(predicate, writer, self.table, &aliases);
484 }
485 }
486
487 fn to_query_builder(
488 &self,
489 select_type: SelectType,
490 ) -> sqlx::QueryBuilder<'static, Postgres> {
491 let effective_select = self.select_type_for(select_type.clone());
492 let head = ReadHead::new(self.table, effective_select);
493 let mut w = SqlWriter::new(head);
494
495 if let Some(js) = &self.joins {
496 w.push_joins(js, self.table);
497 }
498
499 self.push_where_clause(&mut w);
500 if let Some(filter) = &self.aggregate_filter {
501 self.push_aggregate_filter_clause(&mut w, filter);
502 } else {
503 self.push_group_by_clause(&mut w);
504 self.push_having_clause(&mut w);
505 }
506
507 if let Some(s) = &self.sort_expr {
508 w.push_sort(s);
509 } else if !matches!(select_type, SelectType::Exists) {
510 if let Some(fts) = &self.full_text_search {
511 if fts.include_rank() {
512 w.push_order_by_raw(|writer| {
513 fts.write_rank_expr(
514 writer,
515 self.table,
516 self.joins.as_deref(),
517 );
518 writer.push(" DESC");
519 });
520 }
521 }
522 }
523
524 if let SelectType::Exists = select_type {
525 w.push_pagination(&Pagination {
526 page: 0,
527 page_size: 1,
528 });
529 } else if let Some(p) = &self.pagination {
530 w.push_pagination(p);
531 }
532
533 if let SelectType::Exists = select_type {
534 w.push(")");
535 }
536
537 w.into_builder()
538 }
539
540 fn select_type_for(&self, base: SelectType) -> SelectType {
541 let resolved = match base {
542 SelectType::Star => self
543 .selection
544 .as_ref()
545 .map(|s| self.selection_select_type(s))
546 .unwrap_or(SelectType::Star),
547 other => other,
548 };
549
550 self.apply_join_extras(resolved)
551 }
552
553 fn selection_select_type(
554 &self,
555 selection: &SelectionList<Row, SelectionEntry>,
556 ) -> SelectType {
557 let mut has_columns = false;
558 let mut has_aggregates = false;
559 for entry in selection.entries() {
560 match entry {
561 SelectionEntry::Column(_) => has_columns = true,
562 SelectionEntry::Aggregate(_) => has_aggregates = true,
563 }
564 }
565
566 if has_columns && has_aggregates && self.group_by.is_none() {
567 panic!(
568 "`group_by!` must be provided when selecting columns \
569 alongside aggregates"
570 );
571 }
572
573 if has_columns && !has_aggregates {
574 let mut cols: SmallVec<[SelectionColumn; 4]> =
575 SmallVec::with_capacity(selection.entries().len());
576 for entry in selection.entries() {
577 if let SelectionEntry::Column(col) = entry {
578 cols.push(*col);
579 }
580 }
581 return SelectType::Columns(resolve_selection_columns(
582 &cols,
583 self.table,
584 self.joins.as_deref(),
585 ));
586 }
587
588 let projections = self.build_projections(selection);
589 SelectType::Projection(projections)
590 }
591
592 fn build_projections(
593 &self,
594 selection: &SelectionList<Row, SelectionEntry>,
595 ) -> Vec<SelectProjection> {
596 let aliases = build_alias_lookup(self.joins.as_deref());
597 selection
598 .entries()
599 .iter()
600 .enumerate()
601 .map(|(idx, entry)| match entry {
602 SelectionEntry::Column(col) => {
603 let qualified =
604 resolve_selection_column(col, self.table, &aliases);
605 SelectProjection {
606 expression: format!(
607 r#""{}"."{}""#,
608 qualified.table_alias, qualified.column
609 ),
610 alias: None,
611 }
612 }
613 SelectionEntry::Aggregate(agg) => {
614 let expr =
615 format_aggregate_expression(agg, self.table, &aliases);
616 let alias = format!(r#"__sqlxo_sel_{}"#, idx);
617 SelectProjection {
618 expression: expr,
619 alias: Some(alias),
620 }
621 }
622 })
623 .collect()
624 }
625
626 fn apply_join_extras(&self, select: SelectType) -> SelectType {
627 let extras = self.join_projection_columns();
628 if extras.is_empty() {
629 return select;
630 }
631
632 match select {
633 SelectType::Star => SelectType::StarWithExtras(extras),
634 SelectType::StarAndCount => SelectType::StarAndCountExtras(extras),
635 other => other,
636 }
637 }
638
639 fn join_projection_columns(&self) -> SmallVec<[AliasedColumn; 4]> {
640 if self.selection.is_some() {
641 return SmallVec::new();
642 }
643
644 C::Model::collect_join_columns(self.joins.as_deref(), "")
645 }
646
647 fn push_where_clause(&self, w: &mut SqlWriter) {
648 let mut has_clause = false;
649
650 if !self.include_deleted {
651 if let Some(delete_field) = self.delete_marker_field {
652 let qualified =
653 format!(r#""{}"."{}""#, self.table, delete_field);
654 w.push_where_raw(|writer| {
655 writer.push(&qualified);
656 writer.push(" IS NULL");
657 });
658 has_clause = true;
659 }
660 }
661
662 if let Some(e) = &self.where_expr {
663 let wrap = has_clause;
664 w.push_where_raw(|writer| {
665 if wrap {
666 writer.push("(");
667 e.write(writer);
668 writer.push(")");
669 } else {
670 e.write(writer);
671 }
672 });
673 has_clause = true;
674 }
675
676 if let Some(fts) = &self.full_text_search {
677 let wrap = has_clause;
678 w.push_where_raw(|writer| {
679 if wrap {
680 writer.push("(");
681 }
682 fts.write_condition(writer, self.table, self.joins.as_deref());
683 if wrap {
684 writer.push(")");
685 }
686 });
687 }
688 }
689
690 fn push_group_by_clause(&self, w: &mut SqlWriter) {
691 if let Some(columns) = &self.group_by {
692 if columns.is_empty() {
693 return;
694 }
695 let resolved = resolve_selection_columns(
696 columns,
697 self.table,
698 self.joins.as_deref(),
699 );
700 w.push_group_by_columns(&resolved);
701 }
702 }
703
704 fn push_having_clause(&self, w: &mut SqlWriter) {
705 let Some(predicates) = &self.having else {
706 return;
707 };
708 if predicates.is_empty() {
709 return;
710 }
711 let aliases = build_alias_lookup(self.joins.as_deref());
712 let table = self.table;
713 w.push_having(|writer| {
714 for (idx, predicate) in predicates.iter().enumerate() {
715 if idx > 0 {
716 writer.push(" AND ");
717 }
718 write_having_predicate(predicate, writer, table, &aliases);
719 }
720 });
721 }
722
723 pub async fn fetch_page<'e, E>(
724 &self,
725 exec: E,
726 ) -> Result<Page<C::Model>, sqlx::Error>
727 where
728 E: Executor<'e, Database = Postgres>,
729 {
730 #[derive(sqlx::FromRow)]
731 struct RowWithCount<M> {
732 #[sqlx(flatten)]
733 model: M,
734 total_count: i64,
735 }
736
737 let rows: Vec<PgRow> = self
738 .to_query_builder(SelectType::StarAndCount)
739 .build()
740 .fetch_all(exec)
741 .await?;
742
743 let pagination = self.pagination.unwrap_or_default();
744
745 if rows.is_empty() {
746 return Ok(Page::new(vec![], pagination, 0));
747 }
748
749 let mut total = 0;
750 let mut items = Vec::with_capacity(rows.len());
751 let hydrate = self.selection.is_none();
752
753 for row in rows {
754 let mut parsed = RowWithCount::<C::Model>::from_row(&row)?;
755 if hydrate {
756 parsed.model.hydrate_navigations(
757 self.joins.as_deref(),
758 &row,
759 "",
760 )?;
761 }
762 total = parsed.total_count;
763 items.push(parsed.model);
764 }
765
766 if hydrate && C::Model::has_collection_joins(self.joins.as_deref()) {
767 items = <C::Model as JoinNavigationModel>::merge_collection_rows(
768 items,
769 self.joins.as_deref(),
770 );
771 }
772
773 Ok(Page::new(items, pagination, total))
774 }
775
776 pub async fn exists<'e, E>(&self, exec: E) -> Result<bool, sqlx::Error>
777 where
778 E: Executor<'e, Database = Postgres>,
779 {
780 #[derive(sqlx::FromRow)]
781 struct ExistsRow {
782 exists: bool,
783 }
784
785 let row: ExistsRow = self
786 .to_query_builder(SelectType::Exists)
787 .build_query_as::<ExistsRow>()
788 .fetch_one(exec)
789 .await?;
790
791 Ok(row.exists)
792 }
793
794 #[cfg(any(test, feature = "test-utils"))]
795 pub fn sql(&self, build: SelectType) -> String {
796 use sqlx::Execute;
797 self.to_query_builder(build).build().sql().to_string()
798 }
799
800 fn map_pg_row(&self, row: PgRow) -> Result<Row, sqlx::Error>
801 where
802 Row: HydrateRow<C>,
803 {
804 <Row as HydrateRow<C>>::from_pg_row(self, row)
805 }
806}
807
808#[async_trait::async_trait]
809impl<'a, C, Row> FetchablePlan<C, Row> for ReadQueryPlan<'a, C, Row>
810where
811 C: QueryContext,
812 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
813{
814 async fn fetch_one<'e, E>(&self, exec: E) -> Result<Row, sqlx::Error>
815 where
816 E: Executor<'e, Database = Postgres>,
817 {
818 if <Row as HydrateRow<C>>::requires_collection_merge(self) {
819 let rows = self
820 .to_query_builder(SelectType::Star)
821 .build()
822 .fetch_all(exec)
823 .await?;
824 let mapped = rows
825 .into_iter()
826 .map(|row| self.map_pg_row(row))
827 .collect::<Result<Vec<Row>, _>>()?;
828 let merged =
829 <Row as HydrateRow<C>>::merge_collection_rows(mapped, self);
830 return merged.into_iter().next().ok_or(sqlx::Error::RowNotFound);
831 }
832
833 let row = self
834 .to_query_builder(SelectType::Star)
835 .build()
836 .fetch_one(exec)
837 .await?;
838
839 self.map_pg_row(row)
840 }
841
842 async fn fetch_all<'e, E>(&self, exec: E) -> Result<Vec<Row>, sqlx::Error>
843 where
844 E: Executor<'e, Database = Postgres>,
845 {
846 let rows = self
847 .to_query_builder(SelectType::Star)
848 .build()
849 .fetch_all(exec)
850 .await?;
851
852 let mapped = rows
853 .into_iter()
854 .map(|row| self.map_pg_row(row))
855 .collect::<Result<Vec<Row>, _>>()?;
856
857 Ok(<Row as HydrateRow<C>>::merge_collection_rows(mapped, self))
858 }
859
860 async fn fetch_optional<'e, E>(
861 &self,
862 exec: E,
863 ) -> Result<Option<Row>, sqlx::Error>
864 where
865 E: Executor<'e, Database = Postgres>,
866 {
867 if <Row as HydrateRow<C>>::requires_collection_merge(self) {
868 let rows = self
869 .to_query_builder(SelectType::Star)
870 .build()
871 .fetch_all(exec)
872 .await?;
873 let mapped = rows
874 .into_iter()
875 .map(|row| self.map_pg_row(row))
876 .collect::<Result<Vec<Row>, _>>()?;
877 let merged =
878 <Row as HydrateRow<C>>::merge_collection_rows(mapped, self);
879 return Ok(merged.into_iter().next());
880 }
881
882 let row = self
883 .to_query_builder(SelectType::Star)
884 .build()
885 .fetch_optional(exec)
886 .await?;
887
888 match row {
889 Some(row) => self.map_pg_row(row).map(Some),
890 None => Ok(None),
891 }
892 }
893}
894
895fn push_join_path_inline(
896 qb: &mut sqlx::QueryBuilder<'static, Postgres>,
897 path: &JoinPath,
898 base_table: &str,
899) {
900 if path.is_empty() {
901 return;
902 }
903
904 let mut left_alias = base_table.to_string();
905 let mut alias_prefix = String::new();
906
907 for segment in path.segments() {
908 let join_word = match segment.kind {
909 JoinKind::Inner => " INNER JOIN ",
910 JoinKind::Left => " LEFT JOIN ",
911 };
912
913 if let Some(through) = segment.descriptor.through {
914 let mut through_alias = alias_prefix.clone();
915 through_alias.push_str(through.alias_segment);
916 let clause = format!(
917 r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#,
918 join = join_word,
919 table = through.table,
920 alias = &through_alias,
921 left = &left_alias,
922 left_field = through.left_field,
923 right_field = through.right_field,
924 );
925 qb.push(clause);
926 left_alias = through_alias;
927 }
928
929 alias_prefix.push_str(segment.descriptor.alias_segment);
930 let right_alias = alias_prefix.clone();
931
932 let clause = format!(
933 r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#,
934 join = join_word,
935 table = segment.descriptor.right_table,
936 alias = &right_alias,
937 left = &left_alias,
938 left_field = segment.descriptor.left_field,
939 right_field = segment.descriptor.right_field,
940 );
941
942 qb.push(clause);
943 left_alias = right_alias;
944 }
945}
946
947#[async_trait::async_trait]
948impl<'a, C, Row> ExecutablePlan<C> for ReadQueryPlan<'a, C, Row>
949where
950 C: QueryContext,
951 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
952{
953 async fn execute<'e, E>(&self, exec: E) -> Result<u64, sqlx::Error>
954 where
955 E: Executor<'e, Database = Postgres>,
956 {
957 let rows = self
958 .to_query_builder(SelectType::Star)
959 .build()
960 .execute(exec)
961 .await?
962 .rows_affected();
963
964 Ok(rows)
965 }
966}
967
968impl<'a, C, Row> Planable<C, Row> for ReadQueryPlan<'a, C, Row>
969where
970 C: QueryContext,
971 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
972{
973}
974
975trait HydrateRow<C: QueryContext>: Sized {
976 fn from_pg_row(
977 plan: &ReadQueryPlan<C, Self>,
978 row: PgRow,
979 ) -> Result<Self, sqlx::Error>;
980
981 fn requires_collection_merge(_plan: &ReadQueryPlan<C, Self>) -> bool {
982 false
983 }
984
985 fn merge_collection_rows(
986 rows: Vec<Self>,
987 _plan: &ReadQueryPlan<C, Self>,
988 ) -> Vec<Self> {
989 rows
990 }
991}
992
993impl<C, Row> HydrateRow<C> for Row
994where
995 C: QueryContext,
996 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
997{
998 default fn from_pg_row(
999 _plan: &ReadQueryPlan<C, Row>,
1000 row: PgRow,
1001 ) -> Result<Self, sqlx::Error> {
1002 Row::from_row(&row)
1003 }
1004
1005 default fn requires_collection_merge(
1006 _plan: &ReadQueryPlan<C, Self>,
1007 ) -> bool {
1008 false
1009 }
1010
1011 default fn merge_collection_rows(
1012 rows: Vec<Self>,
1013 _plan: &ReadQueryPlan<C, Self>,
1014 ) -> Vec<Self> {
1015 rows
1016 }
1017}
1018
1019impl<C> HydrateRow<C> for C::Model
1020where
1021 C: QueryContext,
1022 C::Model: JoinNavigationModel,
1023{
1024 fn from_pg_row(
1025 plan: &ReadQueryPlan<C, Self>,
1026 row: PgRow,
1027 ) -> Result<Self, sqlx::Error> {
1028 let mut model = Self::from_row(&row)?;
1029 if plan.selection.is_none() {
1030 model.hydrate_navigations(plan.joins.as_deref(), &row, "")?;
1031 }
1032 Ok(model)
1033 }
1034
1035 fn requires_collection_merge(plan: &ReadQueryPlan<C, Self>) -> bool {
1036 plan.selection.is_none() &&
1037 Self::has_collection_joins(plan.joins.as_deref())
1038 }
1039
1040 fn merge_collection_rows(
1041 rows: Vec<Self>,
1042 plan: &ReadQueryPlan<C, Self>,
1043 ) -> Vec<Self> {
1044 if !Self::has_collection_joins(plan.joins.as_deref()) {
1045 return rows;
1046 }
1047
1048 <Self as JoinNavigationModel>::merge_collection_rows(
1049 rows,
1050 plan.joins.as_deref(),
1051 )
1052 }
1053}
1054
1055enum RelationReloadTarget<'m, M> {
1056 One(&'m mut M),
1057 Many(&'m mut [M]),
1058}
1059
1060pub struct RelationReloadBuilder<'m, C: QueryContext> {
1061 target: RelationReloadTarget<'m, C::Model>,
1062 include_lazy_relations: bool,
1063 _phantom: PhantomData<C>,
1064}
1065
1066impl<'m, C> RelationReloadBuilder<'m, C>
1067where
1068 C: QueryContext,
1069{
1070 pub(crate) fn one(model: &'m mut C::Model) -> Self {
1071 Self {
1072 target: RelationReloadTarget::One(model),
1073 include_lazy_relations: false,
1074 _phantom: PhantomData,
1075 }
1076 }
1077
1078 pub(crate) fn many(models: &'m mut [C::Model]) -> Self {
1079 Self {
1080 target: RelationReloadTarget::Many(models),
1081 include_lazy_relations: false,
1082 _phantom: PhantomData,
1083 }
1084 }
1085
1086 pub fn include_lazy_relations(mut self) -> Self {
1087 self.include_lazy_relations = true;
1088 self
1089 }
1090
1091 pub async fn execute<'e, E>(self, exec: E) -> Result<u64, sqlx::Error>
1092 where
1093 E: Executor<'e, Database = Postgres>,
1094 C::Model:
1095 PrimaryKeyExpression<C> + JoinIdentifiable + JoinNavigationModel,
1096 {
1097 match self.target {
1098 RelationReloadTarget::One(model) => {
1099 let mut builder = QueryBuilder::<C>::read();
1100 if self.include_lazy_relations {
1101 builder = builder.include_lazy_relations();
1102 }
1103 let loaded = builder
1104 .r#where(model.primary_key_expression())
1105 .build()
1106 .fetch_one(exec)
1107 .await?;
1108 *model = loaded;
1109 Ok(1)
1110 }
1111 RelationReloadTarget::Many(models) => {
1112 if models.is_empty() {
1113 return Ok(0);
1114 }
1115
1116 let mut filters: Vec<Expression<C::Query>> =
1117 Vec::with_capacity(models.len());
1118 for model in models.iter() {
1119 filters.push(model.primary_key_expression());
1120 }
1121
1122 let filter_expr = if filters.len() == 1 {
1123 filters.remove(0)
1124 } else {
1125 Expression::Or(filters)
1126 };
1127
1128 let mut builder = QueryBuilder::<C>::read();
1129 if self.include_lazy_relations {
1130 builder = builder.include_lazy_relations();
1131 }
1132
1133 let loaded = builder
1134 .r#where(filter_expr)
1135 .build()
1136 .fetch_all(exec)
1137 .await?;
1138
1139 let mut refreshed = 0u64;
1140 for model in models.iter_mut() {
1141 if let Some(found) = loaded.iter().find(|candidate| {
1142 candidate.join_key() == model.join_key()
1143 }) {
1144 *model = found.clone();
1145 refreshed += 1;
1146 }
1147 }
1148 Ok(refreshed)
1149 }
1150 }
1151 }
1152}
1153
1154pub struct ReadQueryBuilder<
1155 'a,
1156 C: QueryContext,
1157 Row = <C as QueryContext>::Model,
1158> {
1159 pub(crate) table: &'a str,
1160 pub(crate) joins: Option<Vec<JoinPath>>,
1161 pub(crate) where_expr: Option<Expression<C::Query>>,
1162 pub(crate) sort_expr: Option<SortOrder<C::Sort>>,
1163 pub(crate) pagination: Option<Pagination>,
1164 pub(crate) include_deleted: bool,
1165 pub(crate) auto_joins: bool,
1166 pub(crate) include_lazy_relations: bool,
1167 pub(crate) delete_marker_field: Option<&'a str>,
1168 pub(crate) selection: Option<SelectionList<Row, SelectionEntry>>,
1169 pub(crate) group_by: Option<Vec<SelectionColumn>>,
1170 pub(crate) having: Option<Vec<HavingPredicate>>,
1171 pub(crate) full_text_search: Option<Box<dyn DynFullTextSearchPlan>>,
1172 row: PhantomData<Row>,
1173}
1174
1175impl<'a, C, Row> ReadQueryBuilder<'a, C, Row>
1176where
1177 C: QueryContext,
1178{
1179 pub fn include_deleted(mut self) -> Self {
1180 self.include_deleted = true;
1181 self
1182 }
1183
1184 pub fn without_auto_joins(mut self) -> Self {
1185 self.auto_joins = false;
1186 self
1187 }
1188
1189 pub fn include_lazy_relations(mut self) -> Self {
1190 self.include_lazy_relations = true;
1191 self
1192 }
1193
1194 pub fn search(
1195 mut self,
1196 config: <C::Model as FullTextSearchable>::FullTextSearchConfig,
1197 ) -> Self
1198 where
1199 C::Model: FullTextSearchable + 'static,
1200 <C::Model as FullTextSearchable>::FullTextSearchConfig:
1201 Send + Sync + 'static,
1202 {
1203 self.full_text_search =
1204 Some(Box::new(ModelFullTextSearchPlan::<C::Model>::new(config)));
1205 self
1206 }
1207}
1208
1209impl<'a, C, Row> Buildable<C> for ReadQueryBuilder<'a, C, Row>
1210where
1211 C: QueryContext,
1212 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1213 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1214{
1215 type Row = Row;
1216 type Plan = ReadQueryPlan<'a, C, Row>;
1217
1218 fn from_ctx() -> Self {
1219 Self {
1220 table: C::TABLE,
1221 joins: None,
1222 where_expr: None,
1223 sort_expr: None,
1224 pagination: None,
1225 include_deleted: false,
1226 auto_joins: true,
1227 include_lazy_relations: false,
1228 delete_marker_field: C::Model::delete_marker_field(),
1229 selection: None,
1230 group_by: None,
1231 having: None,
1232 full_text_search: None,
1233 row: PhantomData,
1234 }
1235 }
1236
1237 fn build(self) -> Self::Plan {
1238 let auto_join_paths: Vec<JoinPath> = if self.auto_joins {
1239 C::Model::default_join_paths(self.include_lazy_relations).into_vec()
1240 } else {
1241 Vec::new()
1242 };
1243 let mut combined_paths = auto_join_paths;
1244 if let Some(paths) = self.joins {
1245 combined_paths.extend(paths);
1246 }
1247 let resolved_joins = if combined_paths.is_empty() {
1248 None
1249 } else {
1250 Some(merge_unique_join_paths(combined_paths))
1251 };
1252
1253 let mut plan = ReadQueryPlan {
1254 joins: resolved_joins,
1255 where_expr: self.where_expr,
1256 sort_expr: self.sort_expr,
1257 pagination: self.pagination,
1258 table: self.table,
1259 include_deleted: self.include_deleted,
1260 delete_marker_field: self.delete_marker_field,
1261 selection: self.selection,
1262 group_by: self.group_by,
1263 having: self.having,
1264 full_text_search: self.full_text_search,
1265 aggregate_filter: None,
1266 row: PhantomData,
1267 };
1268 plan.compute_aggregate_filter();
1269 plan
1270 }
1271}
1272
1273impl<'a, C, Row> ReadQueryBuilder<'a, C, Row>
1274where
1275 C: QueryContext,
1276 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1277 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1278{
1279 pub fn take<NewRow>(
1280 self,
1281 selection: SelectionList<NewRow, SelectionEntry>,
1282 ) -> ReadQueryBuilder<'a, C, NewRow>
1283 where
1284 NewRow: Send
1285 + Sync
1286 + Unpin
1287 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1288 {
1289 ReadQueryBuilder {
1290 table: self.table,
1291 joins: self.joins,
1292 where_expr: self.where_expr,
1293 sort_expr: self.sort_expr,
1294 pagination: self.pagination,
1295 include_deleted: self.include_deleted,
1296 auto_joins: self.auto_joins,
1297 include_lazy_relations: self.include_lazy_relations,
1298 delete_marker_field: self.delete_marker_field,
1299 selection: Some(selection),
1300 group_by: self.group_by,
1301 having: self.having,
1302 full_text_search: self.full_text_search,
1303 row: PhantomData,
1304 }
1305 }
1306
1307 pub fn group_by(mut self, group_by: GroupByList) -> Self {
1308 let cols = group_by.into_columns().into_vec();
1309 self.group_by = Some(cols);
1310 self
1311 }
1312
1313 pub fn having(mut self, having: HavingList) -> Self {
1314 self.having = Some(having.into_predicates());
1315 self
1316 }
1317}
1318
1319impl<'a, C, Row> BuildableFilter<C> for ReadQueryBuilder<'a, C, Row>
1320where
1321 C: QueryContext,
1322 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1323{
1324 fn r#where(mut self, e: Expression<<C as QueryContext>::Query>) -> Self {
1325 match self.where_expr {
1326 Some(existing) => self.where_expr = Some(and![existing, e]),
1327 None => self.where_expr = Some(e),
1328 };
1329
1330 self
1331 }
1332}
1333
1334impl<'a, C, Row> BuildableJoin<C> for ReadQueryBuilder<'a, C, Row>
1335where
1336 C: QueryContext,
1337 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1338{
1339 fn join(self, join: <C as QueryContext>::Join, kind: JoinKind) -> Self {
1340 self.join_path(JoinPath::from_join(join, kind))
1341 }
1342
1343 fn join_path(mut self, path: JoinPath) -> Self {
1344 if let Some(expected) = path.first_table() {
1345 assert_eq!(
1346 expected, self.table,
1347 "join path must start at base table `{}` but started at `{}`",
1348 self.table, expected,
1349 );
1350 }
1351
1352 match &mut self.joins {
1353 Some(existing) => existing.push(path),
1354 None => self.joins = Some(vec![path]),
1355 };
1356
1357 self
1358 }
1359}
1360
1361impl<'a, C, Row> BuildableSort<C> for ReadQueryBuilder<'a, C, Row>
1362where
1363 C: QueryContext,
1364 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1365{
1366 fn order_by(mut self, s: SortOrder<<C as QueryContext>::Sort>) -> Self {
1367 match self.sort_expr {
1368 Some(existing) => self.sort_expr = Some(order_by![existing, s]),
1369 None => self.sort_expr = Some(s),
1370 }
1371
1372 self
1373 }
1374}
1375
1376impl<'a, C, Row> BuildablePage<C> for ReadQueryBuilder<'a, C, Row>
1377where
1378 C: QueryContext,
1379 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1380{
1381 fn paginate(mut self, p: Pagination) -> Self {
1382 self.pagination = Some(p);
1383 self
1384 }
1385}
1386
1387impl<'a, C, Row> BuildableReadQuery<C, Row> for ReadQueryBuilder<'a, C, Row>
1388where
1389 C: QueryContext,
1390 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1391 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1392{
1393}