1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
mod get_columns;
mod get_data;
mod get_rows;
mod select;
mod set_expr;
pub use select::{join::*, ManualError, Plan, PlanError, SelectError};
use {
crate::{
executor::types::LabelsAndRows, result::Result, Cast, Glue, MetaRecipe, RecipeUtilities,
Value,
},
async_recursion::async_recursion,
serde::Serialize,
sqlparser::ast::{Cte, Query, TableAlias, With},
thiserror::Error as ThisError,
};
/// Switch for the final normalisation pass in `Glue::query`: when `true`,
/// every result row is padded with `Value::Null` up to the width of the widest
/// row, and the label list is resized to that same width.
const ENSURE_SIZE: bool = true;
/// Failure cases reported by the query executor.
///
/// The human-readable text of each variant comes from its `#[error]`
/// attribute.
#[derive(ThisError, Serialize, Debug, PartialEq)]
pub enum QueryError {
    /// The query uses a form this executor does not handle.
    #[error("query not supported")]
    QueryNotSupported,

    /// A `VALUES` expression could not be reduced to a plain value
    /// (per the message: it referenced columns, aggregates or subqueries).
    #[error("values does not support columns, aggregates or subqueries")]
    MissingComponentsForValues,

    /// The `LIMIT` expression could not be reduced to a plain value.
    /// Produced by `Glue::query` while evaluating `LIMIT`.
    #[error("limit does not support columns, aggregates or subqueries")]
    MissingComponentsForLimit,

    /// The `OFFSET` expression could not be reduced to a plain value.
    /// Produced by `Glue::query` while evaluating `OFFSET`.
    #[error("offset does not support columns, aggregates or subqueries")]
    MissingComponentsForOffset,

    /// Values were required but none were supplied.
    #[error("expected values but found none")]
    NoValues,

    /// The two sides of a set operation produced a different number of columns.
    #[error(
        "UNION/EXCEPT/INTERSECT columns misaligned, sides should have an equal number of columns"
    )]
    OperationColumnsMisaligned,
}
impl Glue {
    /// Executes a parsed SQL `Query` and returns its labels and rows.
    ///
    /// Processing order:
    /// 1. The `LIMIT` and `OFFSET` expressions (if present) are simplified
    ///    against the current context and cast to `usize`.
    /// 2. Every CTE of a `WITH` clause is executed through a recursive call to
    ///    `query`; its result is stored in the context under the CTE's alias
    ///    name via `set_table`. The `recursive` flag, the alias column list and
    ///    the CTE `from` field are ignored.
    /// 3. The query body is evaluated, together with `order_by`, by `from_body`.
    /// 4. `OFFSET` rows are skipped, then the result is truncated to `LIMIT`.
    ///    An `OFFSET` larger than the number of rows yields an empty result.
    /// 5. When `ENSURE_SIZE` is set and at least one row is non-empty, every
    ///    row is padded with `Value::Null` to the widest row's length and the
    ///    labels are resized to that width (padding with empty strings).
    ///
    /// The `fetch` and `lock` parts of the query are ignored.
    ///
    /// # Errors
    ///
    /// * `QueryError::MissingComponentsForLimit` /
    ///   `QueryError::MissingComponentsForOffset` when the respective
    ///   expression is rejected by `confirm_or_err`.
    /// * Any error propagated from context access, recipe construction,
    ///   simplification or casting, a nested CTE query, or `from_body`.
    #[async_recursion(?Send)]
    pub async fn query(&mut self, query: Query) -> Result<LabelsAndRows> {
        let Query {
            body,
            order_by,
            limit,
            offset,
            with,
            fetch: _,
            lock: _,
        } = query;

        let limit: Option<usize> = limit
            .map(|expression| {
                MetaRecipe::new(expression)?
                    .simplify_by_context(&*self.get_context()?)?
                    .confirm_or_err(QueryError::MissingComponentsForLimit.into())?
                    .cast()
            })
            .transpose()?;
        let offset: Option<usize> = offset
            .map(|offset| {
                MetaRecipe::new(offset.value)?
                    .simplify_by_context(&*self.get_context()?)?
                    .confirm_or_err(QueryError::MissingComponentsForOffset.into())?
                    .cast()
            })
            .transpose()?;

        // CTEs run before the body so that the body can refer to them by name.
        if let Some(with) = with {
            let With {
                recursive: _,
                cte_tables,
            } = with;
            for cte in cte_tables {
                let Cte {
                    alias,
                    query,
                    from: _,
                } = cte;
                let TableAlias { name, columns: _ } = alias;
                let data = self.query(query).await?;
                self.get_mut_context()?.set_table(name.value, data);
            }
        }

        let (mut labels, mut rows) = self.from_body(body, order_by).await?;

        if let Some(offset) = offset {
            // `drain` panics when the range end exceeds the length, so clamp:
            // skipping more rows than exist simply leaves nothing.
            let skipped = offset.min(rows.len());
            rows.drain(..skipped);
        }
        if let Some(limit) = limit {
            rows.truncate(limit);
        }

        if ENSURE_SIZE {
            let row_width = rows
                .iter()
                .map(|values_row| values_row.len())
                .max()
                .unwrap_or(0);
            // Leave labels untouched when there is nothing to align against.
            if row_width > 0 {
                for row in rows.iter_mut() {
                    row.resize(row_width, Value::Null);
                }
                labels.resize(row_width, String::new());
            }
        }

        Ok((labels, rows))
    }
}