1use smallvec::SmallVec;
2use std::marker::PhantomData;
3
4use sqlx::{
5 postgres::PgRow,
6 Executor,
7 FromRow,
8 Postgres,
9};
10use sqlxo_traits::{
11 AliasedColumn,
12 FullTextSearchConfig,
13 FullTextSearchable,
14 GetDeleteMarker,
15 JoinKind,
16 JoinNavigationModel,
17 JoinPath,
18 PrimaryKey,
19 QueryContext,
20 SqlWrite,
21};
22
23use crate::{
24 and,
25 blocks::{
26 BuildableFilter,
27 BuildableJoin,
28 BuildablePage,
29 BuildableSort,
30 Expression,
31 Page,
32 Pagination,
33 QualifiedColumn,
34 ReadHead,
35 SelectProjection,
36 SelectType,
37 SortOrder,
38 SqlWriter,
39 },
40 order_by,
41 select::{
42 AggregateFunction,
43 AggregateSelection,
44 GroupByList,
45 HavingList,
46 HavingPredicate,
47 SelectionColumn,
48 SelectionEntry,
49 SelectionList,
50 },
51 Buildable,
52 ExecutablePlan,
53 FetchablePlan,
54 Planable,
55};
56
57#[allow(dead_code)]
59pub trait BuildableReadQuery<C, Row = <C as QueryContext>::Model>:
60 Buildable<C, Row = Row, Plan: Planable<C, Row>>
61 + BuildableFilter<C>
62 + BuildableJoin<C>
63 + BuildableSort<C>
64 + BuildablePage<C>
65where
66 C: QueryContext,
67 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
68{
69}
70
71pub(crate) trait DynFullTextSearchPlan: Send + Sync {
72 fn write_condition(
73 &self,
74 w: &mut SqlWriter,
75 base_alias: &str,
76 joins: Option<&[JoinPath]>,
77 );
78
79 fn write_rank_expr(
80 &self,
81 w: &mut SqlWriter,
82 base_alias: &str,
83 joins: Option<&[JoinPath]>,
84 );
85
86 fn include_rank(&self) -> bool;
87}
88
89struct ModelFullTextSearchPlan<M>
90where
91 M: FullTextSearchable,
92{
93 config: M::FullTextSearchConfig,
94 _marker: PhantomData<M>,
95}
96
97impl<M> ModelFullTextSearchPlan<M>
98where
99 M: FullTextSearchable,
100{
101 fn new(config: M::FullTextSearchConfig) -> Self {
102 Self {
103 config,
104 _marker: PhantomData,
105 }
106 }
107}
108
109impl<M> DynFullTextSearchPlan for ModelFullTextSearchPlan<M>
110where
111 M: FullTextSearchable + Send + Sync + 'static,
112 M::FullTextSearchConfig: Send + Sync,
113{
114 fn write_condition(
115 &self,
116 w: &mut SqlWriter,
117 base_alias: &str,
118 joins: Option<&[JoinPath]>,
119 ) {
120 w.push("(");
121 w.push("(");
122 M::write_tsvector(w, base_alias, joins, &self.config);
123 w.push(") @@ (");
124 M::write_tsquery(w, &self.config);
125 w.push(")");
126 if self.config.fuzzy_threshold().is_some() &&
127 self.config
128 .fuzzy_tokens()
129 .map(|tokens| !tokens.is_empty())
130 .unwrap_or(false)
131 {
132 w.push(" OR ");
133 M::write_fuzzy(w, base_alias, joins, &self.config);
134 }
135 w.push(")");
136 }
137
138 fn write_rank_expr(
139 &self,
140 w: &mut SqlWriter,
141 base_alias: &str,
142 joins: Option<&[JoinPath]>,
143 ) {
144 M::write_rank(w, base_alias, joins, &self.config);
145 }
146
147 fn include_rank(&self) -> bool {
148 self.config.include_rank()
149 }
150}
151
152pub struct ReadQueryPlan<'a, C: QueryContext, Row = <C as QueryContext>::Model>
153{
154 pub(crate) joins: Option<Vec<JoinPath>>,
155 pub(crate) where_expr: Option<Expression<C::Query>>,
156 pub(crate) sort_expr: Option<SortOrder<C::Sort>>,
157 pub(crate) pagination: Option<Pagination>,
158 pub(crate) table: &'a str,
159 pub(crate) include_deleted: bool,
160 pub(crate) delete_marker_field: Option<&'a str>,
161 pub(crate) selection: Option<SelectionList<Row, SelectionEntry>>,
162 pub(crate) group_by: Option<Vec<SelectionColumn>>,
163 pub(crate) having: Option<Vec<HavingPredicate>>,
164 pub(crate) full_text_search: Option<Box<dyn DynFullTextSearchPlan>>,
165 pub(crate) aggregate_filter: Option<AggregateFilter>,
166 row: PhantomData<Row>,
167}
168
169#[derive(Clone)]
170pub struct AggregateFilter {
171 pub columns: SmallVec<[&'static str; 2]>,
172 pub predicates: Vec<HavingPredicate>,
173}
174
175fn build_alias_lookup(
176 joins: Option<&[JoinPath]>,
177) -> Vec<(&'static str, String)> {
178 let mut aliases = Vec::new();
179
180 if let Some(paths) = joins {
181 for path in paths {
182 let mut alias_prefix = String::new();
183 for segment in path.segments() {
184 if let Some(through) = segment.descriptor.through {
185 let mut through_alias = alias_prefix.clone();
186 through_alias.push_str(through.alias_segment);
187 aliases.push((through.table, through_alias));
188 }
189 alias_prefix.push_str(segment.descriptor.alias_segment);
190 aliases.push((
191 segment.descriptor.right_table,
192 alias_prefix.clone(),
193 ));
194 }
195 }
196 }
197
198 aliases
199}
200
201fn resolve_alias_for_table(
202 table: &'static str,
203 column: &'static str,
204 base_table: &str,
205 aliases: &[(&'static str, String)],
206) -> String {
207 if table == base_table {
208 return base_table.to_string();
209 }
210
211 let mut matches =
212 aliases.iter().filter(|(tbl, _)| *tbl == table).peekable();
213
214 let Some((_, alias)) = matches.next() else {
215 panic!(
216 "`take!` requested column `{table}.{column}` but `{table}` is not \
217 part of the join set"
218 );
219 };
220
221 if matches.peek().is_some() {
222 panic!(
223 "`take!` requested column `{table}.{column}` but `{table}` is \
224 joined multiple times; disambiguation is not implemented yet"
225 );
226 }
227
228 alias.clone()
229}
230
231fn resolve_selection_columns(
232 selection: &[SelectionColumn],
233 base_table: &str,
234 joins: Option<&[JoinPath]>,
235) -> SmallVec<[QualifiedColumn; 4]> {
236 let aliases = build_alias_lookup(joins);
237 let mut resolved = SmallVec::new();
238
239 for col in selection {
240 resolved.push(resolve_selection_column(col, base_table, &aliases));
241 }
242
243 resolved
244}
245
246fn resolve_selection_column(
247 column: &SelectionColumn,
248 base_table: &str,
249 aliases: &[(&'static str, String)],
250) -> QualifiedColumn {
251 let table_alias = resolve_alias_for_table(
252 column.table,
253 column.column,
254 base_table,
255 aliases,
256 );
257 QualifiedColumn {
258 table_alias,
259 column: column.column,
260 }
261}
262
263fn format_aggregate_expression(
264 selection: &AggregateSelection,
265 base_table: &str,
266 aliases: &[(&'static str, String)],
267) -> String {
268 match selection.column {
269 Some(col) => {
270 let qualified = resolve_selection_column(&col, base_table, aliases);
271 match selection.function {
272 AggregateFunction::CountDistinct => format!(
273 r#"COUNT(DISTINCT "{}"."{}")"#,
274 qualified.table_alias, qualified.column
275 ),
276 _ => format!(
277 r#"{}("{}"."{}")"#,
278 selection.function.sql_name(),
279 qualified.table_alias,
280 qualified.column
281 ),
282 }
283 }
284 None => format!("{}(*)", selection.function.sql_name()),
285 }
286}
287
288fn write_having_predicate(
289 predicate: &HavingPredicate,
290 writer: &mut SqlWriter,
291 base_table: &str,
292 aliases: &[(&'static str, String)],
293) {
294 let expr =
295 format_aggregate_expression(&predicate.selection, base_table, aliases);
296 writer.push(&expr);
297 writer.push(" ");
298 writer.push(predicate.comparator.as_str());
299 writer.push(" ");
300 predicate.bind_value(writer);
301}
302
303impl<'a, C, Row> ReadQueryPlan<'a, C, Row>
304where
305 C: QueryContext,
306 C::Model: JoinNavigationModel,
307{
308 fn compute_aggregate_filter(&mut self) {
309 if self.having.is_none() ||
310 self.selection.is_some() ||
311 self.group_by.is_some()
312 {
313 return;
314 }
315
316 let pk_columns = <C::Model as PrimaryKey>::PRIMARY_KEY;
317 if pk_columns.is_empty() {
318 return;
319 }
320
321 let Some(predicates) = self.having.take() else {
322 return;
323 };
324 if predicates.is_empty() {
325 return;
326 }
327
328 let columns = SmallVec::<[&'static str; 2]>::from_slice(pk_columns);
329 self.aggregate_filter = Some(AggregateFilter {
330 columns,
331 predicates,
332 });
333 }
334
335 fn push_aggregate_filter_clause(
336 &self,
337 writer: &mut SqlWriter,
338 filter: &AggregateFilter,
339 ) {
340 writer.push_where_raw(|w| {
341 if filter.columns.len() == 1 {
342 let col = filter.columns[0];
343 w.push(&format!(r#""{}"."{}""#, self.table, col));
344 } else {
345 w.push("(");
346 for (idx, col) in filter.columns.iter().enumerate() {
347 if idx > 0 {
348 w.push(", ");
349 }
350 w.push(&format!(r#""{}"."{}""#, self.table, col));
351 }
352 w.push(")");
353 }
354 w.push(" IN (");
355 self.write_aggregate_subquery(w, filter);
356 w.push(")");
357 });
358 }
359
360 fn write_aggregate_subquery(
361 &self,
362 writer: &mut SqlWriter,
363 filter: &AggregateFilter,
364 ) {
365 writer.push("SELECT ");
366 for (idx, col) in filter.columns.iter().enumerate() {
367 if idx > 0 {
368 writer.push(", ");
369 }
370 writer.push(&format!(r#""{}"."{}""#, self.table, col));
371 }
372 writer.push(" FROM ");
373 writer.push(self.table);
374
375 if let Some(js) = &self.joins {
376 for path in js {
377 push_join_path_inline(
378 writer.query_builder_mut(),
379 path,
380 self.table,
381 );
382 }
383 }
384
385 self.write_subquery_filters(writer);
386 self.write_subquery_group_by(writer, filter);
387 self.write_subquery_having(writer, filter);
388 }
389
390 fn write_subquery_filters(&self, writer: &mut SqlWriter) {
391 let mut has_clause = false;
392
393 if !self.include_deleted {
394 if let Some(delete_field) = self.delete_marker_field {
395 writer.push(" WHERE ");
396 writer.push(&format!(
397 r#""{}"."{}" IS NULL"#,
398 self.table, delete_field
399 ));
400 has_clause = true;
401 }
402 }
403
404 if let Some(expr) = &self.where_expr {
405 if has_clause {
406 writer.push(" AND (");
407 } else {
408 writer.push(" WHERE (");
409 }
410 expr.write(writer);
411 writer.push(")");
412 has_clause = true;
413 }
414
415 if let Some(fts) = &self.full_text_search {
416 if has_clause {
417 writer.push(" AND (");
418 } else {
419 writer.push(" WHERE (");
420 }
421 fts.write_condition(writer, self.table, self.joins.as_deref());
422 writer.push(")");
423 }
424 }
425
426 fn write_subquery_group_by(
427 &self,
428 writer: &mut SqlWriter,
429 filter: &AggregateFilter,
430 ) {
431 writer.push(" GROUP BY ");
432 for (idx, col) in filter.columns.iter().enumerate() {
433 if idx > 0 {
434 writer.push(", ");
435 }
436 writer.push(&format!(r#""{}"."{}""#, self.table, col));
437 }
438 }
439
440 fn write_subquery_having(
441 &self,
442 writer: &mut SqlWriter,
443 filter: &AggregateFilter,
444 ) {
445 if filter.predicates.is_empty() {
446 return;
447 }
448
449 let aliases = build_alias_lookup(self.joins.as_deref());
450 writer.push(" HAVING ");
451 for (idx, predicate) in filter.predicates.iter().enumerate() {
452 if idx > 0 {
453 writer.push(" AND ");
454 }
455 write_having_predicate(predicate, writer, self.table, &aliases);
456 }
457 }
458
459 fn to_query_builder(
460 &self,
461 select_type: SelectType,
462 ) -> sqlx::QueryBuilder<'static, Postgres> {
463 let effective_select = self.select_type_for(select_type.clone());
464 let head = ReadHead::new(self.table, effective_select);
465 let mut w = SqlWriter::new(head);
466
467 if let Some(js) = &self.joins {
468 w.push_joins(js, self.table);
469 }
470
471 self.push_where_clause(&mut w);
472 if let Some(filter) = &self.aggregate_filter {
473 self.push_aggregate_filter_clause(&mut w, filter);
474 } else {
475 self.push_group_by_clause(&mut w);
476 self.push_having_clause(&mut w);
477 }
478
479 if let Some(s) = &self.sort_expr {
480 w.push_sort(s);
481 } else if !matches!(select_type, SelectType::Exists) {
482 if let Some(fts) = &self.full_text_search {
483 if fts.include_rank() {
484 w.push_order_by_raw(|writer| {
485 fts.write_rank_expr(
486 writer,
487 self.table,
488 self.joins.as_deref(),
489 );
490 writer.push(" DESC");
491 });
492 }
493 }
494 }
495
496 if let SelectType::Exists = select_type {
497 w.push_pagination(&Pagination {
498 page: 0,
499 page_size: 1,
500 });
501 } else if let Some(p) = &self.pagination {
502 w.push_pagination(p);
503 }
504
505 if let SelectType::Exists = select_type {
506 w.push(")");
507 }
508
509 w.into_builder()
510 }
511
512 fn select_type_for(&self, base: SelectType) -> SelectType {
513 let resolved = match base {
514 SelectType::Star => self
515 .selection
516 .as_ref()
517 .map(|s| self.selection_select_type(s))
518 .unwrap_or(SelectType::Star),
519 other => other,
520 };
521
522 self.apply_join_extras(resolved)
523 }
524
525 fn selection_select_type(
526 &self,
527 selection: &SelectionList<Row, SelectionEntry>,
528 ) -> SelectType {
529 let mut has_columns = false;
530 let mut has_aggregates = false;
531 for entry in selection.entries() {
532 match entry {
533 SelectionEntry::Column(_) => has_columns = true,
534 SelectionEntry::Aggregate(_) => has_aggregates = true,
535 }
536 }
537
538 if has_columns && has_aggregates && self.group_by.is_none() {
539 panic!(
540 "`group_by!` must be provided when selecting columns \
541 alongside aggregates"
542 );
543 }
544
545 if has_columns && !has_aggregates {
546 let mut cols: SmallVec<[SelectionColumn; 4]> =
547 SmallVec::with_capacity(selection.entries().len());
548 for entry in selection.entries() {
549 if let SelectionEntry::Column(col) = entry {
550 cols.push(*col);
551 }
552 }
553 return SelectType::Columns(resolve_selection_columns(
554 &cols,
555 self.table,
556 self.joins.as_deref(),
557 ));
558 }
559
560 let projections = self.build_projections(selection);
561 SelectType::Projection(projections)
562 }
563
564 fn build_projections(
565 &self,
566 selection: &SelectionList<Row, SelectionEntry>,
567 ) -> Vec<SelectProjection> {
568 let aliases = build_alias_lookup(self.joins.as_deref());
569 selection
570 .entries()
571 .iter()
572 .enumerate()
573 .map(|(idx, entry)| match entry {
574 SelectionEntry::Column(col) => {
575 let qualified =
576 resolve_selection_column(col, self.table, &aliases);
577 SelectProjection {
578 expression: format!(
579 r#""{}"."{}""#,
580 qualified.table_alias, qualified.column
581 ),
582 alias: None,
583 }
584 }
585 SelectionEntry::Aggregate(agg) => {
586 let expr =
587 format_aggregate_expression(agg, self.table, &aliases);
588 let alias = format!(r#"__sqlxo_sel_{}"#, idx);
589 SelectProjection {
590 expression: expr,
591 alias: Some(alias),
592 }
593 }
594 })
595 .collect()
596 }
597
598 fn apply_join_extras(&self, select: SelectType) -> SelectType {
599 let extras = self.join_projection_columns();
600 if extras.is_empty() {
601 return select;
602 }
603
604 match select {
605 SelectType::Star => SelectType::StarWithExtras(extras),
606 SelectType::StarAndCount => SelectType::StarAndCountExtras(extras),
607 other => other,
608 }
609 }
610
611 fn join_projection_columns(&self) -> SmallVec<[AliasedColumn; 4]> {
612 if self.selection.is_some() {
613 return SmallVec::new();
614 }
615
616 C::Model::collect_join_columns(self.joins.as_deref(), "")
617 }
618
619 fn push_where_clause(&self, w: &mut SqlWriter) {
620 let mut has_clause = false;
621
622 if !self.include_deleted {
623 if let Some(delete_field) = self.delete_marker_field {
624 let qualified =
625 format!(r#""{}"."{}""#, self.table, delete_field);
626 w.push_where_raw(|writer| {
627 writer.push(&qualified);
628 writer.push(" IS NULL");
629 });
630 has_clause = true;
631 }
632 }
633
634 if let Some(e) = &self.where_expr {
635 let wrap = has_clause;
636 w.push_where_raw(|writer| {
637 if wrap {
638 writer.push("(");
639 e.write(writer);
640 writer.push(")");
641 } else {
642 e.write(writer);
643 }
644 });
645 has_clause = true;
646 }
647
648 if let Some(fts) = &self.full_text_search {
649 let wrap = has_clause;
650 w.push_where_raw(|writer| {
651 if wrap {
652 writer.push("(");
653 }
654 fts.write_condition(writer, self.table, self.joins.as_deref());
655 if wrap {
656 writer.push(")");
657 }
658 });
659 }
660 }
661
662 fn push_group_by_clause(&self, w: &mut SqlWriter) {
663 if let Some(columns) = &self.group_by {
664 if columns.is_empty() {
665 return;
666 }
667 let resolved = resolve_selection_columns(
668 columns,
669 self.table,
670 self.joins.as_deref(),
671 );
672 w.push_group_by_columns(&resolved);
673 }
674 }
675
676 fn push_having_clause(&self, w: &mut SqlWriter) {
677 let Some(predicates) = &self.having else {
678 return;
679 };
680 if predicates.is_empty() {
681 return;
682 }
683 let aliases = build_alias_lookup(self.joins.as_deref());
684 let table = self.table;
685 w.push_having(|writer| {
686 for (idx, predicate) in predicates.iter().enumerate() {
687 if idx > 0 {
688 writer.push(" AND ");
689 }
690 write_having_predicate(predicate, writer, table, &aliases);
691 }
692 });
693 }
694
695 pub async fn fetch_page<'e, E>(
696 &self,
697 exec: E,
698 ) -> Result<Page<C::Model>, sqlx::Error>
699 where
700 E: Executor<'e, Database = Postgres>,
701 {
702 #[derive(sqlx::FromRow)]
703 struct RowWithCount<M> {
704 #[sqlx(flatten)]
705 model: M,
706 total_count: i64,
707 }
708
709 let rows: Vec<PgRow> = self
710 .to_query_builder(SelectType::StarAndCount)
711 .build()
712 .fetch_all(exec)
713 .await?;
714
715 let pagination = self.pagination.unwrap_or_default();
716
717 if rows.is_empty() {
718 return Ok(Page::new(vec![], pagination, 0));
719 }
720
721 let mut total = 0;
722 let mut items = Vec::with_capacity(rows.len());
723 let hydrate = self.selection.is_none();
724
725 for row in rows {
726 let mut parsed = RowWithCount::<C::Model>::from_row(&row)?;
727 if hydrate {
728 parsed.model.hydrate_navigations(
729 self.joins.as_deref(),
730 &row,
731 "",
732 )?;
733 }
734 total = parsed.total_count;
735 items.push(parsed.model);
736 }
737
738 if hydrate && C::Model::has_collection_joins(self.joins.as_deref()) {
739 items = <C::Model as JoinNavigationModel>::merge_collection_rows(
740 items,
741 self.joins.as_deref(),
742 );
743 }
744
745 Ok(Page::new(items, pagination, total))
746 }
747
748 pub async fn exists<'e, E>(&self, exec: E) -> Result<bool, sqlx::Error>
749 where
750 E: Executor<'e, Database = Postgres>,
751 {
752 #[derive(sqlx::FromRow)]
753 struct ExistsRow {
754 exists: bool,
755 }
756
757 let row: ExistsRow = self
758 .to_query_builder(SelectType::Exists)
759 .build_query_as::<ExistsRow>()
760 .fetch_one(exec)
761 .await?;
762
763 Ok(row.exists)
764 }
765
766 #[cfg(any(test, feature = "test-utils"))]
767 pub fn sql(&self, build: SelectType) -> String {
768 use sqlx::Execute;
769 self.to_query_builder(build).build().sql().to_string()
770 }
771
772 fn map_pg_row(&self, row: PgRow) -> Result<Row, sqlx::Error>
773 where
774 Row: HydrateRow<C>,
775 {
776 <Row as HydrateRow<C>>::from_pg_row(self, row)
777 }
778}
779
780#[async_trait::async_trait]
781impl<'a, C, Row> FetchablePlan<C, Row> for ReadQueryPlan<'a, C, Row>
782where
783 C: QueryContext,
784 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
785{
786 async fn fetch_one<'e, E>(&self, exec: E) -> Result<Row, sqlx::Error>
787 where
788 E: Executor<'e, Database = Postgres>,
789 {
790 if <Row as HydrateRow<C>>::requires_collection_merge(self) {
791 let rows = self
792 .to_query_builder(SelectType::Star)
793 .build()
794 .fetch_all(exec)
795 .await?;
796 let mapped = rows
797 .into_iter()
798 .map(|row| self.map_pg_row(row))
799 .collect::<Result<Vec<Row>, _>>()?;
800 let merged =
801 <Row as HydrateRow<C>>::merge_collection_rows(mapped, self);
802 return merged.into_iter().next().ok_or(sqlx::Error::RowNotFound);
803 }
804
805 let row = self
806 .to_query_builder(SelectType::Star)
807 .build()
808 .fetch_one(exec)
809 .await?;
810
811 self.map_pg_row(row)
812 }
813
814 async fn fetch_all<'e, E>(&self, exec: E) -> Result<Vec<Row>, sqlx::Error>
815 where
816 E: Executor<'e, Database = Postgres>,
817 {
818 let rows = self
819 .to_query_builder(SelectType::Star)
820 .build()
821 .fetch_all(exec)
822 .await?;
823
824 let mapped = rows
825 .into_iter()
826 .map(|row| self.map_pg_row(row))
827 .collect::<Result<Vec<Row>, _>>()?;
828
829 Ok(<Row as HydrateRow<C>>::merge_collection_rows(mapped, self))
830 }
831
832 async fn fetch_optional<'e, E>(
833 &self,
834 exec: E,
835 ) -> Result<Option<Row>, sqlx::Error>
836 where
837 E: Executor<'e, Database = Postgres>,
838 {
839 if <Row as HydrateRow<C>>::requires_collection_merge(self) {
840 let rows = self
841 .to_query_builder(SelectType::Star)
842 .build()
843 .fetch_all(exec)
844 .await?;
845 let mapped = rows
846 .into_iter()
847 .map(|row| self.map_pg_row(row))
848 .collect::<Result<Vec<Row>, _>>()?;
849 let merged =
850 <Row as HydrateRow<C>>::merge_collection_rows(mapped, self);
851 return Ok(merged.into_iter().next());
852 }
853
854 let row = self
855 .to_query_builder(SelectType::Star)
856 .build()
857 .fetch_optional(exec)
858 .await?;
859
860 match row {
861 Some(row) => self.map_pg_row(row).map(Some),
862 None => Ok(None),
863 }
864 }
865}
866
867fn push_join_path_inline(
868 qb: &mut sqlx::QueryBuilder<'static, Postgres>,
869 path: &JoinPath,
870 base_table: &str,
871) {
872 if path.is_empty() {
873 return;
874 }
875
876 let mut left_alias = base_table.to_string();
877 let mut alias_prefix = String::new();
878
879 for segment in path.segments() {
880 let join_word = match segment.kind {
881 JoinKind::Inner => " INNER JOIN ",
882 JoinKind::Left => " LEFT JOIN ",
883 };
884
885 if let Some(through) = segment.descriptor.through {
886 let mut through_alias = alias_prefix.clone();
887 through_alias.push_str(through.alias_segment);
888 let clause = format!(
889 r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#,
890 join = join_word,
891 table = through.table,
892 alias = &through_alias,
893 left = &left_alias,
894 left_field = through.left_field,
895 right_field = through.right_field,
896 );
897 qb.push(clause);
898 left_alias = through_alias;
899 }
900
901 alias_prefix.push_str(segment.descriptor.alias_segment);
902 let right_alias = alias_prefix.clone();
903
904 let clause = format!(
905 r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#,
906 join = join_word,
907 table = segment.descriptor.right_table,
908 alias = &right_alias,
909 left = &left_alias,
910 left_field = segment.descriptor.left_field,
911 right_field = segment.descriptor.right_field,
912 );
913
914 qb.push(clause);
915 left_alias = right_alias;
916 }
917}
918
919#[async_trait::async_trait]
920impl<'a, C, Row> ExecutablePlan<C> for ReadQueryPlan<'a, C, Row>
921where
922 C: QueryContext,
923 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
924{
925 async fn execute<'e, E>(&self, exec: E) -> Result<u64, sqlx::Error>
926 where
927 E: Executor<'e, Database = Postgres>,
928 {
929 let rows = self
930 .to_query_builder(SelectType::Star)
931 .build()
932 .execute(exec)
933 .await?
934 .rows_affected();
935
936 Ok(rows)
937 }
938}
939
940impl<'a, C, Row> Planable<C, Row> for ReadQueryPlan<'a, C, Row>
941where
942 C: QueryContext,
943 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
944{
945}
946
947trait HydrateRow<C: QueryContext>: Sized {
948 fn from_pg_row(
949 plan: &ReadQueryPlan<C, Self>,
950 row: PgRow,
951 ) -> Result<Self, sqlx::Error>;
952
953 fn requires_collection_merge(_plan: &ReadQueryPlan<C, Self>) -> bool {
954 false
955 }
956
957 fn merge_collection_rows(
958 rows: Vec<Self>,
959 _plan: &ReadQueryPlan<C, Self>,
960 ) -> Vec<Self> {
961 rows
962 }
963}
964
965impl<C, Row> HydrateRow<C> for Row
966where
967 C: QueryContext,
968 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
969{
970 default fn from_pg_row(
971 _plan: &ReadQueryPlan<C, Row>,
972 row: PgRow,
973 ) -> Result<Self, sqlx::Error> {
974 Row::from_row(&row)
975 }
976
977 default fn requires_collection_merge(
978 _plan: &ReadQueryPlan<C, Self>,
979 ) -> bool {
980 false
981 }
982
983 default fn merge_collection_rows(
984 rows: Vec<Self>,
985 _plan: &ReadQueryPlan<C, Self>,
986 ) -> Vec<Self> {
987 rows
988 }
989}
990
991impl<C> HydrateRow<C> for C::Model
992where
993 C: QueryContext,
994 C::Model: JoinNavigationModel,
995{
996 fn from_pg_row(
997 plan: &ReadQueryPlan<C, Self>,
998 row: PgRow,
999 ) -> Result<Self, sqlx::Error> {
1000 let mut model = Self::from_row(&row)?;
1001 if plan.selection.is_none() {
1002 model.hydrate_navigations(plan.joins.as_deref(), &row, "")?;
1003 }
1004 Ok(model)
1005 }
1006
1007 fn requires_collection_merge(plan: &ReadQueryPlan<C, Self>) -> bool {
1008 plan.selection.is_none() &&
1009 Self::has_collection_joins(plan.joins.as_deref())
1010 }
1011
1012 fn merge_collection_rows(
1013 rows: Vec<Self>,
1014 plan: &ReadQueryPlan<C, Self>,
1015 ) -> Vec<Self> {
1016 if !Self::has_collection_joins(plan.joins.as_deref()) {
1017 return rows;
1018 }
1019
1020 <Self as JoinNavigationModel>::merge_collection_rows(
1021 rows,
1022 plan.joins.as_deref(),
1023 )
1024 }
1025}
1026
1027pub struct ReadQueryBuilder<
1028 'a,
1029 C: QueryContext,
1030 Row = <C as QueryContext>::Model,
1031> {
1032 pub(crate) table: &'a str,
1033 pub(crate) joins: Option<Vec<JoinPath>>,
1034 pub(crate) where_expr: Option<Expression<C::Query>>,
1035 pub(crate) sort_expr: Option<SortOrder<C::Sort>>,
1036 pub(crate) pagination: Option<Pagination>,
1037 pub(crate) include_deleted: bool,
1038 pub(crate) delete_marker_field: Option<&'a str>,
1039 pub(crate) selection: Option<SelectionList<Row, SelectionEntry>>,
1040 pub(crate) group_by: Option<Vec<SelectionColumn>>,
1041 pub(crate) having: Option<Vec<HavingPredicate>>,
1042 pub(crate) full_text_search: Option<Box<dyn DynFullTextSearchPlan>>,
1043 row: PhantomData<Row>,
1044}
1045
1046impl<'a, C, Row> ReadQueryBuilder<'a, C, Row>
1047where
1048 C: QueryContext,
1049{
1050 pub fn include_deleted(mut self) -> Self {
1051 self.include_deleted = true;
1052 self
1053 }
1054
1055 pub fn search(
1056 mut self,
1057 config: <C::Model as FullTextSearchable>::FullTextSearchConfig,
1058 ) -> Self
1059 where
1060 C::Model: FullTextSearchable + 'static,
1061 <C::Model as FullTextSearchable>::FullTextSearchConfig:
1062 Send + Sync + 'static,
1063 {
1064 self.full_text_search =
1065 Some(Box::new(ModelFullTextSearchPlan::<C::Model>::new(config)));
1066 self
1067 }
1068}
1069
1070impl<'a, C, Row> Buildable<C> for ReadQueryBuilder<'a, C, Row>
1071where
1072 C: QueryContext,
1073 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1074 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1075{
1076 type Row = Row;
1077 type Plan = ReadQueryPlan<'a, C, Row>;
1078
1079 fn from_ctx() -> Self {
1080 Self {
1081 table: C::TABLE,
1082 joins: None,
1083 where_expr: None,
1084 sort_expr: None,
1085 pagination: None,
1086 include_deleted: false,
1087 delete_marker_field: C::Model::delete_marker_field(),
1088 selection: None,
1089 group_by: None,
1090 having: None,
1091 full_text_search: None,
1092 row: PhantomData,
1093 }
1094 }
1095
1096 fn build(self) -> Self::Plan {
1097 let mut plan = ReadQueryPlan {
1098 joins: self.joins,
1099 where_expr: self.where_expr,
1100 sort_expr: self.sort_expr,
1101 pagination: self.pagination,
1102 table: self.table,
1103 include_deleted: self.include_deleted,
1104 delete_marker_field: self.delete_marker_field,
1105 selection: self.selection,
1106 group_by: self.group_by,
1107 having: self.having,
1108 full_text_search: self.full_text_search,
1109 aggregate_filter: None,
1110 row: PhantomData,
1111 };
1112 plan.compute_aggregate_filter();
1113 plan
1114 }
1115}
1116
1117impl<'a, C, Row> ReadQueryBuilder<'a, C, Row>
1118where
1119 C: QueryContext,
1120 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1121 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1122{
1123 pub fn take<NewRow>(
1124 self,
1125 selection: SelectionList<NewRow, SelectionEntry>,
1126 ) -> ReadQueryBuilder<'a, C, NewRow>
1127 where
1128 NewRow: Send
1129 + Sync
1130 + Unpin
1131 + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1132 {
1133 ReadQueryBuilder {
1134 table: self.table,
1135 joins: self.joins,
1136 where_expr: self.where_expr,
1137 sort_expr: self.sort_expr,
1138 pagination: self.pagination,
1139 include_deleted: self.include_deleted,
1140 delete_marker_field: self.delete_marker_field,
1141 selection: Some(selection),
1142 group_by: self.group_by,
1143 having: self.having,
1144 full_text_search: self.full_text_search,
1145 row: PhantomData,
1146 }
1147 }
1148
1149 pub fn group_by(mut self, group_by: GroupByList) -> Self {
1150 let cols = group_by.into_columns().into_vec();
1151 self.group_by = Some(cols);
1152 self
1153 }
1154
1155 pub fn having(mut self, having: HavingList) -> Self {
1156 self.having = Some(having.into_predicates());
1157 self
1158 }
1159}
1160
1161impl<'a, C, Row> BuildableFilter<C> for ReadQueryBuilder<'a, C, Row>
1162where
1163 C: QueryContext,
1164 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1165{
1166 fn r#where(mut self, e: Expression<<C as QueryContext>::Query>) -> Self {
1167 match self.where_expr {
1168 Some(existing) => self.where_expr = Some(and![existing, e]),
1169 None => self.where_expr = Some(e),
1170 };
1171
1172 self
1173 }
1174}
1175
1176impl<'a, C, Row> BuildableJoin<C> for ReadQueryBuilder<'a, C, Row>
1177where
1178 C: QueryContext,
1179 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1180{
1181 fn join(self, join: <C as QueryContext>::Join, kind: JoinKind) -> Self {
1182 self.join_path(JoinPath::from_join(join, kind))
1183 }
1184
1185 fn join_path(mut self, path: JoinPath) -> Self {
1186 if let Some(expected) = path.first_table() {
1187 assert_eq!(
1188 expected, self.table,
1189 "join path must start at base table `{}` but started at `{}`",
1190 self.table, expected,
1191 );
1192 }
1193
1194 match &mut self.joins {
1195 Some(existing) => existing.push(path),
1196 None => self.joins = Some(vec![path]),
1197 };
1198
1199 self
1200 }
1201}
1202
1203impl<'a, C, Row> BuildableSort<C> for ReadQueryBuilder<'a, C, Row>
1204where
1205 C: QueryContext,
1206 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1207{
1208 fn order_by(mut self, s: SortOrder<<C as QueryContext>::Sort>) -> Self {
1209 match self.sort_expr {
1210 Some(existing) => self.sort_expr = Some(order_by![existing, s]),
1211 None => self.sort_expr = Some(s),
1212 }
1213
1214 self
1215 }
1216}
1217
1218impl<'a, C, Row> BuildablePage<C> for ReadQueryBuilder<'a, C, Row>
1219where
1220 C: QueryContext,
1221 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1222{
1223 fn paginate(mut self, p: Pagination) -> Self {
1224 self.pagination = Some(p);
1225 self
1226 }
1227}
1228
1229impl<'a, C, Row> BuildableReadQuery<C, Row> for ReadQueryBuilder<'a, C, Row>
1230where
1231 C: QueryContext,
1232 C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1233 Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1234{
1235}