multisql/executor/query/select/
mod.rs

1pub mod join;
2mod manual;
3mod order;
4mod plan;
5
6use {
7	crate::{
8		macros::try_option,
9		recipe::{PlannedRecipe, RecipeUtilities},
10		types::{LabelsAndRows, Row},
11		Glue, Result, Value,
12	},
13	futures::stream::{self, StreamExt, TryStreamExt},
14	rayon::prelude::*,
15	serde::Serialize,
16	sqlparser::ast::{OrderByExpr, Select},
17	thiserror::Error as ThisError,
18};
19pub use {
20	manual::{Manual, ManualError, SelectItem},
21	order::Order,
22	plan::*,
23};
24
25#[derive(ThisError, Serialize, Debug, PartialEq)]
26pub enum SelectError {
27	#[error("aggregate groups not supported")]
28	GrouperMayNotContainAggregate,
29
30	#[error("an aggregate was probably used where not allowed")]
31	FinalSolveFailure,
32
33	#[error("HAVING does not yet support aggregates")]
34	UnimplementedAggregateHaving,
35
36	#[error("this should be impossible, please report")]
37	UnreachableFinalSolveFailure,
38	#[error("this should be impossible, please report")]
39	Unreachable,
40}
41
42impl Glue {
43	pub async fn select(&self, plan: Plan) -> Result<LabelsAndRows> {
44		let Plan {
45			joins,
46			select_items,
47			constraint,
48			group_constraint,
49			groups,
50			order_by,
51			labels,
52		} = plan;
53		let rows = stream::iter(joins)
54			.map(Ok)
55			.try_fold(vec![], |rows, join| async {
56				join.execute(self, rows).await
57			})
58			.await?;
59
60		let rows = order_by.execute(rows)?; // TODO: This should be done after filtering
61
62		let selected_rows =
63			rows.into_par_iter()
64				.filter_map(|row| match constraint.confirm_constraint(&row) {
65					Ok(true) => Some(
66						select_items
67							.clone()
68							.into_iter()
69							.map(|selection| selection.simplify_by_row(&row))
70							.collect::<Result<Vec<PlannedRecipe>>>()
71							.map(|selection| (selection, row)),
72					),
73					Ok(false) => None,
74					Err(error) => Some(Err(error)),
75				});
76		let do_group = !groups.is_empty()
77			|| select_items
78				.iter()
79				.any(|select_item| !select_item.aggregates.is_empty());
80
81		let final_rows = if do_group {
82			let groups = if groups.is_empty() {
83				vec![PlannedRecipe::TRUE]
84			} else {
85				groups
86			};
87
88			let accumulations: Vec<(Vec<Value>, Option<PlannedRecipe>, Vec<PlannedRecipe>)> =
89				selected_rows
90					.filter_map(|selection| {
91						let (selected_row, row) = try_option!(selection);
92						let group_constraint =
93							try_option!(group_constraint.clone().simplify_by_row(&row));
94						let group_constraint = match group_constraint.as_solution() {
95							Some(Value::Bool(true)) => None,
96							Some(Value::Bool(false)) => return None,
97							Some(_) => unreachable!(), // TODO: Handle
98							None => Some(group_constraint),
99						};
100						let groupers = try_option!(groups
101							.iter()
102							.map(|group| {
103								group.clone().simplify_by_row(&row)?.confirm_or_err(
104									SelectError::GrouperMayNotContainAggregate.into(),
105								)
106							})
107							.collect::<Result<Vec<Value>>>());
108						Some(Ok((groupers, group_constraint, selected_row)))
109					})
110					.map::<_, Result<_>>(|acc| acc.map(|acc| vec![acc]))
111					.try_reduce_with(accumulate)
112					.unwrap_or(Ok(vec![]))?; // TODO: Improve
113
114			accumulations
115				.into_par_iter()
116				.map(|(_grouper, _group_constraint, vals)| {
117					vals.into_iter()
118						.map(|val| val.finalise_accumulation())
119						.collect::<Result<Vec<Value>>>()
120				})
121				.collect::<Result<Vec<Vec<Value>>>>()?
122		// TODO: Manage grouper and constraint
123		} else {
124			selected_rows
125				.map(|selection| {
126					selection.and_then(|(selection, _)| {
127						selection
128							.into_iter()
129							.map(|selected| selected.confirm())
130							.collect::<Result<Row>>()
131					})
132				})
133				.collect::<Result<Vec<Row>>>()?
134		};
135
136		Ok((labels, final_rows))
137	}
138	pub async fn select_query(
139		&self,
140		query: Select,
141		order_by: Vec<OrderByExpr>,
142	) -> Result<LabelsAndRows> {
143		let plan = Plan::new(self, query, order_by).await?;
144		self.select(plan).await
145	}
146}
147
148#[allow(clippy::type_complexity)] // TODO
149fn accumulate(
150	mut rows_l: Vec<(Vec<Value>, Option<PlannedRecipe>, Vec<PlannedRecipe>)>,
151	rows_r: Vec<(Vec<Value>, Option<PlannedRecipe>, Vec<PlannedRecipe>)>,
152) -> Result<Vec<(Vec<Value>, Option<PlannedRecipe>, Vec<PlannedRecipe>)>> {
153	rows_r.into_iter().try_for_each::<_, Result<_>>(|row_r| {
154		let (grouper, group_constraint, vals) = row_r;
155		let group_index = rows_l.iter().position(|(group, _, _)| group == &grouper);
156		let new_group = if let Some(group_index) = group_index {
157			let (group_grouper, group_group_constraint, group_vals) =
158				rows_l.swap_remove(group_index);
159			/*rows_l[group_index].1.map(|constraint| {
160				if let Some(group_constraint) = group_constraint {
161					constraint.accumulate(group_constraint).unwrap() // TODO: Handle
162				};
163			});*/
164			// TODO
165
166			let group_vals = group_vals
167				.into_iter()
168				.zip(vals.into_iter())
169				.map(|(mut col, val)| {
170					col.accumulate(val)?;
171					Ok(col)
172				})
173				.collect::<Result<_>>()?;
174			(group_grouper, group_group_constraint, group_vals)
175		} else {
176			(grouper, group_constraint, vals)
177		};
178		rows_l.push(new_group);
179		Ok(())
180	})?;
181
182	Ok(rows_l)
183}