// Evaluation of query bodies (SELECT, VALUES, set operations, INSERT) for `Glue`.
use {
	super::QueryError,
	crate::{
		executor::types::LabelsAndRows, macros::warning, result::Result, Glue, MetaRecipe, Payload,
		RecipeUtilities, Value,
	},
	async_recursion::async_recursion,
	sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement},
};

impl Glue {
	/// Evaluates the body of a query and returns its column labels and rows.
	///
	/// Behaviour per `SetExpr` variant:
	///
	/// * `Select` - forwarded to `select_query` together with `order_by`.
	/// * `Values` - every cell is turned into a `MetaRecipe`, simplified against
	///   the current context and must then resolve to a `Value`; columns are
	///   labelled `unnamed_<index>`, with the column count taken from the first
	///   row. `order_by` is ignored (a warning is emitted if it is non-empty).
	/// * `SetOperation` - both sides are evaluated recursively without
	///   ordering. The labels of the left side are used for the result. Unless
	///   `all` is set, duplicate rows are removed from the result. `order_by`
	///   is ignored (a warning is emitted if it is non-empty).
	/// * `Insert` - the insert is executed via `self.insert` and the rows of
	///   the returned `Payload::Select` become the result.
	///
	/// # Errors
	///
	/// * `QueryError::MissingComponentsForValues` when a `VALUES` cell cannot be
	///   confirmed as a value.
	/// * `QueryError::OperationColumnsMisaligned` when the two sides of a set
	///   operation yield a different number of labels.
	/// * `QueryError::QueryNotSupported` for any other kind of body.
	/// * Any error propagated from `select_query`, `insert`, `get_context` or
	///   recipe construction/simplification.
	///
	/// # Panics
	///
	/// Panics if `self.insert` returns a payload other than `Payload::Select`.
	#[async_recursion(?Send)]
	pub async fn from_body(
		&mut self,
		body: SetExpr,
		order_by: Vec<OrderByExpr>,
	) -> Result<LabelsAndRows> {
		match body {
			SetExpr::Select(query) => {
				let (labels, rows) = self.select_query(*query, order_by).await?;
				Ok((labels, rows))
			}
			SetExpr::Values(values) => {
				if !order_by.is_empty() {
					warning!("VALUES does not currently support ordering");
				}
				let values = values.0;
				values
					.into_iter()
					.map(|values_row| {
						values_row
							.into_iter()
							.map(|cell| {
								MetaRecipe::new(cell)?
									.simplify_by_context(&*self.get_context()?)?
									.confirm_or_err(QueryError::MissingComponentsForValues.into())
							})
							.collect::<Result<Vec<Value>>>()
					})
					.collect::<Result<Vec<Vec<Value>>>>()
					.map(|values| {
						// Labels are synthesised; the first row decides how many
						// columns there are (zero when there are no rows).
						let column_count = values.first().map_or(0, Vec::len);
						(
							(0..column_count)
								.map(|index| format!("unnamed_{}", index))
								.collect(),
							values,
						)
					})
			}
			SetExpr::SetOperation {
				op,
				all,
				left,
				right,
			} => {
				use SetOperator::*;
				if !order_by.is_empty() {
					warning!(
					"set operations (UNION, EXCEPT & INTERSECT) do not currently support ordering"
				);
				}
				let (left_labels, left) = self.from_body(*left, vec![]).await?;
				let (right_labels, right) = self.from_body(*right, vec![]).await?;
				if left_labels.len() != right_labels.len() {
					return Err(QueryError::OperationColumnsMisaligned.into());
				}
				let mut rows = match op {
					Union => [left, right].concat(),
					Except => left
						.into_iter()
						.filter(|row| !right.contains(row))
						.collect(),
					Intersect => left.into_iter().filter(|row| right.contains(row)).collect(),
				};
				if !all {
					// `Vec::dedup` only drops *adjacent* duplicates, which is not
					// enough for distinct set semantics. Rows are only known to
					// be comparable for equality, so de-duplicate with a linear
					// membership check while preserving first-seen order.
					let mut unique = Vec::with_capacity(rows.len());
					for row in rows {
						if !unique.contains(&row) {
							unique.push(row);
						}
					}
					rows = unique;
				}
				Ok((left_labels, rows))
			}
			SetExpr::Insert(Statement::Insert {
				table_name,
				columns,
				source,
				..
			}) => {
				let inserted = self.insert(&table_name, &columns, &source, true).await?;
				if let Payload::Select { labels, rows } = inserted {
					// Each returned row wraps its values in field `.0`; unwrap them.
					Ok((labels, rows.into_iter().map(|row| row.0).collect()))
				} else {
					unreachable!(); // TODO: Handle
				}
			}
			_ => Err(QueryError::QueryNotSupported.into()), // TODO: Other queries
		}
	}
}