mod select_syntax_analyzer;
use crate::{
api::error::Result,
expr_resolver::{ExprLabel, ExprResolver},
pipeline::{
AggregateParameter, GroupByLabels, Pipeline, WindowOperationParameter, WindowParameter,
},
sql_processor::{
query_planner::select_syntax_analyzer::SelectSyntaxAnalyzer,
sql_parser::{GroupingElementSyntax, SelectStreamSyntax},
},
stream_engine::command::{
GroupAggregateWindowOp, JoinOp, LowerOps, ProjectionOp, QueryPlan, UpperOps,
},
};
#[derive(Debug)]
pub struct QueryPlanner {
analyzer: SelectSyntaxAnalyzer,
}
impl QueryPlanner {
pub fn new(select_stream_syntax: SelectStreamSyntax) -> Self {
Self {
analyzer: SelectSyntaxAnalyzer::new(select_stream_syntax),
}
}
pub fn plan(self, pipeline: &Pipeline) -> Result<QueryPlan> {
let (mut expr_resolver, labels_select_list) =
ExprResolver::new(self.analyzer.select_list().to_vec());
let projection = ProjectionOp {
expr_labels: labels_select_list,
};
let group_aggr_window =
self.create_group_aggr_window_op(&projection, &mut expr_resolver)?;
let upper_ops = UpperOps {
projection,
group_aggr_window,
};
let join = self.create_join_op(&mut expr_resolver, pipeline)?;
let lower_ops = LowerOps { join };
Ok(QueryPlan::new(upper_ops, lower_ops, expr_resolver))
}
fn create_group_aggr_window_op(
&self,
projection_op: &ProjectionOp,
expr_resolver: &mut ExprResolver,
) -> Result<Option<GroupAggregateWindowOp>> {
let window_param = self.create_window_param();
let group_aggr_param = self.create_group_aggr_param(expr_resolver, projection_op)?;
match (window_param, group_aggr_param) {
(Some(window_param), Some(group_aggr_param)) => Ok(Some(GroupAggregateWindowOp {
window_param,
op_param: WindowOperationParameter::Aggregate(group_aggr_param),
})),
_ => Ok(None),
}
}
fn create_window_param(&self) -> Option<WindowParameter> {
self.analyzer.window_parameter()
}
fn create_group_aggr_param(
&self,
expr_resolver: &mut ExprResolver,
projection_op: &ProjectionOp,
) -> Result<Option<AggregateParameter>> {
let grouping_elements = self.analyzer.grouping_elements();
let aggr_labels = projection_op
.expr_labels
.iter()
.filter_map(|label| {
if let ExprLabel::Aggr(aggr_label) = label {
Some(*aggr_label)
} else {
None
}
})
.collect::<Vec<_>>();
match aggr_labels.len() {
1 => {
let aggr_label = aggr_labels.get(0).expect("len checked");
let aggr_func = expr_resolver.resolve_aggr_expr(*aggr_label).func;
let group_by_labels = grouping_elements
.iter()
.map(|grouping_elem| match grouping_elem {
GroupingElementSyntax::ValueExpr(expr) => {
Ok(expr_resolver.register_value_expr(expr.clone()))
}
GroupingElementSyntax::ValueAlias(alias) => {
expr_resolver.resolve_value_alias(alias.clone())
}
})
.collect::<Result<Vec<_>>>()?;
Ok(Some(AggregateParameter::new(
aggr_func,
*aggr_label,
GroupByLabels::new(group_by_labels),
)))
}
0 => Ok(None),
_ => unimplemented!("2 or more aggregate expressions"),
}
}
fn create_join_op(
&self,
expr_resolver: &mut ExprResolver,
pipeline: &Pipeline,
) -> Result<JoinOp> {
self.analyzer.join_op(expr_resolver, pipeline)
}
}