quill_sql/plan/logical_planner/
plan_set_expr.rs

1use crate::catalog::{Column, Schema};
2use crate::error::{QuillSQLError, QuillSQLResult};
3use crate::expression::{columnize_expr, Alias, ColumnExpr, Expr, ExprTrait};
4use crate::plan::logical_plan::{
5    build_join_schema, project_schema, EmptyRelation, Filter, Join, LogicalPlan, Project,
6    TableScan, Values,
7};
8use crate::plan::logical_plan::{Aggregate, JoinType};
9use crate::plan::LogicalPlanner;
10use std::sync::Arc;
11use std::vec;
12
13impl LogicalPlanner<'_> {
14    pub fn plan_set_expr(&self, set_expr: &sqlparser::ast::SetExpr) -> QuillSQLResult<LogicalPlan> {
15        match set_expr {
16            sqlparser::ast::SetExpr::Select(select) => self.plan_select(select),
17            sqlparser::ast::SetExpr::Values(values) => self.plan_values(values),
18            _ => Err(QuillSQLError::Plan(format!(
19                "Failed to plan set expr: {}",
20                set_expr
21            ))),
22        }
23    }
24
25    pub fn plan_select(&self, select: &sqlparser::ast::Select) -> QuillSQLResult<LogicalPlan> {
26        let table_scan = self.plan_from_tables(&select.from)?;
27        let selection = self.plan_selection(table_scan, &select.selection)?;
28        let aggregate = self.plan_aggregate(selection, &select.projection, &select.group_by)?;
29        self.plan_project(aggregate, &select.projection)
30    }
31
32    pub fn plan_aggregate(
33        &self,
34        input: LogicalPlan,
35        project: &Vec<sqlparser::ast::SelectItem>,
36        group_by: &[sqlparser::ast::Expr],
37    ) -> QuillSQLResult<LogicalPlan> {
38        let mut exprs = vec![];
39        for select_item in project {
40            exprs.extend(self.bind_select_item(&input, select_item)?);
41        }
42
43        let aggr_exprs = exprs
44            .iter()
45            .filter(|e| matches!(e, Expr::AggregateFunction(_)))
46            .cloned()
47            .collect::<Vec<Expr>>();
48        let group_exprs = group_by
49            .iter()
50            .map(|e| self.bind_expr(e))
51            .collect::<QuillSQLResult<Vec<Expr>>>()?;
52
53        if aggr_exprs.is_empty() && group_exprs.is_empty() {
54            Ok(input)
55        } else {
56            let mut columns = aggr_exprs
57                .iter()
58                .map(|e| e.to_column(input.schema()))
59                .collect::<QuillSQLResult<Vec<Column>>>()?;
60            columns.extend(
61                group_exprs
62                    .iter()
63                    .map(|e| e.to_column(input.schema()))
64                    .collect::<QuillSQLResult<Vec<Column>>>()?,
65            );
66            Ok(LogicalPlan::Aggregate(Aggregate {
67                input: Arc::new(input),
68                group_exprs,
69                aggr_exprs,
70                schema: Arc::new(Schema::new(columns)),
71            }))
72        }
73    }
74
75    pub fn plan_project(
76        &self,
77        input: LogicalPlan,
78        project: &Vec<sqlparser::ast::SelectItem>,
79    ) -> QuillSQLResult<LogicalPlan> {
80        let mut exprs = vec![];
81        for select_item in project {
82            exprs.extend(self.bind_select_item(&input, select_item)?);
83        }
84        let columnized_exprs = exprs
85            .into_iter()
86            .map(|e| {
87                if let Ok(new_expr) = columnize_expr(&e, input.schema()) {
88                    new_expr
89                } else {
90                    e
91                }
92            })
93            .collect::<Vec<Expr>>();
94
95        let schema = Arc::new(project_schema(&input, &columnized_exprs)?);
96        Ok(LogicalPlan::Project(Project {
97            exprs: columnized_exprs,
98            input: Arc::new(input),
99            schema,
100        }))
101    }
102
103    pub fn bind_select_item(
104        &self,
105        input: &LogicalPlan,
106        item: &sqlparser::ast::SelectItem,
107    ) -> QuillSQLResult<Vec<Expr>> {
108        match item {
109            sqlparser::ast::SelectItem::UnnamedExpr(expr) => Ok(vec![self.bind_expr(expr)?]),
110            sqlparser::ast::SelectItem::ExprWithAlias { expr, alias } => {
111                Ok(vec![Expr::Alias(Alias {
112                    name: alias.value.clone(),
113                    expr: Box::new(self.bind_expr(expr)?),
114                })])
115            }
116            sqlparser::ast::SelectItem::Wildcard(_) => {
117                let all_columns = input
118                    .schema()
119                    .columns
120                    .iter()
121                    .map(|col| {
122                        Expr::Column(ColumnExpr {
123                            relation: col.relation.clone(),
124                            name: col.name.clone(),
125                        })
126                    })
127                    .collect::<Vec<Expr>>();
128                Ok(all_columns)
129            }
130            _ => Err(QuillSQLError::Plan(format!(
131                "sqlparser select item {} not supported",
132                item
133            ))),
134        }
135    }
136
137    pub fn plan_selection(
138        &self,
139        input: LogicalPlan,
140        selection: &Option<sqlparser::ast::Expr>,
141    ) -> QuillSQLResult<LogicalPlan> {
142        match selection {
143            None => Ok(input),
144            Some(predicate) => {
145                let predicate = self.bind_expr(predicate)?;
146                Ok(LogicalPlan::Filter(Filter {
147                    input: Arc::new(input),
148                    predicate,
149                }))
150            }
151        }
152    }
153
154    pub fn plan_from_tables(
155        &self,
156        from: &Vec<sqlparser::ast::TableWithJoins>,
157    ) -> QuillSQLResult<LogicalPlan> {
158        match from.len() {
159            0 => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
160                produce_one_row: true,
161                schema: Arc::new(Schema::empty()),
162            })),
163            1 => self.plan_table_with_joins(&from[0]),
164            _ => {
165                let mut left = self.plan_table_with_joins(&from[0])?;
166                for t in from.iter().skip(1) {
167                    let right = self.plan_table_with_joins(t)?;
168                    left = self.plan_cross_join(left, right)?;
169                }
170                Ok(left)
171            }
172        }
173    }
174
175    pub fn plan_table_with_joins(
176        &self,
177        t: &sqlparser::ast::TableWithJoins,
178    ) -> QuillSQLResult<LogicalPlan> {
179        let mut left = self.plan_relation(&t.relation)?;
180        match t.joins.len() {
181            0 => Ok(left),
182            _ => {
183                for join in t.joins.iter() {
184                    left = self.plan_relation_join(left, join)?;
185                }
186                Ok(left)
187            }
188        }
189    }
190
191    pub fn plan_relation_join(
192        &self,
193        left: LogicalPlan,
194        join: &sqlparser::ast::Join,
195    ) -> QuillSQLResult<LogicalPlan> {
196        let right = self.plan_relation(&join.relation)?;
197        match &join.join_operator {
198            sqlparser::ast::JoinOperator::Inner(constraint) => {
199                self.plan_join(left, right, constraint, JoinType::Inner)
200            }
201            sqlparser::ast::JoinOperator::LeftOuter(constraint) => {
202                self.plan_join(left, right, constraint, JoinType::Inner)
203            }
204            sqlparser::ast::JoinOperator::RightOuter(constraint) => {
205                self.plan_join(left, right, constraint, JoinType::Inner)
206            }
207            sqlparser::ast::JoinOperator::FullOuter(constraint) => {
208                self.plan_join(left, right, constraint, JoinType::Inner)
209            }
210            sqlparser::ast::JoinOperator::CrossJoin => self.plan_cross_join(left, right),
211            _ => Err(QuillSQLError::Plan(format!(
212                "sqlparser join operator {:?} not supported",
213                join.join_operator
214            ))),
215        }
216    }
217
218    pub fn plan_join(
219        &self,
220        left: LogicalPlan,
221        right: LogicalPlan,
222        constraint: &sqlparser::ast::JoinConstraint,
223        join_type: JoinType,
224    ) -> QuillSQLResult<LogicalPlan> {
225        match constraint {
226            sqlparser::ast::JoinConstraint::On(expr) => {
227                let expr = self.bind_expr(expr)?;
228                let schema = Arc::new(build_join_schema(left.schema(), right.schema(), join_type)?);
229                Ok(LogicalPlan::Join(Join {
230                    left: Arc::new(left),
231                    right: Arc::new(right),
232                    join_type,
233                    condition: Some(expr),
234                    schema,
235                }))
236            }
237            _ => Err(QuillSQLError::Plan(format!(
238                "Only support join on constraint, {:?}",
239                constraint
240            ))),
241        }
242    }
243
244    pub fn plan_cross_join(
245        &self,
246        left: LogicalPlan,
247        right: LogicalPlan,
248    ) -> QuillSQLResult<LogicalPlan> {
249        let schema = Arc::new(build_join_schema(
250            left.schema(),
251            right.schema(),
252            JoinType::Cross,
253        )?);
254        Ok(LogicalPlan::Join(Join {
255            left: Arc::new(left),
256            right: Arc::new(right),
257            join_type: JoinType::Cross,
258            condition: None,
259            schema,
260        }))
261    }
262
263    pub fn plan_relation(
264        &self,
265        relation: &sqlparser::ast::TableFactor,
266    ) -> QuillSQLResult<LogicalPlan> {
267        match relation {
268            sqlparser::ast::TableFactor::Table { name, .. } => {
269                // TODO handle alias
270                let table_ref = self.bind_table_name(name)?;
271                let schema = self.context.catalog.table_heap(&table_ref)?.schema.clone();
272                // Optional hint to force streaming: env QUILL_STREAM_HINT=1
273                let hint = std::env::var("QUILL_STREAM_HINT")
274                    .ok()
275                    .and_then(|v| match v.as_str() {
276                        "1" | "true" | "on" => Some(true),
277                        "0" | "false" | "off" => Some(false),
278                        _ => None,
279                    });
280                Ok(LogicalPlan::TableScan(TableScan {
281                    table_ref,
282                    table_schema: schema,
283                    filters: vec![],
284                    limit: None,
285                    streaming_hint: hint,
286                }))
287            }
288            sqlparser::ast::TableFactor::NestedJoin {
289                table_with_joins,
290                alias: _,
291            } => {
292                // TODO handle alias
293                self.plan_table_with_joins(table_with_joins)
294            }
295            sqlparser::ast::TableFactor::Derived { subquery, .. } => self.plan_query(subquery),
296            _ => Err(QuillSQLError::Plan(format!(
297                "sqlparser relation {} not supported",
298                relation
299            ))),
300        }
301    }
302
303    pub fn plan_values(&self, values: &sqlparser::ast::Values) -> QuillSQLResult<LogicalPlan> {
304        let mut result = vec![];
305        for row in values.rows.iter() {
306            let mut record = vec![];
307            for item in row {
308                record.push(self.bind_expr(item)?);
309            }
310            result.push(record);
311        }
312
313        // schema will be replaced later
314        Ok(LogicalPlan::Values(Values {
315            schema: Arc::new(Schema::empty()),
316            values: result,
317        }))
318    }
319}