multisql/executor/query/
set_expr.rs1use {
2 super::QueryError,
3 crate::{
4 macros::warning,
5 recipe::{MetaRecipe, RecipeUtilities},
6 result::Result,
7 types::LabelsAndRows,
8 Error, Glue, Payload, Value,
9 },
10 async_recursion::async_recursion,
11 sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement},
12};
13
14impl Glue {
15 #[async_recursion(?Send)]
16 pub async fn from_body(
17 &mut self,
18 body: SetExpr,
19 order_by: Vec<OrderByExpr>,
20 ) -> Result<LabelsAndRows> {
21 match body {
22 SetExpr::Select(query) => {
23 let (labels, rows) = self.select_query(*query, order_by).await?;
24 Ok((labels, rows))
25 }
26 SetExpr::Values(values) => {
27 if !order_by.is_empty() {
28 warning!("VALUES does not currently support ordering");
29 }
30 let values = values.0;
31 values
32 .into_iter()
33 .map(|values_row| {
34 values_row
35 .into_iter()
36 .map(|cell| {
37 MetaRecipe::new(cell)?
38 .simplify_by_tempdb(&self.tempdb)?
39 .confirm_or_err(QueryError::MissingComponentsForValues.into())
40 })
41 .collect::<Result<Vec<Value>>>()
42 })
43 .collect::<Result<Vec<Vec<Value>>>>()
44 .map(|values| {
45 (
46 (0..values.get(0).map(|first_row| first_row.len()).unwrap_or(0))
47 .map(|index| format!("unnamed_{}", index))
48 .collect(),
49 values,
50 )
51 })
52 }
53 SetExpr::SetOperation {
54 op,
55 all,
56 left,
57 right,
58 } => {
59 use SetOperator::*;
60 if !order_by.is_empty() {
61 warning!(
62 "set operations (UNION, EXCEPT & INTERSECT) do not currently support ordering"
63 );
64 }
65 let (left_labels, left) = self.from_body(*left, vec![]).await?;
66 let (right_labels, right) = self.from_body(*right, vec![]).await?;
67 if left_labels.len() != right_labels.len() {
68 return Err(QueryError::OperationColumnsMisaligned.into());
69 }
70 let mut rows = match op {
71 Union => [left, right].concat(),
72 Except => left
73 .into_iter()
74 .filter(|row| !right.contains(row))
75 .collect(),
76 Intersect => left.into_iter().filter(|row| right.contains(row)).collect(),
77 };
78 if !all {
79 rows.dedup();
80 }
81 Ok((left_labels, rows))
82 }
83 SetExpr::Insert(Statement::Insert {
84 table_name,
85 columns,
86 source,
87 ..
88 }) => {
89 let inserted = self
90 .ast_insert(&table_name, &columns, &source, true)
91 .await?;
92 if let Payload::Select { labels, rows } = inserted {
93 Ok((labels, rows.into_iter().map(|row| row.0).collect()))
94 } else {
95 unreachable!(); }
97 }
98 _ => Err(Error::Query(QueryError::QueryNotSupported)),
99 }
100 }
101}