Skip to main content

sqlxo/
read.rs

1use smallvec::SmallVec;
2use std::marker::PhantomData;
3
4use sqlx::{
5	postgres::PgRow,
6	Executor,
7	FromRow,
8	Postgres,
9};
10use sqlxo_traits::{
11	AliasedColumn,
12	FullTextSearchConfig,
13	FullTextSearchable,
14	GetDeleteMarker,
15	JoinKind,
16	JoinNavigationModel,
17	JoinPath,
18	PrimaryKey,
19	QueryContext,
20	SqlWrite,
21};
22
23use crate::{
24	and,
25	blocks::{
26		BuildableFilter,
27		BuildableJoin,
28		BuildablePage,
29		BuildableSort,
30		Expression,
31		Page,
32		Pagination,
33		QualifiedColumn,
34		ReadHead,
35		SelectProjection,
36		SelectType,
37		SortOrder,
38		SqlWriter,
39	},
40	order_by,
41	select::{
42		AggregateFunction,
43		AggregateSelection,
44		GroupByList,
45		HavingList,
46		HavingPredicate,
47		SelectionColumn,
48		SelectionEntry,
49		SelectionList,
50	},
51	Buildable,
52	ExecutablePlan,
53	FetchablePlan,
54	Planable,
55};
56
/// Marker trait bundling every capability a read-query builder exposes:
/// building into a plan ([`Buildable`]) plus filtering, joining, sorting and
/// paging.
///
/// The trait declares no methods of its own; it only combines its
/// supertraits under one name.
///
/// TODO: this will become useful once multiple SQL dialects are supported.
#[allow(dead_code)]
pub trait BuildableReadQuery<C, Row = <C as QueryContext>::Model>:
	Buildable<C, Row = Row, Plan: Planable<C, Row>>
	+ BuildableFilter<C>
	+ BuildableJoin<C>
	+ BuildableSort<C>
	+ BuildablePage<C>
where
	C: QueryContext,
	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
}
70
/// Type-erased full-text-search plan.
///
/// Query plans in this file store one of these behind a `Box<dyn …>`, so the
/// SQL for a search can be written without naming the concrete model type.
pub(crate) trait DynFullTextSearchPlan: Send + Sync {
	/// Writes the boolean search condition into `w`.
	///
	/// Callers in this file pass the plan's table name as `base_alias` and
	/// the plan's join paths (if any) as `joins`.
	fn write_condition(
		&self,
		w: &mut SqlWriter,
		base_alias: &str,
		joins: Option<&[JoinPath]>,
	);

	/// Writes the ranking expression of the search into `w`.
	///
	/// Takes the same `base_alias` / `joins` arguments as
	/// [`write_condition`](Self::write_condition).
	fn write_rank_expr(
		&self,
		w: &mut SqlWriter,
		base_alias: &str,
		joins: Option<&[JoinPath]>,
	);

	/// Whether the query should be ordered by the ranking expression when no
	/// explicit sort order was requested.
	fn include_rank(&self) -> bool;
}
88
/// [`DynFullTextSearchPlan`] implementation backed by the static
/// [`FullTextSearchable`] functions of model `M`.
struct ModelFullTextSearchPlan<M>
where
	M: FullTextSearchable,
{
	/// Search configuration handed to every `M::write_*` call.
	config:  M::FullTextSearchConfig,
	/// Ties the plan to `M` without storing a value of that type.
	_marker: PhantomData<M>,
}
96
97impl<M> ModelFullTextSearchPlan<M>
98where
99	M: FullTextSearchable,
100{
101	fn new(config: M::FullTextSearchConfig) -> Self {
102		Self {
103			config,
104			_marker: PhantomData,
105		}
106	}
107}
108
109impl<M> DynFullTextSearchPlan for ModelFullTextSearchPlan<M>
110where
111	M: FullTextSearchable + Send + Sync + 'static,
112	M::FullTextSearchConfig: Send + Sync,
113{
114	fn write_condition(
115		&self,
116		w: &mut SqlWriter,
117		base_alias: &str,
118		joins: Option<&[JoinPath]>,
119	) {
120		w.push("(");
121		w.push("(");
122		M::write_tsvector(w, base_alias, joins, &self.config);
123		w.push(") @@ (");
124		M::write_tsquery(w, &self.config);
125		w.push(")");
126		if self.config.fuzzy_threshold().is_some() &&
127			self.config
128				.fuzzy_tokens()
129				.map(|tokens| !tokens.is_empty())
130				.unwrap_or(false)
131		{
132			w.push(" OR ");
133			M::write_fuzzy(w, base_alias, joins, &self.config);
134		}
135		w.push(")");
136	}
137
138	fn write_rank_expr(
139		&self,
140		w: &mut SqlWriter,
141		base_alias: &str,
142		joins: Option<&[JoinPath]>,
143	) {
144		M::write_rank(w, base_alias, joins, &self.config);
145	}
146
147	fn include_rank(&self) -> bool {
148		self.config.include_rank()
149	}
150}
151
/// Finished description of a read query, produced by `ReadQueryBuilder` and
/// turned into SQL by the methods of this type.
///
/// `Row` is the type rows are decoded into; it defaults to the context's
/// model.
pub struct ReadQueryPlan<'a, C: QueryContext, Row = <C as QueryContext>::Model>
{
	/// Join paths added to the query, if any.
	pub(crate) joins: Option<Vec<JoinPath>>,
	/// User-supplied filter expression.
	pub(crate) where_expr: Option<Expression<C::Query>>,
	/// Explicit sort order.
	pub(crate) sort_expr: Option<SortOrder<C::Sort>>,
	/// Page to fetch.
	pub(crate) pagination: Option<Pagination>,
	/// Name of the base table.
	pub(crate) table: &'a str,
	/// When `false` and a delete marker field is set, rows are filtered with
	/// `"<table>"."<marker>" IS NULL`.
	pub(crate) include_deleted: bool,
	/// Column used as the soft-delete marker, if any.
	pub(crate) delete_marker_field: Option<&'a str>,
	/// Explicit projection; `None` selects the default star projection.
	pub(crate) selection: Option<SelectionList<Row, SelectionEntry>>,
	/// Columns of the `GROUP BY` clause.
	pub(crate) group_by: Option<Vec<SelectionColumn>>,
	/// Predicates of the `HAVING` clause.
	pub(crate) having: Option<Vec<HavingPredicate>>,
	/// Type-erased full-text-search plan.
	pub(crate) full_text_search: Option<Box<dyn DynFullTextSearchPlan>>,
	/// Set by `compute_aggregate_filter` when `having` predicates were given
	/// without a selection or `group_by`; the predicates then move into a
	/// primary-key `IN (…)` subquery.
	pub(crate) aggregate_filter: Option<AggregateFilter>,
	/// Marker for the decoded row type.
	row: PhantomData<Row>,
}
168
/// Aggregate predicates that are applied through a
/// `<columns> IN (SELECT <columns> … GROUP BY <columns> HAVING …)` subquery.
#[derive(Clone)]
pub struct AggregateFilter {
	/// Columns of the base table used for the `IN`, `SELECT` and `GROUP BY`
	/// lists (filled from the model's primary key in this file).
	pub columns:    SmallVec<[&'static str; 2]>,
	/// Predicates written into the subquery's `HAVING` clause, joined with
	/// `AND`.
	pub predicates: Vec<HavingPredicate>,
}
174
175fn build_alias_lookup(
176	joins: Option<&[JoinPath]>,
177) -> Vec<(&'static str, String)> {
178	let mut aliases = Vec::new();
179
180	if let Some(paths) = joins {
181		for path in paths {
182			let mut alias_prefix = String::new();
183			for segment in path.segments() {
184				if let Some(through) = segment.descriptor.through {
185					let mut through_alias = alias_prefix.clone();
186					through_alias.push_str(through.alias_segment);
187					aliases.push((through.table, through_alias));
188				}
189				alias_prefix.push_str(segment.descriptor.alias_segment);
190				aliases.push((
191					segment.descriptor.right_table,
192					alias_prefix.clone(),
193				));
194			}
195		}
196	}
197
198	aliases
199}
200
/// Returns the SQL alias under which `table` is visible in the query.
///
/// The base table resolves to its own name; any other table must appear
/// exactly once in `aliases`. `column` is only used for the panic messages.
///
/// # Panics
///
/// Panics when `table` is neither the base table nor part of `aliases`, or
/// when it is listed in `aliases` more than once.
fn resolve_alias_for_table(
	table: &'static str,
	column: &'static str,
	base_table: &str,
	aliases: &[(&'static str, String)],
) -> String {
	if table == base_table {
		return base_table.to_owned();
	}

	let mut candidates = aliases
		.iter()
		.filter_map(|(joined, alias)| (*joined == table).then_some(alias));

	match (candidates.next(), candidates.next()) {
		(None, _) => panic!(
			"`take!` requested column `{table}.{column}` but `{table}` is not \
			 part of the join set"
		),
		(Some(_), Some(_)) => panic!(
			"`take!` requested column `{table}.{column}` but `{table}` is \
			 joined multiple times; disambiguation is not implemented yet"
		),
		(Some(alias), None) => alias.clone(),
	}
}
230
231fn resolve_selection_columns(
232	selection: &[SelectionColumn],
233	base_table: &str,
234	joins: Option<&[JoinPath]>,
235) -> SmallVec<[QualifiedColumn; 4]> {
236	let aliases = build_alias_lookup(joins);
237	let mut resolved = SmallVec::new();
238
239	for col in selection {
240		resolved.push(resolve_selection_column(col, base_table, &aliases));
241	}
242
243	resolved
244}
245
246fn resolve_selection_column(
247	column: &SelectionColumn,
248	base_table: &str,
249	aliases: &[(&'static str, String)],
250) -> QualifiedColumn {
251	let table_alias = resolve_alias_for_table(
252		column.table,
253		column.column,
254		base_table,
255		aliases,
256	);
257	QualifiedColumn {
258		table_alias,
259		column: column.column,
260	}
261}
262
263fn format_aggregate_expression(
264	selection: &AggregateSelection,
265	base_table: &str,
266	aliases: &[(&'static str, String)],
267) -> String {
268	match selection.column {
269		Some(col) => {
270			let qualified = resolve_selection_column(&col, base_table, aliases);
271			match selection.function {
272				AggregateFunction::CountDistinct => format!(
273					r#"COUNT(DISTINCT "{}"."{}")"#,
274					qualified.table_alias, qualified.column
275				),
276				_ => format!(
277					r#"{}("{}"."{}")"#,
278					selection.function.sql_name(),
279					qualified.table_alias,
280					qualified.column
281				),
282			}
283		}
284		None => format!("{}(*)", selection.function.sql_name()),
285	}
286}
287
288fn write_having_predicate(
289	predicate: &HavingPredicate,
290	writer: &mut SqlWriter,
291	base_table: &str,
292	aliases: &[(&'static str, String)],
293) {
294	let expr =
295		format_aggregate_expression(&predicate.selection, base_table, aliases);
296	writer.push(&expr);
297	writer.push(" ");
298	writer.push(predicate.comparator.as_str());
299	writer.push(" ");
300	predicate.bind_value(writer);
301}
302
303impl<'a, C, Row> ReadQueryPlan<'a, C, Row>
304where
305	C: QueryContext,
306	C::Model: JoinNavigationModel,
307{
308	fn compute_aggregate_filter(&mut self) {
309		if self.having.is_none() ||
310			self.selection.is_some() ||
311			self.group_by.is_some()
312		{
313			return;
314		}
315
316		let pk_columns = <C::Model as PrimaryKey>::PRIMARY_KEY;
317		if pk_columns.is_empty() {
318			return;
319		}
320
321		let Some(predicates) = self.having.take() else {
322			return;
323		};
324		if predicates.is_empty() {
325			return;
326		}
327
328		let columns = SmallVec::<[&'static str; 2]>::from_slice(pk_columns);
329		self.aggregate_filter = Some(AggregateFilter {
330			columns,
331			predicates,
332		});
333	}
334
335	fn push_aggregate_filter_clause(
336		&self,
337		writer: &mut SqlWriter,
338		filter: &AggregateFilter,
339	) {
340		writer.push_where_raw(|w| {
341			if filter.columns.len() == 1 {
342				let col = filter.columns[0];
343				w.push(&format!(r#""{}"."{}""#, self.table, col));
344			} else {
345				w.push("(");
346				for (idx, col) in filter.columns.iter().enumerate() {
347					if idx > 0 {
348						w.push(", ");
349					}
350					w.push(&format!(r#""{}"."{}""#, self.table, col));
351				}
352				w.push(")");
353			}
354			w.push(" IN (");
355			self.write_aggregate_subquery(w, filter);
356			w.push(")");
357		});
358	}
359
360	fn write_aggregate_subquery(
361		&self,
362		writer: &mut SqlWriter,
363		filter: &AggregateFilter,
364	) {
365		writer.push("SELECT ");
366		for (idx, col) in filter.columns.iter().enumerate() {
367			if idx > 0 {
368				writer.push(", ");
369			}
370			writer.push(&format!(r#""{}"."{}""#, self.table, col));
371		}
372		writer.push(" FROM ");
373		writer.push(self.table);
374
375		if let Some(js) = &self.joins {
376			for path in js {
377				push_join_path_inline(
378					writer.query_builder_mut(),
379					path,
380					self.table,
381				);
382			}
383		}
384
385		self.write_subquery_filters(writer);
386		self.write_subquery_group_by(writer, filter);
387		self.write_subquery_having(writer, filter);
388	}
389
390	fn write_subquery_filters(&self, writer: &mut SqlWriter) {
391		let mut has_clause = false;
392
393		if !self.include_deleted {
394			if let Some(delete_field) = self.delete_marker_field {
395				writer.push(" WHERE ");
396				writer.push(&format!(
397					r#""{}"."{}" IS NULL"#,
398					self.table, delete_field
399				));
400				has_clause = true;
401			}
402		}
403
404		if let Some(expr) = &self.where_expr {
405			if has_clause {
406				writer.push(" AND (");
407			} else {
408				writer.push(" WHERE (");
409			}
410			expr.write(writer);
411			writer.push(")");
412			has_clause = true;
413		}
414
415		if let Some(fts) = &self.full_text_search {
416			if has_clause {
417				writer.push(" AND (");
418			} else {
419				writer.push(" WHERE (");
420			}
421			fts.write_condition(writer, self.table, self.joins.as_deref());
422			writer.push(")");
423		}
424	}
425
426	fn write_subquery_group_by(
427		&self,
428		writer: &mut SqlWriter,
429		filter: &AggregateFilter,
430	) {
431		writer.push(" GROUP BY ");
432		for (idx, col) in filter.columns.iter().enumerate() {
433			if idx > 0 {
434				writer.push(", ");
435			}
436			writer.push(&format!(r#""{}"."{}""#, self.table, col));
437		}
438	}
439
440	fn write_subquery_having(
441		&self,
442		writer: &mut SqlWriter,
443		filter: &AggregateFilter,
444	) {
445		if filter.predicates.is_empty() {
446			return;
447		}
448
449		let aliases = build_alias_lookup(self.joins.as_deref());
450		writer.push(" HAVING ");
451		for (idx, predicate) in filter.predicates.iter().enumerate() {
452			if idx > 0 {
453				writer.push(" AND ");
454			}
455			write_having_predicate(predicate, writer, self.table, &aliases);
456		}
457	}
458
459	fn to_query_builder(
460		&self,
461		select_type: SelectType,
462	) -> sqlx::QueryBuilder<'static, Postgres> {
463		let effective_select = self.select_type_for(select_type.clone());
464		let head = ReadHead::new(self.table, effective_select);
465		let mut w = SqlWriter::new(head);
466
467		if let Some(js) = &self.joins {
468			w.push_joins(js, self.table);
469		}
470
471		self.push_where_clause(&mut w);
472		if let Some(filter) = &self.aggregate_filter {
473			self.push_aggregate_filter_clause(&mut w, filter);
474		} else {
475			self.push_group_by_clause(&mut w);
476			self.push_having_clause(&mut w);
477		}
478
479		if let Some(s) = &self.sort_expr {
480			w.push_sort(s);
481		} else if !matches!(select_type, SelectType::Exists) {
482			if let Some(fts) = &self.full_text_search {
483				if fts.include_rank() {
484					w.push_order_by_raw(|writer| {
485						fts.write_rank_expr(
486							writer,
487							self.table,
488							self.joins.as_deref(),
489						);
490						writer.push(" DESC");
491					});
492				}
493			}
494		}
495
496		if let SelectType::Exists = select_type {
497			w.push_pagination(&Pagination {
498				page:      0,
499				page_size: 1,
500			});
501		} else if let Some(p) = &self.pagination {
502			w.push_pagination(p);
503		}
504
505		if let SelectType::Exists = select_type {
506			w.push(")");
507		}
508
509		w.into_builder()
510	}
511
512	fn select_type_for(&self, base: SelectType) -> SelectType {
513		let resolved = match base {
514			SelectType::Star => self
515				.selection
516				.as_ref()
517				.map(|s| self.selection_select_type(s))
518				.unwrap_or(SelectType::Star),
519			other => other,
520		};
521
522		self.apply_join_extras(resolved)
523	}
524
525	fn selection_select_type(
526		&self,
527		selection: &SelectionList<Row, SelectionEntry>,
528	) -> SelectType {
529		let mut has_columns = false;
530		let mut has_aggregates = false;
531		for entry in selection.entries() {
532			match entry {
533				SelectionEntry::Column(_) => has_columns = true,
534				SelectionEntry::Aggregate(_) => has_aggregates = true,
535			}
536		}
537
538		if has_columns && has_aggregates && self.group_by.is_none() {
539			panic!(
540				"`group_by!` must be provided when selecting columns \
541				 alongside aggregates"
542			);
543		}
544
545		if has_columns && !has_aggregates {
546			let mut cols: SmallVec<[SelectionColumn; 4]> =
547				SmallVec::with_capacity(selection.entries().len());
548			for entry in selection.entries() {
549				if let SelectionEntry::Column(col) = entry {
550					cols.push(*col);
551				}
552			}
553			return SelectType::Columns(resolve_selection_columns(
554				&cols,
555				self.table,
556				self.joins.as_deref(),
557			));
558		}
559
560		let projections = self.build_projections(selection);
561		SelectType::Projection(projections)
562	}
563
564	fn build_projections(
565		&self,
566		selection: &SelectionList<Row, SelectionEntry>,
567	) -> Vec<SelectProjection> {
568		let aliases = build_alias_lookup(self.joins.as_deref());
569		selection
570			.entries()
571			.iter()
572			.enumerate()
573			.map(|(idx, entry)| match entry {
574				SelectionEntry::Column(col) => {
575					let qualified =
576						resolve_selection_column(col, self.table, &aliases);
577					SelectProjection {
578						expression: format!(
579							r#""{}"."{}""#,
580							qualified.table_alias, qualified.column
581						),
582						alias:      None,
583					}
584				}
585				SelectionEntry::Aggregate(agg) => {
586					let expr =
587						format_aggregate_expression(agg, self.table, &aliases);
588					let alias = format!(r#"__sqlxo_sel_{}"#, idx);
589					SelectProjection {
590						expression: expr,
591						alias:      Some(alias),
592					}
593				}
594			})
595			.collect()
596	}
597
598	fn apply_join_extras(&self, select: SelectType) -> SelectType {
599		let extras = self.join_projection_columns();
600		if extras.is_empty() {
601			return select;
602		}
603
604		match select {
605			SelectType::Star => SelectType::StarWithExtras(extras),
606			SelectType::StarAndCount => SelectType::StarAndCountExtras(extras),
607			other => other,
608		}
609	}
610
611	fn join_projection_columns(&self) -> SmallVec<[AliasedColumn; 4]> {
612		if self.selection.is_some() {
613			return SmallVec::new();
614		}
615
616		C::Model::collect_join_columns(self.joins.as_deref(), "")
617	}
618
619	fn push_where_clause(&self, w: &mut SqlWriter) {
620		let mut has_clause = false;
621
622		if !self.include_deleted {
623			if let Some(delete_field) = self.delete_marker_field {
624				let qualified =
625					format!(r#""{}"."{}""#, self.table, delete_field);
626				w.push_where_raw(|writer| {
627					writer.push(&qualified);
628					writer.push(" IS NULL");
629				});
630				has_clause = true;
631			}
632		}
633
634		if let Some(e) = &self.where_expr {
635			let wrap = has_clause;
636			w.push_where_raw(|writer| {
637				if wrap {
638					writer.push("(");
639					e.write(writer);
640					writer.push(")");
641				} else {
642					e.write(writer);
643				}
644			});
645			has_clause = true;
646		}
647
648		if let Some(fts) = &self.full_text_search {
649			let wrap = has_clause;
650			w.push_where_raw(|writer| {
651				if wrap {
652					writer.push("(");
653				}
654				fts.write_condition(writer, self.table, self.joins.as_deref());
655				if wrap {
656					writer.push(")");
657				}
658			});
659		}
660	}
661
662	fn push_group_by_clause(&self, w: &mut SqlWriter) {
663		if let Some(columns) = &self.group_by {
664			if columns.is_empty() {
665				return;
666			}
667			let resolved = resolve_selection_columns(
668				columns,
669				self.table,
670				self.joins.as_deref(),
671			);
672			w.push_group_by_columns(&resolved);
673		}
674	}
675
676	fn push_having_clause(&self, w: &mut SqlWriter) {
677		let Some(predicates) = &self.having else {
678			return;
679		};
680		if predicates.is_empty() {
681			return;
682		}
683		let aliases = build_alias_lookup(self.joins.as_deref());
684		let table = self.table;
685		w.push_having(|writer| {
686			for (idx, predicate) in predicates.iter().enumerate() {
687				if idx > 0 {
688					writer.push(" AND ");
689				}
690				write_having_predicate(predicate, writer, table, &aliases);
691			}
692		});
693	}
694
695	pub async fn fetch_page<'e, E>(
696		&self,
697		exec: E,
698	) -> Result<Page<C::Model>, sqlx::Error>
699	where
700		E: Executor<'e, Database = Postgres>,
701	{
702		#[derive(sqlx::FromRow)]
703		struct RowWithCount<M> {
704			#[sqlx(flatten)]
705			model:       M,
706			total_count: i64,
707		}
708
709		let rows: Vec<PgRow> = self
710			.to_query_builder(SelectType::StarAndCount)
711			.build()
712			.fetch_all(exec)
713			.await?;
714
715		let pagination = self.pagination.unwrap_or_default();
716
717		if rows.is_empty() {
718			return Ok(Page::new(vec![], pagination, 0));
719		}
720
721		let mut total = 0;
722		let mut items = Vec::with_capacity(rows.len());
723		let hydrate = self.selection.is_none();
724
725		for row in rows {
726			let mut parsed = RowWithCount::<C::Model>::from_row(&row)?;
727			if hydrate {
728				parsed.model.hydrate_navigations(
729					self.joins.as_deref(),
730					&row,
731					"",
732				)?;
733			}
734			total = parsed.total_count;
735			items.push(parsed.model);
736		}
737
738		if hydrate && C::Model::has_collection_joins(self.joins.as_deref()) {
739			items = <C::Model as JoinNavigationModel>::merge_collection_rows(
740				items,
741				self.joins.as_deref(),
742			);
743		}
744
745		Ok(Page::new(items, pagination, total))
746	}
747
748	pub async fn exists<'e, E>(&self, exec: E) -> Result<bool, sqlx::Error>
749	where
750		E: Executor<'e, Database = Postgres>,
751	{
752		#[derive(sqlx::FromRow)]
753		struct ExistsRow {
754			exists: bool,
755		}
756
757		let row: ExistsRow = self
758			.to_query_builder(SelectType::Exists)
759			.build_query_as::<ExistsRow>()
760			.fetch_one(exec)
761			.await?;
762
763		Ok(row.exists)
764	}
765
766	#[cfg(any(test, feature = "test-utils"))]
767	pub fn sql(&self, build: SelectType) -> String {
768		use sqlx::Execute;
769		self.to_query_builder(build).build().sql().to_string()
770	}
771
772	fn map_pg_row(&self, row: PgRow) -> Result<Row, sqlx::Error>
773	where
774		Row: HydrateRow<C>,
775	{
776		<Row as HydrateRow<C>>::from_pg_row(self, row)
777	}
778}
779
780#[async_trait::async_trait]
781impl<'a, C, Row> FetchablePlan<C, Row> for ReadQueryPlan<'a, C, Row>
782where
783	C: QueryContext,
784	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
785{
786	async fn fetch_one<'e, E>(&self, exec: E) -> Result<Row, sqlx::Error>
787	where
788		E: Executor<'e, Database = Postgres>,
789	{
790		if <Row as HydrateRow<C>>::requires_collection_merge(self) {
791			let rows = self
792				.to_query_builder(SelectType::Star)
793				.build()
794				.fetch_all(exec)
795				.await?;
796			let mapped = rows
797				.into_iter()
798				.map(|row| self.map_pg_row(row))
799				.collect::<Result<Vec<Row>, _>>()?;
800			let merged =
801				<Row as HydrateRow<C>>::merge_collection_rows(mapped, self);
802			return merged.into_iter().next().ok_or(sqlx::Error::RowNotFound);
803		}
804
805		let row = self
806			.to_query_builder(SelectType::Star)
807			.build()
808			.fetch_one(exec)
809			.await?;
810
811		self.map_pg_row(row)
812	}
813
814	async fn fetch_all<'e, E>(&self, exec: E) -> Result<Vec<Row>, sqlx::Error>
815	where
816		E: Executor<'e, Database = Postgres>,
817	{
818		let rows = self
819			.to_query_builder(SelectType::Star)
820			.build()
821			.fetch_all(exec)
822			.await?;
823
824		let mapped = rows
825			.into_iter()
826			.map(|row| self.map_pg_row(row))
827			.collect::<Result<Vec<Row>, _>>()?;
828
829		Ok(<Row as HydrateRow<C>>::merge_collection_rows(mapped, self))
830	}
831
832	async fn fetch_optional<'e, E>(
833		&self,
834		exec: E,
835	) -> Result<Option<Row>, sqlx::Error>
836	where
837		E: Executor<'e, Database = Postgres>,
838	{
839		if <Row as HydrateRow<C>>::requires_collection_merge(self) {
840			let rows = self
841				.to_query_builder(SelectType::Star)
842				.build()
843				.fetch_all(exec)
844				.await?;
845			let mapped = rows
846				.into_iter()
847				.map(|row| self.map_pg_row(row))
848				.collect::<Result<Vec<Row>, _>>()?;
849			let merged =
850				<Row as HydrateRow<C>>::merge_collection_rows(mapped, self);
851			return Ok(merged.into_iter().next());
852		}
853
854		let row = self
855			.to_query_builder(SelectType::Star)
856			.build()
857			.fetch_optional(exec)
858			.await?;
859
860		match row {
861			Some(row) => self.map_pg_row(row).map(Some),
862			None => Ok(None),
863		}
864	}
865}
866
867fn push_join_path_inline(
868	qb: &mut sqlx::QueryBuilder<'static, Postgres>,
869	path: &JoinPath,
870	base_table: &str,
871) {
872	if path.is_empty() {
873		return;
874	}
875
876	let mut left_alias = base_table.to_string();
877	let mut alias_prefix = String::new();
878
879	for segment in path.segments() {
880		let join_word = match segment.kind {
881			JoinKind::Inner => " INNER JOIN ",
882			JoinKind::Left => " LEFT JOIN ",
883		};
884
885		if let Some(through) = segment.descriptor.through {
886			let mut through_alias = alias_prefix.clone();
887			through_alias.push_str(through.alias_segment);
888			let clause = format!(
889				r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#,
890				join = join_word,
891				table = through.table,
892				alias = &through_alias,
893				left = &left_alias,
894				left_field = through.left_field,
895				right_field = through.right_field,
896			);
897			qb.push(clause);
898			left_alias = through_alias;
899		}
900
901		alias_prefix.push_str(segment.descriptor.alias_segment);
902		let right_alias = alias_prefix.clone();
903
904		let clause = format!(
905			r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#,
906			join = join_word,
907			table = segment.descriptor.right_table,
908			alias = &right_alias,
909			left = &left_alias,
910			left_field = segment.descriptor.left_field,
911			right_field = segment.descriptor.right_field,
912		);
913
914		qb.push(clause);
915		left_alias = right_alias;
916	}
917}
918
919#[async_trait::async_trait]
920impl<'a, C, Row> ExecutablePlan<C> for ReadQueryPlan<'a, C, Row>
921where
922	C: QueryContext,
923	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
924{
925	async fn execute<'e, E>(&self, exec: E) -> Result<u64, sqlx::Error>
926	where
927		E: Executor<'e, Database = Postgres>,
928	{
929		let rows = self
930			.to_query_builder(SelectType::Star)
931			.build()
932			.execute(exec)
933			.await?
934			.rows_affected();
935
936		Ok(rows)
937	}
938}
939
/// Marks [`ReadQueryPlan`] as a plan; the impl body is empty.
impl<'a, C, Row> Planable<C, Row> for ReadQueryPlan<'a, C, Row>
where
	C: QueryContext,
	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
}
946
/// Decoding hooks that let a row type customise how it is built from a
/// database row for a given plan.
trait HydrateRow<C: QueryContext>: Sized {
	/// Builds `Self` from one database row of `plan`.
	fn from_pg_row(
		plan: &ReadQueryPlan<C, Self>,
		row: PgRow,
	) -> Result<Self, sqlx::Error>;

	/// Whether single-row fetches must read all rows and merge them first.
	/// Defaults to `false`.
	fn requires_collection_merge(_plan: &ReadQueryPlan<C, Self>) -> bool {
		false
	}

	/// Merges decoded rows; the default returns `rows` unchanged.
	fn merge_collection_rows(
		rows: Vec<Self>,
		_plan: &ReadQueryPlan<C, Self>,
	) -> Vec<Self> {
		rows
	}
}
964
965impl<C, Row> HydrateRow<C> for Row
966where
967	C: QueryContext,
968	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
969{
970	default fn from_pg_row(
971		_plan: &ReadQueryPlan<C, Row>,
972		row: PgRow,
973	) -> Result<Self, sqlx::Error> {
974		Row::from_row(&row)
975	}
976
977	default fn requires_collection_merge(
978		_plan: &ReadQueryPlan<C, Self>,
979	) -> bool {
980		false
981	}
982
983	default fn merge_collection_rows(
984		rows: Vec<Self>,
985		_plan: &ReadQueryPlan<C, Self>,
986	) -> Vec<Self> {
987		rows
988	}
989}
990
991impl<C> HydrateRow<C> for C::Model
992where
993	C: QueryContext,
994	C::Model: JoinNavigationModel,
995{
996	fn from_pg_row(
997		plan: &ReadQueryPlan<C, Self>,
998		row: PgRow,
999	) -> Result<Self, sqlx::Error> {
1000		let mut model = Self::from_row(&row)?;
1001		if plan.selection.is_none() {
1002			model.hydrate_navigations(plan.joins.as_deref(), &row, "")?;
1003		}
1004		Ok(model)
1005	}
1006
1007	fn requires_collection_merge(plan: &ReadQueryPlan<C, Self>) -> bool {
1008		plan.selection.is_none() &&
1009			Self::has_collection_joins(plan.joins.as_deref())
1010	}
1011
1012	fn merge_collection_rows(
1013		rows: Vec<Self>,
1014		plan: &ReadQueryPlan<C, Self>,
1015	) -> Vec<Self> {
1016		if !Self::has_collection_joins(plan.joins.as_deref()) {
1017			return rows;
1018		}
1019
1020		<Self as JoinNavigationModel>::merge_collection_rows(
1021			rows,
1022			plan.joins.as_deref(),
1023		)
1024	}
1025}
1026
/// Builder that collects the parts of a read query and turns them into a
/// [`ReadQueryPlan`] through `Buildable::build`.
///
/// `Row` is the type rows will be decoded into; it defaults to the context's
/// model and is changed by `take`.
pub struct ReadQueryBuilder<
	'a,
	C: QueryContext,
	Row = <C as QueryContext>::Model,
> {
	/// Name of the base table.
	pub(crate) table: &'a str,
	/// Join paths added so far.
	pub(crate) joins: Option<Vec<JoinPath>>,
	/// Filter expression; repeated `where` calls are combined with `and!`.
	pub(crate) where_expr: Option<Expression<C::Query>>,
	/// Sort order; repeated `order_by` calls are combined with `order_by!`.
	pub(crate) sort_expr: Option<SortOrder<C::Sort>>,
	/// Requested page.
	pub(crate) pagination: Option<Pagination>,
	/// Whether soft-deleted rows are included.
	pub(crate) include_deleted: bool,
	/// Column used as the soft-delete marker, if any.
	pub(crate) delete_marker_field: Option<&'a str>,
	/// Explicit projection set through `take`.
	pub(crate) selection: Option<SelectionList<Row, SelectionEntry>>,
	/// Columns of the `GROUP BY` clause.
	pub(crate) group_by: Option<Vec<SelectionColumn>>,
	/// Predicates of the `HAVING` clause.
	pub(crate) having: Option<Vec<HavingPredicate>>,
	/// Full-text-search plan set through `search`.
	pub(crate) full_text_search: Option<Box<dyn DynFullTextSearchPlan>>,
	/// Marker for the decoded row type.
	row: PhantomData<Row>,
}
1045
1046impl<'a, C, Row> ReadQueryBuilder<'a, C, Row>
1047where
1048	C: QueryContext,
1049{
1050	pub fn include_deleted(mut self) -> Self {
1051		self.include_deleted = true;
1052		self
1053	}
1054
1055	pub fn search(
1056		mut self,
1057		config: <C::Model as FullTextSearchable>::FullTextSearchConfig,
1058	) -> Self
1059	where
1060		C::Model: FullTextSearchable + 'static,
1061		<C::Model as FullTextSearchable>::FullTextSearchConfig:
1062			Send + Sync + 'static,
1063	{
1064		self.full_text_search =
1065			Some(Box::new(ModelFullTextSearchPlan::<C::Model>::new(config)));
1066		self
1067	}
1068}
1069
1070impl<'a, C, Row> Buildable<C> for ReadQueryBuilder<'a, C, Row>
1071where
1072	C: QueryContext,
1073	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1074	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1075{
1076	type Row = Row;
1077	type Plan = ReadQueryPlan<'a, C, Row>;
1078
1079	fn from_ctx() -> Self {
1080		Self {
1081			table:               C::TABLE,
1082			joins:               None,
1083			where_expr:          None,
1084			sort_expr:           None,
1085			pagination:          None,
1086			include_deleted:     false,
1087			delete_marker_field: C::Model::delete_marker_field(),
1088			selection:           None,
1089			group_by:            None,
1090			having:              None,
1091			full_text_search:    None,
1092			row:                 PhantomData,
1093		}
1094	}
1095
1096	fn build(self) -> Self::Plan {
1097		let mut plan = ReadQueryPlan {
1098			joins:               self.joins,
1099			where_expr:          self.where_expr,
1100			sort_expr:           self.sort_expr,
1101			pagination:          self.pagination,
1102			table:               self.table,
1103			include_deleted:     self.include_deleted,
1104			delete_marker_field: self.delete_marker_field,
1105			selection:           self.selection,
1106			group_by:            self.group_by,
1107			having:              self.having,
1108			full_text_search:    self.full_text_search,
1109			aggregate_filter:    None,
1110			row:                 PhantomData,
1111		};
1112		plan.compute_aggregate_filter();
1113		plan
1114	}
1115}
1116
1117impl<'a, C, Row> ReadQueryBuilder<'a, C, Row>
1118where
1119	C: QueryContext,
1120	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1121	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1122{
1123	pub fn take<NewRow>(
1124		self,
1125		selection: SelectionList<NewRow, SelectionEntry>,
1126	) -> ReadQueryBuilder<'a, C, NewRow>
1127	where
1128		NewRow: Send
1129			+ Sync
1130			+ Unpin
1131			+ for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1132	{
1133		ReadQueryBuilder {
1134			table:               self.table,
1135			joins:               self.joins,
1136			where_expr:          self.where_expr,
1137			sort_expr:           self.sort_expr,
1138			pagination:          self.pagination,
1139			include_deleted:     self.include_deleted,
1140			delete_marker_field: self.delete_marker_field,
1141			selection:           Some(selection),
1142			group_by:            self.group_by,
1143			having:              self.having,
1144			full_text_search:    self.full_text_search,
1145			row:                 PhantomData,
1146		}
1147	}
1148
1149	pub fn group_by(mut self, group_by: GroupByList) -> Self {
1150		let cols = group_by.into_columns().into_vec();
1151		self.group_by = Some(cols);
1152		self
1153	}
1154
1155	pub fn having(mut self, having: HavingList) -> Self {
1156		self.having = Some(having.into_predicates());
1157		self
1158	}
1159}
1160
1161impl<'a, C, Row> BuildableFilter<C> for ReadQueryBuilder<'a, C, Row>
1162where
1163	C: QueryContext,
1164	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1165{
1166	fn r#where(mut self, e: Expression<<C as QueryContext>::Query>) -> Self {
1167		match self.where_expr {
1168			Some(existing) => self.where_expr = Some(and![existing, e]),
1169			None => self.where_expr = Some(e),
1170		};
1171
1172		self
1173	}
1174}
1175
1176impl<'a, C, Row> BuildableJoin<C> for ReadQueryBuilder<'a, C, Row>
1177where
1178	C: QueryContext,
1179	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1180{
1181	fn join(self, join: <C as QueryContext>::Join, kind: JoinKind) -> Self {
1182		self.join_path(JoinPath::from_join(join, kind))
1183	}
1184
1185	fn join_path(mut self, path: JoinPath) -> Self {
1186		if let Some(expected) = path.first_table() {
1187			assert_eq!(
1188				expected, self.table,
1189				"join path must start at base table `{}` but started at `{}`",
1190				self.table, expected,
1191			);
1192		}
1193
1194		match &mut self.joins {
1195			Some(existing) => existing.push(path),
1196			None => self.joins = Some(vec![path]),
1197		};
1198
1199		self
1200	}
1201}
1202
1203impl<'a, C, Row> BuildableSort<C> for ReadQueryBuilder<'a, C, Row>
1204where
1205	C: QueryContext,
1206	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1207{
1208	fn order_by(mut self, s: SortOrder<<C as QueryContext>::Sort>) -> Self {
1209		match self.sort_expr {
1210			Some(existing) => self.sort_expr = Some(order_by![existing, s]),
1211			None => self.sort_expr = Some(s),
1212		}
1213
1214		self
1215	}
1216}
1217
1218impl<'a, C, Row> BuildablePage<C> for ReadQueryBuilder<'a, C, Row>
1219where
1220	C: QueryContext,
1221	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1222{
1223	fn paginate(mut self, p: Pagination) -> Self {
1224		self.pagination = Some(p);
1225		self
1226	}
1227}
1228
/// Marks [`ReadQueryBuilder`] as a complete read-query builder; the impl
/// body is empty.
impl<'a, C, Row> BuildableReadQuery<C, Row> for ReadQueryBuilder<'a, C, Row>
where
	C: QueryContext,
	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
}