1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use {
super::QueryError,
crate::{
executor::types::LabelsAndRows, macros::warning, result::Result, Glue, MetaRecipe, Payload,
RecipeUtilities, Value,
},
async_recursion::async_recursion,
sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement},
};
impl Glue {
    /// Evaluates the body of a query (`SetExpr`) into its column labels and rows.
    ///
    /// Behaviour per variant:
    ///
    /// * `Select` - forwarded to `select_query` together with `order_by`.
    /// * `Values` - every cell is built into a `MetaRecipe`, simplified against
    ///   the current context and confirmed as a `Value`. Labels are generated as
    ///   `unnamed_{index}`, one per cell of the first row.
    /// * `SetOperation` - both sides are evaluated recursively (with no
    ///   ordering) and combined according to `op`. When `all` is false the
    ///   result is reduced to distinct rows. Labels of the left side are used.
    /// * `Insert` - runs the insert and returns the rows it reports.
    ///
    /// `order_by` is only honoured for `Select`; for `Values` and set
    /// operations a warning is emitted and the ordering is ignored.
    ///
    /// # Errors
    ///
    /// * `QueryError::MissingComponentsForValues` if a `VALUES` cell cannot be
    ///   confirmed as a value.
    /// * `QueryError::OperationColumnsMisaligned` if the two sides of a set
    ///   operation yield a different number of labels.
    /// * `QueryError::QueryNotSupported` for any other kind of body.
    /// * Any error propagated from `select_query`, `insert`, `get_context` or
    ///   the recipe helpers.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) if `insert` returns a payload other than
    /// `Payload::Select` for an `Insert` body.
    #[async_recursion(?Send)]
    pub async fn from_body(
        &mut self,
        body: SetExpr,
        order_by: Vec<OrderByExpr>,
    ) -> Result<LabelsAndRows> {
        match body {
            SetExpr::Select(query) => {
                let (labels, rows) = self.select_query(*query, order_by).await?;
                Ok((labels, rows))
            }
            SetExpr::Values(values) => {
                if !order_by.is_empty() {
                    warning!("VALUES does not currently support ordering");
                }
                let values = values.0;
                values
                    .into_iter()
                    .map(|values_row| {
                        values_row
                            .into_iter()
                            .map(|cell| {
                                MetaRecipe::new(cell)?
                                    .simplify_by_context(&*self.get_context()?)?
                                    .confirm_or_err(QueryError::MissingComponentsForValues.into())
                            })
                            .collect::<Result<Vec<Value>>>()
                    })
                    .collect::<Result<Vec<Vec<Value>>>>()
                    .map(|values| {
                        (
                            // One synthetic label per column; the width is taken from
                            // the first row (zero labels when there are no rows).
                            // NOTE(review): rows of differing widths are not rejected here.
                            (0..values.first().map_or(0, |first_row| first_row.len()))
                                .map(|index| format!("unnamed_{}", index))
                                .collect(),
                            values,
                        )
                    })
            }
            SetExpr::SetOperation {
                op,
                all,
                left,
                right,
            } => {
                use SetOperator::*;
                if !order_by.is_empty() {
                    warning!(
                        "set operations (UNION, EXCEPT & INTERSECT) do not currently support ordering"
                    );
                }
                let (left_labels, left) = self.from_body(*left, vec![]).await?;
                let (right_labels, right) = self.from_body(*right, vec![]).await?;
                if left_labels.len() != right_labels.len() {
                    return Err(QueryError::OperationColumnsMisaligned.into());
                }
                let mut rows = match op {
                    Union => [left, right].concat(),
                    Except => left
                        .into_iter()
                        .filter(|row| !right.contains(row))
                        .collect(),
                    Intersect => left.into_iter().filter(|row| right.contains(row)).collect(),
                };
                if !all {
                    // `Vec::dedup` only collapses *adjacent* duplicates, which would
                    // leave repeats behind (e.g. `[a, b] UNION [a]` -> `[a, b, a]`).
                    // Keep the first occurrence of every row instead. Only equality
                    // is required of the rows, so this stays a linear scan per row,
                    // like the `contains` checks above.
                    let mut distinct = Vec::with_capacity(rows.len());
                    for row in rows {
                        if !distinct.contains(&row) {
                            distinct.push(row);
                        }
                    }
                    rows = distinct;
                }
                Ok((left_labels, rows))
            }
            SetExpr::Insert(Statement::Insert {
                table_name,
                columns,
                source,
                ..
            }) => {
                let inserted = self.insert(&table_name, &columns, &source, true).await?;
                if let Payload::Select { labels, rows } = inserted {
                    // Unwrap each row wrapper into its inner value list.
                    Ok((labels, rows.into_iter().map(|row| row.0).collect()))
                } else {
                    // Presumably the trailing `true` asks `insert` to return the
                    // inserted rows as a `Payload::Select` - confirm against `insert`.
                    unreachable!();
                }
            }
            _ => Err(QueryError::QueryNotSupported.into()),
        }
    }
}