use chrono::{DateTime, Utc};
use datafusion_common::Result;
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace, warn};
use std::sync::Arc;
/// A single logical-plan rewrite that [`Optimizer`] applies in sequence.
pub trait OptimizerRule {
    /// Attempts to rewrite `plan`, returning the resulting plan.
    ///
    /// The config is passed mutably, so a rule can, for example, draw fresh
    /// ids from [`OptimizerConfig::next_id`].
    ///
    /// # Errors
    ///
    /// An `Err` does not necessarily abort optimization. [`Optimizer::optimize`]
    /// logs and skips the rule when the config's `skip_failing_rules` flag is
    /// set, and returns the error otherwise.
    fn optimize(
        &self,
        plan: &LogicalPlan,
        optimizer_config: &mut OptimizerConfig,
    ) -> Result<LogicalPlan>;

    /// Human-readable rule name, used in the optimizer's log messages.
    fn name(&self) -> &str;
}
/// Settings and mutable state passed to every [`OptimizerRule`] during one
/// optimizer run.
#[derive(Debug)]
pub struct OptimizerConfig {
    /// Timestamp taken by [`OptimizerConfig::new`].
    /// NOTE(review): presumably lets rules evaluate "now" consistently across
    /// a query — confirm with rule implementations.
    pub query_execution_start_time: DateTime<Utc>,
    /// Counter backing [`OptimizerConfig::next_id`].
    next_id: usize,
    /// When `true`, a rule that returns `Err` is logged and skipped instead of
    /// aborting optimization.
    skip_failing_rules: bool,
}
impl OptimizerConfig {
pub fn new() -> Self {
Self {
query_execution_start_time: chrono::Utc::now(),
next_id: 0, skip_failing_rules: true,
}
}
pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
self.skip_failing_rules = b;
self
}
pub fn next_id(&mut self) -> usize {
self.next_id += 1;
self.next_id
}
}
impl Default for OptimizerConfig {
fn default() -> Self {
Self::new()
}
}
/// Drives a [`LogicalPlan`] through an ordered list of [`OptimizerRule`]s.
#[derive(Clone)]
pub struct Optimizer {
    /// Rules applied in order by [`Optimizer::optimize`]; shared via `Arc`
    /// so cloning the optimizer does not clone the rules themselves.
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
impl Optimizer {
pub fn new(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
Self { rules }
}
pub fn optimize<F>(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let mut new_plan = plan.clone();
debug!("Input logical plan:\n{}\n", plan.display_indent());
trace!("Full input logical plan:\n{:?}", plan);
for rule in &self.rules {
let result = rule.optimize(&new_plan, optimizer_config);
match result {
Ok(plan) => {
new_plan = plan;
observer(&new_plan, rule.as_ref());
debug!("After apply {} rule:\n", rule.name());
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
}
Err(ref e) => {
if optimizer_config.skip_failing_rules {
warn!(
"Skipping optimizer rule {} due to unexpected error: {}",
rule.name(),
e
);
} else {
return result;
}
}
}
}
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
trace!("Full Optimized logical plan:\n {:?}", new_plan);
Ok(new_plan)
}
}
#[cfg(test)]
mod tests {
    use crate::optimizer::Optimizer;
    use crate::{OptimizerConfig, OptimizerRule};
    use datafusion_common::{DFSchema, DataFusionError};
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::LogicalPlan;
    use std::sync::Arc;

    /// A failing rule is skipped, not propagated, when skipping is enabled.
    #[test]
    fn skip_failing_rule() -> Result<(), DataFusionError> {
        let opt = Optimizer::new(vec![Arc::new(BadRule {})]);
        let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
        let plan = empty_plan();
        opt.optimize(&plan, &mut config, &observe)?;
        Ok(())
    }

    /// With skipping disabled, the rule's error is returned verbatim.
    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::new(vec![Arc::new(BadRule {})]);
        let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
        let plan = empty_plan();
        let err = opt.optimize(&plan, &mut config, &observe).unwrap_err();
        assert_eq!("Error during planning: rule failed", err.to_string());
    }

    /// The observer fires once per successful rule, in rule order.
    /// Skipped rules are not reported.
    #[test]
    fn observer_sees_only_successful_rules() -> Result<(), DataFusionError> {
        let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
            Arc::new(IdentityRule {}),
            Arc::new(BadRule {}),
            Arc::new(IdentityRule {}),
        ];
        let opt = Optimizer::new(rules);
        let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
        let mut seen: Vec<String> = Vec::new();
        opt.optimize(&empty_plan(), &mut config, |_, rule| {
            seen.push(rule.name().to_string())
        })?;
        assert_eq!(seen, vec!["identity rule", "identity rule"]);
        Ok(())
    }

    /// `next_id` pre-increments, so a fresh config hands out 1, 2, ...
    #[test]
    fn next_id_starts_at_one_and_increments() {
        let mut config = OptimizerConfig::new();
        assert_eq!(config.next_id(), 1);
        assert_eq!(config.next_id(), 2);
    }

    /// Minimal plan used as optimizer input by the tests above.
    fn empty_plan() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    /// Observer that ignores every notification.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// Rule that always fails with a planning error.
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn optimize(
            &self,
            _plan: &LogicalPlan,
            _optimizer_config: &mut OptimizerConfig,
        ) -> datafusion_common::Result<LogicalPlan> {
            Err(DataFusionError::Plan("rule failed".to_string()))
        }

        fn name(&self) -> &str {
            "bad rule"
        }
    }

    /// Rule that always succeeds by returning a copy of its input.
    struct IdentityRule {}

    impl OptimizerRule for IdentityRule {
        fn optimize(
            &self,
            plan: &LogicalPlan,
            _optimizer_config: &mut OptimizerConfig,
        ) -> datafusion_common::Result<LogicalPlan> {
            Ok(plan.clone())
        }

        fn name(&self) -> &str {
            "identity rule"
        }
    }
}