Skip to main content

sqlxo/
read.rs

1use smallvec::SmallVec;
2use std::marker::PhantomData;
3
4use sqlx::{
5	postgres::PgRow,
6	Executor,
7	FromRow,
8	Postgres,
9};
10use sqlxo_traits::{
11	AliasedColumn,
12	FullTextSearchConfig,
13	FullTextSearchable,
14	GetDeleteMarker,
15	JoinIdentifiable,
16	JoinKind,
17	JoinNavigationModel,
18	JoinPath,
19	PrimaryKey,
20	QueryContext,
21	SqlWrite,
22};
23
24use crate::{
25	and,
26	blocks::{
27		BuildableFilter,
28		BuildableJoin,
29		BuildablePage,
30		BuildableSort,
31		Expression,
32		Page,
33		Pagination,
34		QualifiedColumn,
35		ReadHead,
36		SelectProjection,
37		SelectType,
38		SortOrder,
39		SqlWriter,
40	},
41	order_by,
42	select::{
43		AggregateFunction,
44		AggregateSelection,
45		GroupByList,
46		HavingList,
47		HavingPredicate,
48		SelectionColumn,
49		SelectionEntry,
50		SelectionList,
51	},
52	Buildable,
53	ExecutablePlan,
54	FetchablePlan,
55	Planable,
56	PrimaryKeyExpression,
57	QueryBuilder,
58};
59
60/// TODO: this will be useful once multiple sql dialects will be supported
61#[allow(dead_code)]
62pub trait BuildableReadQuery<C, Row = <C as QueryContext>::Model>:
63	Buildable<C, Row = Row, Plan: Planable<C, Row>>
64	+ BuildableFilter<C>
65	+ BuildableJoin<C>
66	+ BuildableSort<C>
67	+ BuildablePage<C>
68where
69	C: QueryContext,
70	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
71{
72}
73
74pub(crate) trait DynFullTextSearchPlan: Send + Sync {
75	fn write_condition(
76		&self,
77		w: &mut SqlWriter,
78		base_alias: &str,
79		joins: Option<&[JoinPath]>,
80	);
81
82	fn write_rank_expr(
83		&self,
84		w: &mut SqlWriter,
85		base_alias: &str,
86		joins: Option<&[JoinPath]>,
87	);
88
89	fn include_rank(&self) -> bool;
90}
91
92struct ModelFullTextSearchPlan<M>
93where
94	M: FullTextSearchable,
95{
96	config:  M::FullTextSearchConfig,
97	_marker: PhantomData<M>,
98}
99
100impl<M> ModelFullTextSearchPlan<M>
101where
102	M: FullTextSearchable,
103{
104	fn new(config: M::FullTextSearchConfig) -> Self {
105		Self {
106			config,
107			_marker: PhantomData,
108		}
109	}
110}
111
112impl<M> DynFullTextSearchPlan for ModelFullTextSearchPlan<M>
113where
114	M: FullTextSearchable + Send + Sync + 'static,
115	M::FullTextSearchConfig: Send + Sync,
116{
117	fn write_condition(
118		&self,
119		w: &mut SqlWriter,
120		base_alias: &str,
121		joins: Option<&[JoinPath]>,
122	) {
123		M::write_search_predicate(w, base_alias, joins, &self.config);
124	}
125
126	fn write_rank_expr(
127		&self,
128		w: &mut SqlWriter,
129		base_alias: &str,
130		joins: Option<&[JoinPath]>,
131	) {
132		M::write_search_score(w, base_alias, joins, &self.config);
133	}
134
135	fn include_rank(&self) -> bool {
136		self.config.include_rank()
137	}
138}
139
140pub struct ReadQueryPlan<'a, C: QueryContext, Row = <C as QueryContext>::Model>
141{
142	pub(crate) joins: Option<Vec<JoinPath>>,
143	pub(crate) where_expr: Option<Expression<C::Query>>,
144	pub(crate) sort_expr: Option<SortOrder<C::Sort>>,
145	pub(crate) pagination: Option<Pagination>,
146	pub(crate) table: &'a str,
147	pub(crate) include_deleted: bool,
148	pub(crate) delete_marker_field: Option<&'a str>,
149	pub(crate) selection: Option<SelectionList<Row, SelectionEntry>>,
150	pub(crate) group_by: Option<Vec<SelectionColumn>>,
151	pub(crate) having: Option<Vec<HavingPredicate>>,
152	pub(crate) full_text_search: Option<Box<dyn DynFullTextSearchPlan>>,
153	pub(crate) aggregate_filter: Option<AggregateFilter>,
154	row: PhantomData<Row>,
155}
156
157#[derive(Clone)]
158pub struct AggregateFilter {
159	pub columns:    SmallVec<[&'static str; 2]>,
160	pub predicates: Vec<HavingPredicate>,
161}
162
163fn build_alias_lookup(
164	joins: Option<&[JoinPath]>,
165) -> Vec<(&'static str, String)> {
166	let mut aliases = Vec::new();
167
168	if let Some(paths) = joins {
169		for path in paths {
170			let mut alias_prefix = String::new();
171			for segment in path.segments() {
172				if let Some(through) = segment.descriptor.through {
173					let mut through_alias = alias_prefix.clone();
174					through_alias.push_str(through.alias_segment);
175					aliases.push((through.table, through_alias));
176				}
177				alias_prefix.push_str(segment.descriptor.alias_segment);
178				aliases.push((
179					segment.descriptor.right_table,
180					alias_prefix.clone(),
181				));
182			}
183		}
184	}
185
186	aliases
187}
188
189fn path_is_prefix(prefix: &JoinPath, candidate: &JoinPath) -> bool {
190	let prefix_segments = prefix.segments();
191	let candidate_segments = candidate.segments();
192
193	if prefix_segments.len() > candidate_segments.len() {
194		return false;
195	}
196
197	prefix_segments
198		.iter()
199		.zip(candidate_segments.iter())
200		.all(|(left, right)| left == right)
201}
202
203fn merge_unique_join_paths(mut paths: Vec<JoinPath>) -> Vec<JoinPath> {
204	let mut unique: Vec<JoinPath> = Vec::new();
205	for path in paths.drain(..) {
206		if unique.iter().any(|existing| existing == &path) {
207			continue;
208		}
209		unique.push(path);
210	}
211
212	let mut merged: Vec<JoinPath> = Vec::new();
213	for idx in 0..unique.len() {
214		let current = &unique[idx];
215		let shadowed = unique.iter().enumerate().any(|(other_idx, other)| {
216			other_idx != idx &&
217				path_is_prefix(current, other) &&
218				current.len() < other.len()
219		});
220
221		if !shadowed {
222			merged.push(current.clone());
223		}
224	}
225
226	merged
227}
228
/// Finds the SQL alias under which `table` is visible in the query.
///
/// The base table is referenced by its own name; any other table must
/// appear exactly once in `aliases`. `column` is only used for the panic
/// messages.
///
/// # Panics
///
/// Panics when `table` is not the base table and is either missing from
/// `aliases` or listed there more than once.
fn resolve_alias_for_table(
	table: &'static str,
	column: &'static str,
	base_table: &str,
	aliases: &[(&'static str, String)],
) -> String {
	if table == base_table {
		return base_table.to_string();
	}

	let mut candidates = aliases
		.iter()
		.filter(|(name, _)| *name == table)
		.map(|(_, alias)| alias);

	// Looking at the first two matches is enough to tell "none", "exactly
	// one" and "ambiguous" apart.
	match (candidates.next(), candidates.next()) {
		(None, _) => panic!(
			"`take!` requested column `{table}.{column}` but `{table}` is not \
			 part of the join set"
		),
		(Some(_), Some(_)) => panic!(
			"`take!` requested column `{table}.{column}` but `{table}` is \
			 joined multiple times; disambiguation is not implemented yet"
		),
		(Some(alias), None) => alias.clone(),
	}
}
258
259fn resolve_selection_columns(
260	selection: &[SelectionColumn],
261	base_table: &str,
262	joins: Option<&[JoinPath]>,
263) -> SmallVec<[QualifiedColumn; 4]> {
264	let aliases = build_alias_lookup(joins);
265	let mut resolved = SmallVec::new();
266
267	for col in selection {
268		resolved.push(resolve_selection_column(col, base_table, &aliases));
269	}
270
271	resolved
272}
273
274fn resolve_selection_column(
275	column: &SelectionColumn,
276	base_table: &str,
277	aliases: &[(&'static str, String)],
278) -> QualifiedColumn {
279	let table_alias = resolve_alias_for_table(
280		column.table,
281		column.column,
282		base_table,
283		aliases,
284	);
285	QualifiedColumn {
286		table_alias,
287		column: column.column,
288	}
289}
290
291fn format_aggregate_expression(
292	selection: &AggregateSelection,
293	base_table: &str,
294	aliases: &[(&'static str, String)],
295) -> String {
296	match selection.column {
297		Some(col) => {
298			let qualified = resolve_selection_column(&col, base_table, aliases);
299			match selection.function {
300				AggregateFunction::CountDistinct => format!(
301					r#"COUNT(DISTINCT "{}"."{}")"#,
302					qualified.table_alias, qualified.column
303				),
304				_ => format!(
305					r#"{}("{}"."{}")"#,
306					selection.function.sql_name(),
307					qualified.table_alias,
308					qualified.column
309				),
310			}
311		}
312		None => format!("{}(*)", selection.function.sql_name()),
313	}
314}
315
316fn write_having_predicate(
317	predicate: &HavingPredicate,
318	writer: &mut SqlWriter,
319	base_table: &str,
320	aliases: &[(&'static str, String)],
321) {
322	let expr =
323		format_aggregate_expression(&predicate.selection, base_table, aliases);
324	writer.push(&expr);
325	writer.push(" ");
326	writer.push(predicate.comparator.as_str());
327	writer.push(" ");
328	predicate.bind_value(writer);
329}
330
331impl<'a, C, Row> ReadQueryPlan<'a, C, Row>
332where
333	C: QueryContext,
334	C::Model: JoinNavigationModel,
335{
336	fn compute_aggregate_filter(&mut self) {
337		if self.having.is_none() ||
338			self.selection.is_some() ||
339			self.group_by.is_some()
340		{
341			return;
342		}
343
344		let pk_columns = <C::Model as PrimaryKey>::PRIMARY_KEY;
345		if pk_columns.is_empty() {
346			return;
347		}
348
349		let Some(predicates) = self.having.take() else {
350			return;
351		};
352		if predicates.is_empty() {
353			return;
354		}
355
356		let columns = SmallVec::<[&'static str; 2]>::from_slice(pk_columns);
357		self.aggregate_filter = Some(AggregateFilter {
358			columns,
359			predicates,
360		});
361	}
362
363	fn push_aggregate_filter_clause(
364		&self,
365		writer: &mut SqlWriter,
366		filter: &AggregateFilter,
367	) {
368		writer.push_where_raw(|w| {
369			if filter.columns.len() == 1 {
370				let col = filter.columns[0];
371				w.push(&format!(r#""{}"."{}""#, self.table, col));
372			} else {
373				w.push("(");
374				for (idx, col) in filter.columns.iter().enumerate() {
375					if idx > 0 {
376						w.push(", ");
377					}
378					w.push(&format!(r#""{}"."{}""#, self.table, col));
379				}
380				w.push(")");
381			}
382			w.push(" IN (");
383			self.write_aggregate_subquery(w, filter);
384			w.push(")");
385		});
386	}
387
388	fn write_aggregate_subquery(
389		&self,
390		writer: &mut SqlWriter,
391		filter: &AggregateFilter,
392	) {
393		writer.push("SELECT ");
394		for (idx, col) in filter.columns.iter().enumerate() {
395			if idx > 0 {
396				writer.push(", ");
397			}
398			writer.push(&format!(r#""{}"."{}""#, self.table, col));
399		}
400		writer.push(" FROM ");
401		writer.push(self.table);
402
403		if let Some(js) = &self.joins {
404			for path in js {
405				push_join_path_inline(
406					writer.query_builder_mut(),
407					path,
408					self.table,
409				);
410			}
411		}
412
413		self.write_subquery_filters(writer);
414		self.write_subquery_group_by(writer, filter);
415		self.write_subquery_having(writer, filter);
416	}
417
418	fn write_subquery_filters(&self, writer: &mut SqlWriter) {
419		let mut has_clause = false;
420
421		if !self.include_deleted {
422			if let Some(delete_field) = self.delete_marker_field {
423				writer.push(" WHERE ");
424				writer.push(&format!(
425					r#""{}"."{}" IS NULL"#,
426					self.table, delete_field
427				));
428				has_clause = true;
429			}
430		}
431
432		if let Some(expr) = &self.where_expr {
433			if has_clause {
434				writer.push(" AND (");
435			} else {
436				writer.push(" WHERE (");
437			}
438			expr.write(writer);
439			writer.push(")");
440			has_clause = true;
441		}
442
443		if let Some(fts) = &self.full_text_search {
444			if has_clause {
445				writer.push(" AND (");
446			} else {
447				writer.push(" WHERE (");
448			}
449			fts.write_condition(writer, self.table, self.joins.as_deref());
450			writer.push(")");
451		}
452	}
453
454	fn write_subquery_group_by(
455		&self,
456		writer: &mut SqlWriter,
457		filter: &AggregateFilter,
458	) {
459		writer.push(" GROUP BY ");
460		for (idx, col) in filter.columns.iter().enumerate() {
461			if idx > 0 {
462				writer.push(", ");
463			}
464			writer.push(&format!(r#""{}"."{}""#, self.table, col));
465		}
466	}
467
468	fn write_subquery_having(
469		&self,
470		writer: &mut SqlWriter,
471		filter: &AggregateFilter,
472	) {
473		if filter.predicates.is_empty() {
474			return;
475		}
476
477		let aliases = build_alias_lookup(self.joins.as_deref());
478		writer.push(" HAVING ");
479		for (idx, predicate) in filter.predicates.iter().enumerate() {
480			if idx > 0 {
481				writer.push(" AND ");
482			}
483			write_having_predicate(predicate, writer, self.table, &aliases);
484		}
485	}
486
487	fn to_query_builder(
488		&self,
489		select_type: SelectType,
490	) -> sqlx::QueryBuilder<'static, Postgres> {
491		let effective_select = self.select_type_for(select_type.clone());
492		let head = ReadHead::new(self.table, effective_select);
493		let mut w = SqlWriter::new(head);
494
495		if let Some(js) = &self.joins {
496			w.push_joins(js, self.table);
497		}
498
499		self.push_where_clause(&mut w);
500		if let Some(filter) = &self.aggregate_filter {
501			self.push_aggregate_filter_clause(&mut w, filter);
502		} else {
503			self.push_group_by_clause(&mut w);
504			self.push_having_clause(&mut w);
505		}
506
507		if let Some(s) = &self.sort_expr {
508			w.push_sort(s);
509		} else if !matches!(select_type, SelectType::Exists) {
510			if let Some(fts) = &self.full_text_search {
511				if fts.include_rank() {
512					w.push_order_by_raw(|writer| {
513						fts.write_rank_expr(
514							writer,
515							self.table,
516							self.joins.as_deref(),
517						);
518						writer.push(" DESC");
519					});
520				}
521			}
522		}
523
524		if let SelectType::Exists = select_type {
525			w.push_pagination(&Pagination {
526				page:      0,
527				page_size: 1,
528			});
529		} else if let Some(p) = &self.pagination {
530			w.push_pagination(p);
531		}
532
533		if let SelectType::Exists = select_type {
534			w.push(")");
535		}
536
537		w.into_builder()
538	}
539
540	fn select_type_for(&self, base: SelectType) -> SelectType {
541		let resolved = match base {
542			SelectType::Star => self
543				.selection
544				.as_ref()
545				.map(|s| self.selection_select_type(s))
546				.unwrap_or(SelectType::Star),
547			other => other,
548		};
549
550		self.apply_join_extras(resolved)
551	}
552
553	fn selection_select_type(
554		&self,
555		selection: &SelectionList<Row, SelectionEntry>,
556	) -> SelectType {
557		let mut has_columns = false;
558		let mut has_aggregates = false;
559		for entry in selection.entries() {
560			match entry {
561				SelectionEntry::Column(_) => has_columns = true,
562				SelectionEntry::Aggregate(_) => has_aggregates = true,
563			}
564		}
565
566		if has_columns && has_aggregates && self.group_by.is_none() {
567			panic!(
568				"`group_by!` must be provided when selecting columns \
569				 alongside aggregates"
570			);
571		}
572
573		if has_columns && !has_aggregates {
574			let mut cols: SmallVec<[SelectionColumn; 4]> =
575				SmallVec::with_capacity(selection.entries().len());
576			for entry in selection.entries() {
577				if let SelectionEntry::Column(col) = entry {
578					cols.push(*col);
579				}
580			}
581			return SelectType::Columns(resolve_selection_columns(
582				&cols,
583				self.table,
584				self.joins.as_deref(),
585			));
586		}
587
588		let projections = self.build_projections(selection);
589		SelectType::Projection(projections)
590	}
591
592	fn build_projections(
593		&self,
594		selection: &SelectionList<Row, SelectionEntry>,
595	) -> Vec<SelectProjection> {
596		let aliases = build_alias_lookup(self.joins.as_deref());
597		selection
598			.entries()
599			.iter()
600			.enumerate()
601			.map(|(idx, entry)| match entry {
602				SelectionEntry::Column(col) => {
603					let qualified =
604						resolve_selection_column(col, self.table, &aliases);
605					SelectProjection {
606						expression: format!(
607							r#""{}"."{}""#,
608							qualified.table_alias, qualified.column
609						),
610						alias:      None,
611					}
612				}
613				SelectionEntry::Aggregate(agg) => {
614					let expr =
615						format_aggregate_expression(agg, self.table, &aliases);
616					let alias = format!(r#"__sqlxo_sel_{}"#, idx);
617					SelectProjection {
618						expression: expr,
619						alias:      Some(alias),
620					}
621				}
622			})
623			.collect()
624	}
625
626	fn apply_join_extras(&self, select: SelectType) -> SelectType {
627		let extras = self.join_projection_columns();
628		if extras.is_empty() {
629			return select;
630		}
631
632		match select {
633			SelectType::Star => SelectType::StarWithExtras(extras),
634			SelectType::StarAndCount => SelectType::StarAndCountExtras(extras),
635			other => other,
636		}
637	}
638
639	fn join_projection_columns(&self) -> SmallVec<[AliasedColumn; 4]> {
640		if self.selection.is_some() {
641			return SmallVec::new();
642		}
643
644		C::Model::collect_join_columns(self.joins.as_deref(), "")
645	}
646
647	fn push_where_clause(&self, w: &mut SqlWriter) {
648		let mut has_clause = false;
649
650		if !self.include_deleted {
651			if let Some(delete_field) = self.delete_marker_field {
652				let qualified =
653					format!(r#""{}"."{}""#, self.table, delete_field);
654				w.push_where_raw(|writer| {
655					writer.push(&qualified);
656					writer.push(" IS NULL");
657				});
658				has_clause = true;
659			}
660		}
661
662		if let Some(e) = &self.where_expr {
663			let wrap = has_clause;
664			w.push_where_raw(|writer| {
665				if wrap {
666					writer.push("(");
667					e.write(writer);
668					writer.push(")");
669				} else {
670					e.write(writer);
671				}
672			});
673			has_clause = true;
674		}
675
676		if let Some(fts) = &self.full_text_search {
677			let wrap = has_clause;
678			w.push_where_raw(|writer| {
679				if wrap {
680					writer.push("(");
681				}
682				fts.write_condition(writer, self.table, self.joins.as_deref());
683				if wrap {
684					writer.push(")");
685				}
686			});
687		}
688	}
689
690	fn push_group_by_clause(&self, w: &mut SqlWriter) {
691		if let Some(columns) = &self.group_by {
692			if columns.is_empty() {
693				return;
694			}
695			let resolved = resolve_selection_columns(
696				columns,
697				self.table,
698				self.joins.as_deref(),
699			);
700			w.push_group_by_columns(&resolved);
701		}
702	}
703
704	fn push_having_clause(&self, w: &mut SqlWriter) {
705		let Some(predicates) = &self.having else {
706			return;
707		};
708		if predicates.is_empty() {
709			return;
710		}
711		let aliases = build_alias_lookup(self.joins.as_deref());
712		let table = self.table;
713		w.push_having(|writer| {
714			for (idx, predicate) in predicates.iter().enumerate() {
715				if idx > 0 {
716					writer.push(" AND ");
717				}
718				write_having_predicate(predicate, writer, table, &aliases);
719			}
720		});
721	}
722
723	pub async fn fetch_page<'e, E>(
724		&self,
725		exec: E,
726	) -> Result<Page<C::Model>, sqlx::Error>
727	where
728		E: Executor<'e, Database = Postgres>,
729	{
730		#[derive(sqlx::FromRow)]
731		struct RowWithCount<M> {
732			#[sqlx(flatten)]
733			model:       M,
734			total_count: i64,
735		}
736
737		let rows: Vec<PgRow> = self
738			.to_query_builder(SelectType::StarAndCount)
739			.build()
740			.fetch_all(exec)
741			.await?;
742
743		let pagination = self.pagination.unwrap_or_default();
744
745		if rows.is_empty() {
746			return Ok(Page::new(vec![], pagination, 0));
747		}
748
749		let mut total = 0;
750		let mut items = Vec::with_capacity(rows.len());
751		let hydrate = self.selection.is_none();
752
753		for row in rows {
754			let mut parsed = RowWithCount::<C::Model>::from_row(&row)?;
755			if hydrate {
756				parsed.model.hydrate_navigations(
757					self.joins.as_deref(),
758					&row,
759					"",
760				)?;
761			}
762			total = parsed.total_count;
763			items.push(parsed.model);
764		}
765
766		if hydrate && C::Model::has_collection_joins(self.joins.as_deref()) {
767			items = <C::Model as JoinNavigationModel>::merge_collection_rows(
768				items,
769				self.joins.as_deref(),
770			);
771		}
772
773		Ok(Page::new(items, pagination, total))
774	}
775
776	pub async fn exists<'e, E>(&self, exec: E) -> Result<bool, sqlx::Error>
777	where
778		E: Executor<'e, Database = Postgres>,
779	{
780		#[derive(sqlx::FromRow)]
781		struct ExistsRow {
782			exists: bool,
783		}
784
785		let row: ExistsRow = self
786			.to_query_builder(SelectType::Exists)
787			.build_query_as::<ExistsRow>()
788			.fetch_one(exec)
789			.await?;
790
791		Ok(row.exists)
792	}
793
794	#[cfg(any(test, feature = "test-utils"))]
795	pub fn sql(&self, build: SelectType) -> String {
796		use sqlx::Execute;
797		self.to_query_builder(build).build().sql().to_string()
798	}
799
800	fn map_pg_row(&self, row: PgRow) -> Result<Row, sqlx::Error>
801	where
802		Row: HydrateRow<C>,
803	{
804		<Row as HydrateRow<C>>::from_pg_row(self, row)
805	}
806}
807
808#[async_trait::async_trait]
809impl<'a, C, Row> FetchablePlan<C, Row> for ReadQueryPlan<'a, C, Row>
810where
811	C: QueryContext,
812	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
813{
814	async fn fetch_one<'e, E>(&self, exec: E) -> Result<Row, sqlx::Error>
815	where
816		E: Executor<'e, Database = Postgres>,
817	{
818		if <Row as HydrateRow<C>>::requires_collection_merge(self) {
819			let rows = self
820				.to_query_builder(SelectType::Star)
821				.build()
822				.fetch_all(exec)
823				.await?;
824			let mapped = rows
825				.into_iter()
826				.map(|row| self.map_pg_row(row))
827				.collect::<Result<Vec<Row>, _>>()?;
828			let merged =
829				<Row as HydrateRow<C>>::merge_collection_rows(mapped, self);
830			return merged.into_iter().next().ok_or(sqlx::Error::RowNotFound);
831		}
832
833		let row = self
834			.to_query_builder(SelectType::Star)
835			.build()
836			.fetch_one(exec)
837			.await?;
838
839		self.map_pg_row(row)
840	}
841
842	async fn fetch_all<'e, E>(&self, exec: E) -> Result<Vec<Row>, sqlx::Error>
843	where
844		E: Executor<'e, Database = Postgres>,
845	{
846		let rows = self
847			.to_query_builder(SelectType::Star)
848			.build()
849			.fetch_all(exec)
850			.await?;
851
852		let mapped = rows
853			.into_iter()
854			.map(|row| self.map_pg_row(row))
855			.collect::<Result<Vec<Row>, _>>()?;
856
857		Ok(<Row as HydrateRow<C>>::merge_collection_rows(mapped, self))
858	}
859
860	async fn fetch_optional<'e, E>(
861		&self,
862		exec: E,
863	) -> Result<Option<Row>, sqlx::Error>
864	where
865		E: Executor<'e, Database = Postgres>,
866	{
867		if <Row as HydrateRow<C>>::requires_collection_merge(self) {
868			let rows = self
869				.to_query_builder(SelectType::Star)
870				.build()
871				.fetch_all(exec)
872				.await?;
873			let mapped = rows
874				.into_iter()
875				.map(|row| self.map_pg_row(row))
876				.collect::<Result<Vec<Row>, _>>()?;
877			let merged =
878				<Row as HydrateRow<C>>::merge_collection_rows(mapped, self);
879			return Ok(merged.into_iter().next());
880		}
881
882		let row = self
883			.to_query_builder(SelectType::Star)
884			.build()
885			.fetch_optional(exec)
886			.await?;
887
888		match row {
889			Some(row) => self.map_pg_row(row).map(Some),
890			None => Ok(None),
891		}
892	}
893}
894
895fn push_join_path_inline(
896	qb: &mut sqlx::QueryBuilder<'static, Postgres>,
897	path: &JoinPath,
898	base_table: &str,
899) {
900	if path.is_empty() {
901		return;
902	}
903
904	let mut left_alias = base_table.to_string();
905	let mut alias_prefix = String::new();
906
907	for segment in path.segments() {
908		let join_word = match segment.kind {
909			JoinKind::Inner => " INNER JOIN ",
910			JoinKind::Left => " LEFT JOIN ",
911		};
912
913		if let Some(through) = segment.descriptor.through {
914			let mut through_alias = alias_prefix.clone();
915			through_alias.push_str(through.alias_segment);
916			let clause = format!(
917				r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#,
918				join = join_word,
919				table = through.table,
920				alias = &through_alias,
921				left = &left_alias,
922				left_field = through.left_field,
923				right_field = through.right_field,
924			);
925			qb.push(clause);
926			left_alias = through_alias;
927		}
928
929		alias_prefix.push_str(segment.descriptor.alias_segment);
930		let right_alias = alias_prefix.clone();
931
932		let clause = format!(
933			r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#,
934			join = join_word,
935			table = segment.descriptor.right_table,
936			alias = &right_alias,
937			left = &left_alias,
938			left_field = segment.descriptor.left_field,
939			right_field = segment.descriptor.right_field,
940		);
941
942		qb.push(clause);
943		left_alias = right_alias;
944	}
945}
946
947#[async_trait::async_trait]
948impl<'a, C, Row> ExecutablePlan<C> for ReadQueryPlan<'a, C, Row>
949where
950	C: QueryContext,
951	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
952{
953	async fn execute<'e, E>(&self, exec: E) -> Result<u64, sqlx::Error>
954	where
955		E: Executor<'e, Database = Postgres>,
956	{
957		let rows = self
958			.to_query_builder(SelectType::Star)
959			.build()
960			.execute(exec)
961			.await?
962			.rows_affected();
963
964		Ok(rows)
965	}
966}
967
968impl<'a, C, Row> Planable<C, Row> for ReadQueryPlan<'a, C, Row>
969where
970	C: QueryContext,
971	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
972{
973}
974
975trait HydrateRow<C: QueryContext>: Sized {
976	fn from_pg_row(
977		plan: &ReadQueryPlan<C, Self>,
978		row: PgRow,
979	) -> Result<Self, sqlx::Error>;
980
981	fn requires_collection_merge(_plan: &ReadQueryPlan<C, Self>) -> bool {
982		false
983	}
984
985	fn merge_collection_rows(
986		rows: Vec<Self>,
987		_plan: &ReadQueryPlan<C, Self>,
988	) -> Vec<Self> {
989		rows
990	}
991}
992
993impl<C, Row> HydrateRow<C> for Row
994where
995	C: QueryContext,
996	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
997{
998	default fn from_pg_row(
999		_plan: &ReadQueryPlan<C, Row>,
1000		row: PgRow,
1001	) -> Result<Self, sqlx::Error> {
1002		Row::from_row(&row)
1003	}
1004
1005	default fn requires_collection_merge(
1006		_plan: &ReadQueryPlan<C, Self>,
1007	) -> bool {
1008		false
1009	}
1010
1011	default fn merge_collection_rows(
1012		rows: Vec<Self>,
1013		_plan: &ReadQueryPlan<C, Self>,
1014	) -> Vec<Self> {
1015		rows
1016	}
1017}
1018
1019impl<C> HydrateRow<C> for C::Model
1020where
1021	C: QueryContext,
1022	C::Model: JoinNavigationModel,
1023{
1024	fn from_pg_row(
1025		plan: &ReadQueryPlan<C, Self>,
1026		row: PgRow,
1027	) -> Result<Self, sqlx::Error> {
1028		let mut model = Self::from_row(&row)?;
1029		if plan.selection.is_none() {
1030			model.hydrate_navigations(plan.joins.as_deref(), &row, "")?;
1031		}
1032		Ok(model)
1033	}
1034
1035	fn requires_collection_merge(plan: &ReadQueryPlan<C, Self>) -> bool {
1036		plan.selection.is_none() &&
1037			Self::has_collection_joins(plan.joins.as_deref())
1038	}
1039
1040	fn merge_collection_rows(
1041		rows: Vec<Self>,
1042		plan: &ReadQueryPlan<C, Self>,
1043	) -> Vec<Self> {
1044		if !Self::has_collection_joins(plan.joins.as_deref()) {
1045			return rows;
1046		}
1047
1048		<Self as JoinNavigationModel>::merge_collection_rows(
1049			rows,
1050			plan.joins.as_deref(),
1051		)
1052	}
1053}
1054
/// What a relation reload operates on: one model or a slice of models,
/// borrowed mutably so they can be overwritten in place.
enum RelationReloadTarget<'m, M> {
	/// A single model.
	One(&'m mut M),
	/// A slice of models.
	Many(&'m mut [M]),
}
1059
1060pub struct RelationReloadBuilder<'m, C: QueryContext> {
1061	target:                 RelationReloadTarget<'m, C::Model>,
1062	include_lazy_relations: bool,
1063	_phantom:               PhantomData<C>,
1064}
1065
1066impl<'m, C> RelationReloadBuilder<'m, C>
1067where
1068	C: QueryContext,
1069{
1070	pub(crate) fn one(model: &'m mut C::Model) -> Self {
1071		Self {
1072			target:                 RelationReloadTarget::One(model),
1073			include_lazy_relations: false,
1074			_phantom:               PhantomData,
1075		}
1076	}
1077
1078	pub(crate) fn many(models: &'m mut [C::Model]) -> Self {
1079		Self {
1080			target:                 RelationReloadTarget::Many(models),
1081			include_lazy_relations: false,
1082			_phantom:               PhantomData,
1083		}
1084	}
1085
1086	pub fn include_lazy_relations(mut self) -> Self {
1087		self.include_lazy_relations = true;
1088		self
1089	}
1090
1091	pub async fn execute<'e, E>(self, exec: E) -> Result<u64, sqlx::Error>
1092	where
1093		E: Executor<'e, Database = Postgres>,
1094		C::Model:
1095			PrimaryKeyExpression<C> + JoinIdentifiable + JoinNavigationModel,
1096	{
1097		match self.target {
1098			RelationReloadTarget::One(model) => {
1099				let mut builder = QueryBuilder::<C>::read();
1100				if self.include_lazy_relations {
1101					builder = builder.include_lazy_relations();
1102				}
1103				let loaded = builder
1104					.r#where(model.primary_key_expression())
1105					.build()
1106					.fetch_one(exec)
1107					.await?;
1108				*model = loaded;
1109				Ok(1)
1110			}
1111			RelationReloadTarget::Many(models) => {
1112				if models.is_empty() {
1113					return Ok(0);
1114				}
1115
1116				let mut filters: Vec<Expression<C::Query>> =
1117					Vec::with_capacity(models.len());
1118				for model in models.iter() {
1119					filters.push(model.primary_key_expression());
1120				}
1121
1122				let filter_expr = if filters.len() == 1 {
1123					filters.remove(0)
1124				} else {
1125					Expression::Or(filters)
1126				};
1127
1128				let mut builder = QueryBuilder::<C>::read();
1129				if self.include_lazy_relations {
1130					builder = builder.include_lazy_relations();
1131				}
1132
1133				let loaded = builder
1134					.r#where(filter_expr)
1135					.build()
1136					.fetch_all(exec)
1137					.await?;
1138
1139				let mut refreshed = 0u64;
1140				for model in models.iter_mut() {
1141					if let Some(found) = loaded.iter().find(|candidate| {
1142						candidate.join_key() == model.join_key()
1143					}) {
1144						*model = found.clone();
1145						refreshed += 1;
1146					}
1147				}
1148				Ok(refreshed)
1149			}
1150		}
1151	}
1152}
1153
1154pub struct ReadQueryBuilder<
1155	'a,
1156	C: QueryContext,
1157	Row = <C as QueryContext>::Model,
1158> {
1159	pub(crate) table: &'a str,
1160	pub(crate) joins: Option<Vec<JoinPath>>,
1161	pub(crate) where_expr: Option<Expression<C::Query>>,
1162	pub(crate) sort_expr: Option<SortOrder<C::Sort>>,
1163	pub(crate) pagination: Option<Pagination>,
1164	pub(crate) include_deleted: bool,
1165	pub(crate) auto_joins: bool,
1166	pub(crate) include_lazy_relations: bool,
1167	pub(crate) delete_marker_field: Option<&'a str>,
1168	pub(crate) selection: Option<SelectionList<Row, SelectionEntry>>,
1169	pub(crate) group_by: Option<Vec<SelectionColumn>>,
1170	pub(crate) having: Option<Vec<HavingPredicate>>,
1171	pub(crate) full_text_search: Option<Box<dyn DynFullTextSearchPlan>>,
1172	row: PhantomData<Row>,
1173}
1174
1175impl<'a, C, Row> ReadQueryBuilder<'a, C, Row>
1176where
1177	C: QueryContext,
1178{
1179	pub fn include_deleted(mut self) -> Self {
1180		self.include_deleted = true;
1181		self
1182	}
1183
1184	pub fn without_auto_joins(mut self) -> Self {
1185		self.auto_joins = false;
1186		self
1187	}
1188
1189	pub fn include_lazy_relations(mut self) -> Self {
1190		self.include_lazy_relations = true;
1191		self
1192	}
1193
1194	pub fn search(
1195		mut self,
1196		config: <C::Model as FullTextSearchable>::FullTextSearchConfig,
1197	) -> Self
1198	where
1199		C::Model: FullTextSearchable + 'static,
1200		<C::Model as FullTextSearchable>::FullTextSearchConfig:
1201			Send + Sync + 'static,
1202	{
1203		self.full_text_search =
1204			Some(Box::new(ModelFullTextSearchPlan::<C::Model>::new(config)));
1205		self
1206	}
1207}
1208
1209impl<'a, C, Row> Buildable<C> for ReadQueryBuilder<'a, C, Row>
1210where
1211	C: QueryContext,
1212	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1213	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1214{
1215	type Row = Row;
1216	type Plan = ReadQueryPlan<'a, C, Row>;
1217
1218	fn from_ctx() -> Self {
1219		Self {
1220			table:                  C::TABLE,
1221			joins:                  None,
1222			where_expr:             None,
1223			sort_expr:              None,
1224			pagination:             None,
1225			include_deleted:        false,
1226			auto_joins:             true,
1227			include_lazy_relations: false,
1228			delete_marker_field:    C::Model::delete_marker_field(),
1229			selection:              None,
1230			group_by:               None,
1231			having:                 None,
1232			full_text_search:       None,
1233			row:                    PhantomData,
1234		}
1235	}
1236
1237	fn build(self) -> Self::Plan {
1238		let auto_join_paths: Vec<JoinPath> = if self.auto_joins {
1239			C::Model::default_join_paths(self.include_lazy_relations).into_vec()
1240		} else {
1241			Vec::new()
1242		};
1243		let mut combined_paths = auto_join_paths;
1244		if let Some(paths) = self.joins {
1245			combined_paths.extend(paths);
1246		}
1247		let resolved_joins = if combined_paths.is_empty() {
1248			None
1249		} else {
1250			Some(merge_unique_join_paths(combined_paths))
1251		};
1252
1253		let mut plan = ReadQueryPlan {
1254			joins:               resolved_joins,
1255			where_expr:          self.where_expr,
1256			sort_expr:           self.sort_expr,
1257			pagination:          self.pagination,
1258			table:               self.table,
1259			include_deleted:     self.include_deleted,
1260			delete_marker_field: self.delete_marker_field,
1261			selection:           self.selection,
1262			group_by:            self.group_by,
1263			having:              self.having,
1264			full_text_search:    self.full_text_search,
1265			aggregate_filter:    None,
1266			row:                 PhantomData,
1267		};
1268		plan.compute_aggregate_filter();
1269		plan
1270	}
1271}
1272
1273impl<'a, C, Row> ReadQueryBuilder<'a, C, Row>
1274where
1275	C: QueryContext,
1276	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1277	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1278{
1279	pub fn take<NewRow>(
1280		self,
1281		selection: SelectionList<NewRow, SelectionEntry>,
1282	) -> ReadQueryBuilder<'a, C, NewRow>
1283	where
1284		NewRow: Send
1285			+ Sync
1286			+ Unpin
1287			+ for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1288	{
1289		ReadQueryBuilder {
1290			table:                  self.table,
1291			joins:                  self.joins,
1292			where_expr:             self.where_expr,
1293			sort_expr:              self.sort_expr,
1294			pagination:             self.pagination,
1295			include_deleted:        self.include_deleted,
1296			auto_joins:             self.auto_joins,
1297			include_lazy_relations: self.include_lazy_relations,
1298			delete_marker_field:    self.delete_marker_field,
1299			selection:              Some(selection),
1300			group_by:               self.group_by,
1301			having:                 self.having,
1302			full_text_search:       self.full_text_search,
1303			row:                    PhantomData,
1304		}
1305	}
1306
1307	pub fn group_by(mut self, group_by: GroupByList) -> Self {
1308		let cols = group_by.into_columns().into_vec();
1309		self.group_by = Some(cols);
1310		self
1311	}
1312
1313	pub fn having(mut self, having: HavingList) -> Self {
1314		self.having = Some(having.into_predicates());
1315		self
1316	}
1317}
1318
1319impl<'a, C, Row> BuildableFilter<C> for ReadQueryBuilder<'a, C, Row>
1320where
1321	C: QueryContext,
1322	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1323{
1324	fn r#where(mut self, e: Expression<<C as QueryContext>::Query>) -> Self {
1325		match self.where_expr {
1326			Some(existing) => self.where_expr = Some(and![existing, e]),
1327			None => self.where_expr = Some(e),
1328		};
1329
1330		self
1331	}
1332}
1333
1334impl<'a, C, Row> BuildableJoin<C> for ReadQueryBuilder<'a, C, Row>
1335where
1336	C: QueryContext,
1337	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1338{
1339	fn join(self, join: <C as QueryContext>::Join, kind: JoinKind) -> Self {
1340		self.join_path(JoinPath::from_join(join, kind))
1341	}
1342
1343	fn join_path(mut self, path: JoinPath) -> Self {
1344		if let Some(expected) = path.first_table() {
1345			assert_eq!(
1346				expected, self.table,
1347				"join path must start at base table `{}` but started at `{}`",
1348				self.table, expected,
1349			);
1350		}
1351
1352		match &mut self.joins {
1353			Some(existing) => existing.push(path),
1354			None => self.joins = Some(vec![path]),
1355		};
1356
1357		self
1358	}
1359}
1360
1361impl<'a, C, Row> BuildableSort<C> for ReadQueryBuilder<'a, C, Row>
1362where
1363	C: QueryContext,
1364	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1365{
1366	fn order_by(mut self, s: SortOrder<<C as QueryContext>::Sort>) -> Self {
1367		match self.sort_expr {
1368			Some(existing) => self.sort_expr = Some(order_by![existing, s]),
1369			None => self.sort_expr = Some(s),
1370		}
1371
1372		self
1373	}
1374}
1375
1376impl<'a, C, Row> BuildablePage<C> for ReadQueryBuilder<'a, C, Row>
1377where
1378	C: QueryContext,
1379	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1380{
1381	fn paginate(mut self, p: Pagination) -> Self {
1382		self.pagination = Some(p);
1383		self
1384	}
1385}
1386
1387impl<'a, C, Row> BuildableReadQuery<C, Row> for ReadQueryBuilder<'a, C, Row>
1388where
1389	C: QueryContext,
1390	C::Model: crate::GetDeleteMarker + JoinNavigationModel,
1391	Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
1392{
1393}