multisql/executor/query/
set_expr.rs

1use {
2	super::QueryError,
3	crate::{
4		macros::warning,
5		recipe::{MetaRecipe, RecipeUtilities},
6		result::Result,
7		types::LabelsAndRows,
8		Error, Glue, Payload, Value,
9	},
10	async_recursion::async_recursion,
11	sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement},
12};
13
14impl Glue {
15	#[async_recursion(?Send)]
16	pub async fn from_body(
17		&mut self,
18		body: SetExpr,
19		order_by: Vec<OrderByExpr>,
20	) -> Result<LabelsAndRows> {
21		match body {
22			SetExpr::Select(query) => {
23				let (labels, rows) = self.select_query(*query, order_by).await?;
24				Ok((labels, rows))
25			}
26			SetExpr::Values(values) => {
27				if !order_by.is_empty() {
28					warning!("VALUES does not currently support ordering");
29				}
30				let values = values.0;
31				values
32					.into_iter()
33					.map(|values_row| {
34						values_row
35							.into_iter()
36							.map(|cell| {
37								MetaRecipe::new(cell)?
38									.simplify_by_tempdb(&self.tempdb)?
39									.confirm_or_err(QueryError::MissingComponentsForValues.into())
40							})
41							.collect::<Result<Vec<Value>>>()
42					})
43					.collect::<Result<Vec<Vec<Value>>>>()
44					.map(|values| {
45						(
46							(0..values.get(0).map(|first_row| first_row.len()).unwrap_or(0))
47								.map(|index| format!("unnamed_{}", index))
48								.collect(),
49							values,
50						)
51					})
52			}
53			SetExpr::SetOperation {
54				op,
55				all,
56				left,
57				right,
58			} => {
59				use SetOperator::*;
60				if !order_by.is_empty() {
61					warning!(
62					"set operations (UNION, EXCEPT & INTERSECT) do not currently support ordering"
63				);
64				}
65				let (left_labels, left) = self.from_body(*left, vec![]).await?;
66				let (right_labels, right) = self.from_body(*right, vec![]).await?;
67				if left_labels.len() != right_labels.len() {
68					return Err(QueryError::OperationColumnsMisaligned.into());
69				}
70				let mut rows = match op {
71					Union => [left, right].concat(),
72					Except => left
73						.into_iter()
74						.filter(|row| !right.contains(row))
75						.collect(),
76					Intersect => left.into_iter().filter(|row| right.contains(row)).collect(),
77				};
78				if !all {
79					rows.dedup();
80				}
81				Ok((left_labels, rows))
82			}
83			SetExpr::Insert(Statement::Insert {
84				table_name,
85				columns,
86				source,
87				..
88			}) => {
89				let inserted = self
90					.ast_insert(&table_name, &columns, &source, true)
91					.await?;
92				if let Payload::Select { labels, rows } = inserted {
93					Ok((labels, rows.into_iter().map(|row| row.0).collect()))
94				} else {
95					unreachable!(); // TODO: Handle
96				}
97			}
98			_ => Err(Error::Query(QueryError::QueryNotSupported)),
99		}
100	}
101}