use crate::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::decorrelate_where_exists::DecorrelateWhereExists;
use crate::decorrelate_where_in::DecorrelateWhereIn;
use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_limit::EliminateLimit;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::filter_push_down::FilterPushDown;
use crate::inline_table_scan::InlineTableScan;
use crate::limit_push_down::LimitPushDown;
use crate::projection_push_down::ProjectionPushDown;
use crate::reduce_cross_join::ReduceCrossJoin;
use crate::reduce_outer_join::ReduceOuterJoin;
use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
use crate::simplify_expressions::SimplifyExpressions;
use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
use crate::subquery_filter_to_join::SubqueryFilterToJoin;
use crate::type_coercion::TypeCoercion;
use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
use chrono::{DateTime, Utc};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace, warn};
use std::sync::Arc;
use std::time::Instant;
/// A single plan-rewriting step run by [`Optimizer`].
///
/// A rule receives a [`LogicalPlan`] and returns a rewritten one (presumably
/// semantically equivalent — each implementation is responsible for that).
pub trait OptimizerRule {
    /// Identifier for this rule; the optimizer uses it in log output and in
    /// the text of warnings / errors raised when the rule fails.
    fn name(&self) -> &str;

    /// Rewrites `plan` and returns the resulting plan.
    ///
    /// `optimizer_config` is borrowed mutably so a rule can, for instance,
    /// draw fresh ids via `OptimizerConfig::next_id`.
    ///
    /// # Errors
    ///
    /// A returned `Err` is either logged and skipped or turned into an
    /// internal error by the optimizer, depending on the config's
    /// skip-failing-rules setting.
    fn optimize(
        &self,
        plan: &LogicalPlan,
        optimizer_config: &mut OptimizerConfig,
    ) -> Result<LogicalPlan>;
}
/// Settings that drive [`Optimizer::optimize`] and the rule set selected by
/// [`Optimizer::new`], plus a small amount of per-run mutable state.
#[derive(Debug)]
pub struct OptimizerConfig {
/// Start time recorded for the query. `new` sets it to the current UTC time;
/// `with_query_execution_start_time` overrides it.
query_execution_start_time: DateTime<Utc>,
/// Counter behind `next_id`; starts at 0 and is incremented before each id
/// is handed out.
next_id: usize,
/// When true, a rule that returns `Err` is logged with `warn!` and skipped;
/// when false, the error aborts optimization as an internal error.
skip_failing_rules: bool,
/// When true, `Optimizer::new` includes the `FilterNullJoinKeys` rule.
filter_null_keys: bool,
/// Upper bound on the number of full passes over the rule list.
max_passes: u8,
}
impl OptimizerConfig {
    /// Creates a configuration with the default settings.
    ///
    /// The defaults are:
    /// - the start time is the current UTC time;
    /// - the id counter is at 0;
    /// - failing rules are skipped;
    /// - null join keys are filtered;
    /// - at most 3 optimizer passes are run.
    pub fn new() -> Self {
        Self {
            query_execution_start_time: chrono::Utc::now(),
            next_id: 0,
            skip_failing_rules: true,
            filter_null_keys: true,
            max_passes: 3,
        }
    }

    /// Controls whether [`Optimizer::new`] includes the `FilterNullJoinKeys`
    /// rule in the pipeline it builds.
    pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
        self.filter_null_keys = filter_null_keys;
        self
    }

    /// Overrides the query start time (by default the time `new` was called).
    pub fn with_query_execution_start_time(
        mut self,
        query_execution_start_time: DateTime<Utc>,
    ) -> Self {
        self.query_execution_start_time = query_execution_start_time;
        self
    }

    /// Chooses how rule failures are handled.
    ///
    /// When `true`, the failure is logged as a warning and the rule is
    /// skipped. When `false`, the failure is returned from
    /// [`Optimizer::optimize`] as an internal error.
    pub fn with_skip_failing_rules(mut self, skip_failing_rules: bool) -> Self {
        self.skip_failing_rules = skip_failing_rules;
        self
    }

    /// Sets the maximum number of passes [`Optimizer::optimize`] makes over
    /// its rules. A value of 0 means no rule is run at all.
    pub fn with_max_passes(mut self, max_passes: u8) -> Self {
        self.max_passes = max_passes;
        self
    }

    /// Returns a fresh id, unique among ids produced by this config.
    ///
    /// The counter is incremented before it is read, so the first call
    /// returns 1.
    pub fn next_id(&mut self) -> usize {
        self.next_id += 1;
        self.next_id
    }

    /// Returns the recorded query start time.
    pub fn query_execution_start_time(&self) -> DateTime<Utc> {
        self.query_execution_start_time
    }
}
impl Default for OptimizerConfig {
fn default() -> Self {
Self::new()
}
}
/// Runs an ordered list of [`OptimizerRule`]s over a [`LogicalPlan`],
/// possibly for several passes (see [`Optimizer::optimize`]).
#[derive(Clone)]
pub struct Optimizer {
/// Rules applied, in this order, during every pass.
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
impl Optimizer {
pub fn new(config: &OptimizerConfig) -> Self {
let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeCoercion::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(DecorrelateWhereExists::new()),
Arc::new(DecorrelateWhereIn::new()),
Arc::new(ScalarSubqueryToJoin::new()),
Arc::new(SubqueryFilterToJoin::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(EliminateFilter::new()),
Arc::new(ReduceCrossJoin::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
];
if config.filter_null_keys {
rules.push(Arc::new(FilterNullJoinKeys::default()));
}
rules.push(Arc::new(ReduceOuterJoin::new()));
rules.push(Arc::new(FilterPushDown::new()));
rules.push(Arc::new(LimitPushDown::new()));
rules.push(Arc::new(SingleDistinctToGroupBy::new()));
rules.push(Arc::new(SimplifyExpressions::new()));
rules.push(Arc::new(UnwrapCastInComparison::new()));
rules.push(Arc::new(CommonSubexprEliminate::new()));
rules.push(Arc::new(ProjectionPushDown::new()));
Self::with_rules(rules)
}
pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
Self { rules }
}
pub fn optimize<F>(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let start_time = Instant::now();
let mut plan_str = format!("{}", plan.display_indent());
let mut new_plan = plan.clone();
let mut i = 0;
while i < optimizer_config.max_passes {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);
for rule in &self.rules {
let result = rule.optimize(&new_plan, optimizer_config);
match result {
Ok(plan) => {
new_plan = plan;
observer(&new_plan, rule.as_ref());
log_plan(rule.name(), &new_plan);
}
Err(ref e) => {
if optimizer_config.skip_failing_rules {
warn!(
"Skipping optimizer rule '{}' due to unexpected error: {}",
rule.name(),
e
);
} else {
return Err(DataFusionError::Internal(format!(
"Optimizer rule '{}' failed due to unexpected error: {}",
rule.name(),
e
)));
}
}
}
}
log_plan(&format!("Optimized plan (pass {})", i), &new_plan);
let new_plan_str = format!("{}", new_plan.display_indent());
if plan_str == new_plan_str {
debug!("optimizer pass {} did not make changes", i);
break;
}
plan_str = new_plan_str;
i += 1;
}
log_plan("Final optimized plan", &new_plan);
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}
/// Logs `plan` under the label `description`.
///
/// The `display_indent` rendering is logged at debug level and the
/// `display_indent_schema` rendering at trace level.
fn log_plan(description: &str, plan: &LogicalPlan) {
    debug!("{}:\n{}\n", description, plan.display_indent());
    trace!("{}::\n{}\n", description, plan.display_indent_schema());
}
#[cfg(test)]
mod tests {
    use crate::optimizer::Optimizer;
    use crate::{OptimizerConfig, OptimizerRule};
    use datafusion_common::{DFSchema, DataFusionError};
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::LogicalPlan;
    use std::sync::Arc;

    /// A trivial input plan: an empty relation with an empty schema.
    fn empty_plan() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    /// An optimizer whose only rule always fails.
    fn failing_optimizer() -> Optimizer {
        Optimizer::with_rules(vec![Arc::new(BadRule {})])
    }

    /// With skipping enabled, a failing rule must not fail the whole run.
    #[test]
    fn skip_failing_rule() -> Result<(), DataFusionError> {
        let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
        failing_optimizer().optimize(&empty_plan(), &mut config, &observe)?;
        Ok(())
    }

    /// With skipping disabled, the rule's error is surfaced as an internal error.
    #[test]
    fn no_skip_failing_rule() -> Result<(), DataFusionError> {
        let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
        let error = failing_optimizer()
            .optimize(&empty_plan(), &mut config, &observe)
            .err()
            .expect("optimizer should surface the failing rule's error");
        assert_eq!(
            "Internal error: Optimizer rule 'bad rule' failed due to unexpected error: \
            Error during planning: rule failed. This was likely caused by a bug in \
            DataFusion's code and we would welcome that you file an bug report in our issue tracker",
            error.to_string()
        );
        Ok(())
    }

    /// No-op observer for `Optimizer::optimize`.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// A rule that fails unconditionally with a planning error.
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn optimize(
            &self,
            _plan: &LogicalPlan,
            _optimizer_config: &mut OptimizerConfig,
        ) -> datafusion_common::Result<LogicalPlan> {
            Err(DataFusionError::Plan("rule failed".to_string()))
        }

        fn name(&self) -> &str {
            "bad rule"
        }
    }
}