quill_sql/plan/logical_planner/
plan_set_expr.rs1use crate::catalog::{Column, Schema};
2use crate::error::{QuillSQLError, QuillSQLResult};
3use crate::expression::{columnize_expr, Alias, ColumnExpr, Expr, ExprTrait};
4use crate::plan::logical_plan::{
5 build_join_schema, project_schema, EmptyRelation, Filter, Join, LogicalPlan, Project,
6 TableScan, Values,
7};
8use crate::plan::logical_plan::{Aggregate, JoinType};
9use crate::plan::LogicalPlanner;
10use std::sync::Arc;
11use std::vec;
12
13impl LogicalPlanner<'_> {
14 pub fn plan_set_expr(&self, set_expr: &sqlparser::ast::SetExpr) -> QuillSQLResult<LogicalPlan> {
15 match set_expr {
16 sqlparser::ast::SetExpr::Select(select) => self.plan_select(select),
17 sqlparser::ast::SetExpr::Values(values) => self.plan_values(values),
18 _ => Err(QuillSQLError::Plan(format!(
19 "Failed to plan set expr: {}",
20 set_expr
21 ))),
22 }
23 }
24
25 pub fn plan_select(&self, select: &sqlparser::ast::Select) -> QuillSQLResult<LogicalPlan> {
26 let table_scan = self.plan_from_tables(&select.from)?;
27 let selection = self.plan_selection(table_scan, &select.selection)?;
28 let aggregate = self.plan_aggregate(selection, &select.projection, &select.group_by)?;
29 self.plan_project(aggregate, &select.projection)
30 }
31
32 pub fn plan_aggregate(
33 &self,
34 input: LogicalPlan,
35 project: &Vec<sqlparser::ast::SelectItem>,
36 group_by: &[sqlparser::ast::Expr],
37 ) -> QuillSQLResult<LogicalPlan> {
38 let mut exprs = vec![];
39 for select_item in project {
40 exprs.extend(self.bind_select_item(&input, select_item)?);
41 }
42
43 let aggr_exprs = exprs
44 .iter()
45 .filter(|e| matches!(e, Expr::AggregateFunction(_)))
46 .cloned()
47 .collect::<Vec<Expr>>();
48 let group_exprs = group_by
49 .iter()
50 .map(|e| self.bind_expr(e))
51 .collect::<QuillSQLResult<Vec<Expr>>>()?;
52
53 if aggr_exprs.is_empty() && group_exprs.is_empty() {
54 Ok(input)
55 } else {
56 let mut columns = aggr_exprs
57 .iter()
58 .map(|e| e.to_column(input.schema()))
59 .collect::<QuillSQLResult<Vec<Column>>>()?;
60 columns.extend(
61 group_exprs
62 .iter()
63 .map(|e| e.to_column(input.schema()))
64 .collect::<QuillSQLResult<Vec<Column>>>()?,
65 );
66 Ok(LogicalPlan::Aggregate(Aggregate {
67 input: Arc::new(input),
68 group_exprs,
69 aggr_exprs,
70 schema: Arc::new(Schema::new(columns)),
71 }))
72 }
73 }
74
75 pub fn plan_project(
76 &self,
77 input: LogicalPlan,
78 project: &Vec<sqlparser::ast::SelectItem>,
79 ) -> QuillSQLResult<LogicalPlan> {
80 let mut exprs = vec![];
81 for select_item in project {
82 exprs.extend(self.bind_select_item(&input, select_item)?);
83 }
84 let columnized_exprs = exprs
85 .into_iter()
86 .map(|e| {
87 if let Ok(new_expr) = columnize_expr(&e, input.schema()) {
88 new_expr
89 } else {
90 e
91 }
92 })
93 .collect::<Vec<Expr>>();
94
95 let schema = Arc::new(project_schema(&input, &columnized_exprs)?);
96 Ok(LogicalPlan::Project(Project {
97 exprs: columnized_exprs,
98 input: Arc::new(input),
99 schema,
100 }))
101 }
102
103 pub fn bind_select_item(
104 &self,
105 input: &LogicalPlan,
106 item: &sqlparser::ast::SelectItem,
107 ) -> QuillSQLResult<Vec<Expr>> {
108 match item {
109 sqlparser::ast::SelectItem::UnnamedExpr(expr) => Ok(vec![self.bind_expr(expr)?]),
110 sqlparser::ast::SelectItem::ExprWithAlias { expr, alias } => {
111 Ok(vec![Expr::Alias(Alias {
112 name: alias.value.clone(),
113 expr: Box::new(self.bind_expr(expr)?),
114 })])
115 }
116 sqlparser::ast::SelectItem::Wildcard(_) => {
117 let all_columns = input
118 .schema()
119 .columns
120 .iter()
121 .map(|col| {
122 Expr::Column(ColumnExpr {
123 relation: col.relation.clone(),
124 name: col.name.clone(),
125 })
126 })
127 .collect::<Vec<Expr>>();
128 Ok(all_columns)
129 }
130 _ => Err(QuillSQLError::Plan(format!(
131 "sqlparser select item {} not supported",
132 item
133 ))),
134 }
135 }
136
137 pub fn plan_selection(
138 &self,
139 input: LogicalPlan,
140 selection: &Option<sqlparser::ast::Expr>,
141 ) -> QuillSQLResult<LogicalPlan> {
142 match selection {
143 None => Ok(input),
144 Some(predicate) => {
145 let predicate = self.bind_expr(predicate)?;
146 Ok(LogicalPlan::Filter(Filter {
147 input: Arc::new(input),
148 predicate,
149 }))
150 }
151 }
152 }
153
154 pub fn plan_from_tables(
155 &self,
156 from: &Vec<sqlparser::ast::TableWithJoins>,
157 ) -> QuillSQLResult<LogicalPlan> {
158 match from.len() {
159 0 => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
160 produce_one_row: true,
161 schema: Arc::new(Schema::empty()),
162 })),
163 1 => self.plan_table_with_joins(&from[0]),
164 _ => {
165 let mut left = self.plan_table_with_joins(&from[0])?;
166 for t in from.iter().skip(1) {
167 let right = self.plan_table_with_joins(t)?;
168 left = self.plan_cross_join(left, right)?;
169 }
170 Ok(left)
171 }
172 }
173 }
174
175 pub fn plan_table_with_joins(
176 &self,
177 t: &sqlparser::ast::TableWithJoins,
178 ) -> QuillSQLResult<LogicalPlan> {
179 let mut left = self.plan_relation(&t.relation)?;
180 match t.joins.len() {
181 0 => Ok(left),
182 _ => {
183 for join in t.joins.iter() {
184 left = self.plan_relation_join(left, join)?;
185 }
186 Ok(left)
187 }
188 }
189 }
190
191 pub fn plan_relation_join(
192 &self,
193 left: LogicalPlan,
194 join: &sqlparser::ast::Join,
195 ) -> QuillSQLResult<LogicalPlan> {
196 let right = self.plan_relation(&join.relation)?;
197 match &join.join_operator {
198 sqlparser::ast::JoinOperator::Inner(constraint) => {
199 self.plan_join(left, right, constraint, JoinType::Inner)
200 }
201 sqlparser::ast::JoinOperator::LeftOuter(constraint) => {
202 self.plan_join(left, right, constraint, JoinType::Inner)
203 }
204 sqlparser::ast::JoinOperator::RightOuter(constraint) => {
205 self.plan_join(left, right, constraint, JoinType::Inner)
206 }
207 sqlparser::ast::JoinOperator::FullOuter(constraint) => {
208 self.plan_join(left, right, constraint, JoinType::Inner)
209 }
210 sqlparser::ast::JoinOperator::CrossJoin => self.plan_cross_join(left, right),
211 _ => Err(QuillSQLError::Plan(format!(
212 "sqlparser join operator {:?} not supported",
213 join.join_operator
214 ))),
215 }
216 }
217
218 pub fn plan_join(
219 &self,
220 left: LogicalPlan,
221 right: LogicalPlan,
222 constraint: &sqlparser::ast::JoinConstraint,
223 join_type: JoinType,
224 ) -> QuillSQLResult<LogicalPlan> {
225 match constraint {
226 sqlparser::ast::JoinConstraint::On(expr) => {
227 let expr = self.bind_expr(expr)?;
228 let schema = Arc::new(build_join_schema(left.schema(), right.schema(), join_type)?);
229 Ok(LogicalPlan::Join(Join {
230 left: Arc::new(left),
231 right: Arc::new(right),
232 join_type,
233 condition: Some(expr),
234 schema,
235 }))
236 }
237 _ => Err(QuillSQLError::Plan(format!(
238 "Only support join on constraint, {:?}",
239 constraint
240 ))),
241 }
242 }
243
244 pub fn plan_cross_join(
245 &self,
246 left: LogicalPlan,
247 right: LogicalPlan,
248 ) -> QuillSQLResult<LogicalPlan> {
249 let schema = Arc::new(build_join_schema(
250 left.schema(),
251 right.schema(),
252 JoinType::Cross,
253 )?);
254 Ok(LogicalPlan::Join(Join {
255 left: Arc::new(left),
256 right: Arc::new(right),
257 join_type: JoinType::Cross,
258 condition: None,
259 schema,
260 }))
261 }
262
263 pub fn plan_relation(
264 &self,
265 relation: &sqlparser::ast::TableFactor,
266 ) -> QuillSQLResult<LogicalPlan> {
267 match relation {
268 sqlparser::ast::TableFactor::Table { name, .. } => {
269 let table_ref = self.bind_table_name(name)?;
271 let schema = self.context.catalog.table_heap(&table_ref)?.schema.clone();
272 let hint = std::env::var("QUILL_STREAM_HINT")
274 .ok()
275 .and_then(|v| match v.as_str() {
276 "1" | "true" | "on" => Some(true),
277 "0" | "false" | "off" => Some(false),
278 _ => None,
279 });
280 Ok(LogicalPlan::TableScan(TableScan {
281 table_ref,
282 table_schema: schema,
283 filters: vec![],
284 limit: None,
285 streaming_hint: hint,
286 }))
287 }
288 sqlparser::ast::TableFactor::NestedJoin {
289 table_with_joins,
290 alias: _,
291 } => {
292 self.plan_table_with_joins(table_with_joins)
294 }
295 sqlparser::ast::TableFactor::Derived { subquery, .. } => self.plan_query(subquery),
296 _ => Err(QuillSQLError::Plan(format!(
297 "sqlparser relation {} not supported",
298 relation
299 ))),
300 }
301 }
302
303 pub fn plan_values(&self, values: &sqlparser::ast::Values) -> QuillSQLResult<LogicalPlan> {
304 let mut result = vec![];
305 for row in values.rows.iter() {
306 let mut record = vec![];
307 for item in row {
308 record.push(self.bind_expr(item)?);
309 }
310 result.push(record);
311 }
312
313 Ok(LogicalPlan::Values(Values {
315 schema: Arc::new(Schema::empty()),
316 values: result,
317 }))
318 }
319}