multisql/executor/query/select/
mod.rs1pub mod join;
2mod manual;
3mod order;
4mod plan;
5
6use {
7 crate::{
8 macros::try_option,
9 recipe::{PlannedRecipe, RecipeUtilities},
10 types::{LabelsAndRows, Row},
11 Glue, Result, Value,
12 },
13 futures::stream::{self, StreamExt, TryStreamExt},
14 rayon::prelude::*,
15 serde::Serialize,
16 sqlparser::ast::{OrderByExpr, Select},
17 thiserror::Error as ThisError,
18};
19pub use {
20 manual::{Manual, ManualError, SelectItem},
21 order::Order,
22 plan::*,
23};
24
25#[derive(ThisError, Serialize, Debug, PartialEq)]
26pub enum SelectError {
27 #[error("aggregate groups not supported")]
28 GrouperMayNotContainAggregate,
29
30 #[error("an aggregate was probably used where not allowed")]
31 FinalSolveFailure,
32
33 #[error("HAVING does not yet support aggregates")]
34 UnimplementedAggregateHaving,
35
36 #[error("this should be impossible, please report")]
37 UnreachableFinalSolveFailure,
38 #[error("this should be impossible, please report")]
39 Unreachable,
40}
41
42impl Glue {
43 pub async fn select(&self, plan: Plan) -> Result<LabelsAndRows> {
44 let Plan {
45 joins,
46 select_items,
47 constraint,
48 group_constraint,
49 groups,
50 order_by,
51 labels,
52 } = plan;
53 let rows = stream::iter(joins)
54 .map(Ok)
55 .try_fold(vec![], |rows, join| async {
56 join.execute(self, rows).await
57 })
58 .await?;
59
60 let rows = order_by.execute(rows)?; let selected_rows =
63 rows.into_par_iter()
64 .filter_map(|row| match constraint.confirm_constraint(&row) {
65 Ok(true) => Some(
66 select_items
67 .clone()
68 .into_iter()
69 .map(|selection| selection.simplify_by_row(&row))
70 .collect::<Result<Vec<PlannedRecipe>>>()
71 .map(|selection| (selection, row)),
72 ),
73 Ok(false) => None,
74 Err(error) => Some(Err(error)),
75 });
76 let do_group = !groups.is_empty()
77 || select_items
78 .iter()
79 .any(|select_item| !select_item.aggregates.is_empty());
80
81 let final_rows = if do_group {
82 let groups = if groups.is_empty() {
83 vec![PlannedRecipe::TRUE]
84 } else {
85 groups
86 };
87
88 let accumulations: Vec<(Vec<Value>, Option<PlannedRecipe>, Vec<PlannedRecipe>)> =
89 selected_rows
90 .filter_map(|selection| {
91 let (selected_row, row) = try_option!(selection);
92 let group_constraint =
93 try_option!(group_constraint.clone().simplify_by_row(&row));
94 let group_constraint = match group_constraint.as_solution() {
95 Some(Value::Bool(true)) => None,
96 Some(Value::Bool(false)) => return None,
97 Some(_) => unreachable!(), None => Some(group_constraint),
99 };
100 let groupers = try_option!(groups
101 .iter()
102 .map(|group| {
103 group.clone().simplify_by_row(&row)?.confirm_or_err(
104 SelectError::GrouperMayNotContainAggregate.into(),
105 )
106 })
107 .collect::<Result<Vec<Value>>>());
108 Some(Ok((groupers, group_constraint, selected_row)))
109 })
110 .map::<_, Result<_>>(|acc| acc.map(|acc| vec![acc]))
111 .try_reduce_with(accumulate)
112 .unwrap_or(Ok(vec![]))?; accumulations
115 .into_par_iter()
116 .map(|(_grouper, _group_constraint, vals)| {
117 vals.into_iter()
118 .map(|val| val.finalise_accumulation())
119 .collect::<Result<Vec<Value>>>()
120 })
121 .collect::<Result<Vec<Vec<Value>>>>()?
122 } else {
124 selected_rows
125 .map(|selection| {
126 selection.and_then(|(selection, _)| {
127 selection
128 .into_iter()
129 .map(|selected| selected.confirm())
130 .collect::<Result<Row>>()
131 })
132 })
133 .collect::<Result<Vec<Row>>>()?
134 };
135
136 Ok((labels, final_rows))
137 }
138 pub async fn select_query(
139 &self,
140 query: Select,
141 order_by: Vec<OrderByExpr>,
142 ) -> Result<LabelsAndRows> {
143 let plan = Plan::new(self, query, order_by).await?;
144 self.select(plan).await
145 }
146}
147
148#[allow(clippy::type_complexity)] fn accumulate(
150 mut rows_l: Vec<(Vec<Value>, Option<PlannedRecipe>, Vec<PlannedRecipe>)>,
151 rows_r: Vec<(Vec<Value>, Option<PlannedRecipe>, Vec<PlannedRecipe>)>,
152) -> Result<Vec<(Vec<Value>, Option<PlannedRecipe>, Vec<PlannedRecipe>)>> {
153 rows_r.into_iter().try_for_each::<_, Result<_>>(|row_r| {
154 let (grouper, group_constraint, vals) = row_r;
155 let group_index = rows_l.iter().position(|(group, _, _)| group == &grouper);
156 let new_group = if let Some(group_index) = group_index {
157 let (group_grouper, group_group_constraint, group_vals) =
158 rows_l.swap_remove(group_index);
159 let group_vals = group_vals
167 .into_iter()
168 .zip(vals.into_iter())
169 .map(|(mut col, val)| {
170 col.accumulate(val)?;
171 Ok(col)
172 })
173 .collect::<Result<_>>()?;
174 (group_grouper, group_group_constraint, group_vals)
175 } else {
176 (grouper, group_constraint, vals)
177 };
178 rows_l.push(new_group);
179 Ok(())
180 })?;
181
182 Ok(rows_l)
183}