use std::fmt::Debug;
use std::sync::Arc;
use log::debug;
use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::Result;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{InvariantLevel, LogicalPlan};
use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::utils::log_plan;
use self::function_rewrite::ApplyFunctionRewrites;
pub mod function_rewrite;
pub mod resolve_grouping_function;
pub mod type_coercion;
/// Backwards-compatibility module for subquery validation.
///
/// It contains nothing of its own: it only re-exports
/// `datafusion_expr::check_subquery_expr`, and that re-export is marked
/// deprecated (since 44.0.0) with a note pointing callers at the
/// `datafusion_expr` path.
pub mod subquery {
// NOTE(review): `#[deprecated]` placed on a `use` item may not surface a
// warning at downstream call sites — confirm against the rustc version in use.
#[deprecated(
since = "44.0.0",
note = "please use `datafusion_expr::check_subquery_expr` instead"
)]
pub use datafusion_expr::check_subquery_expr;
}
/// A single pass applied to a [`LogicalPlan`] by the [`Analyzer`].
///
/// Implementations must also implement [`Debug`]. The `Analyzer` stores rules
/// as `Arc<dyn AnalyzerRule + Send + Sync>` and invokes them one after another,
/// feeding the plan returned by one rule into the next.
pub trait AnalyzerRule: Debug {
/// Consumes `plan` and returns the plan produced by this rule, or an error.
///
/// `config` is the set of configuration options passed through unchanged
/// from the caller of `Analyzer::execute_and_check`.
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
/// A name for this rule.
///
/// The `Analyzer` uses it as the context attached to errors returned by
/// `analyze` and as the label when logging the plan after the rule ran.
fn name(&self) -> &str;
}
/// Holds the rules that `execute_and_check` runs over a [`LogicalPlan`].
///
/// Cloning an `Analyzer` clones the two vectors of `Arc`s; the rule objects
/// themselves are shared between clones.
#[derive(Clone, Debug)]
pub struct Analyzer {
/// Function rewrites. When this list is non-empty, `execute_and_check`
/// wraps it in a single `ApplyFunctionRewrites` rule that runs before
/// everything in `rules`; when empty, no such rule is created.
pub function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
/// Analyzer rules, applied in the order they appear in this vector.
pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
}
impl Default for Analyzer {
fn default() -> Self {
Self::new()
}
}
impl Analyzer {
    /// Creates an analyzer with the built-in rule set.
    ///
    /// The rules are, in execution order, `ResolveGroupingFunction` followed
    /// by `TypeCoercion`. No function rewrites are registered.
    pub fn new() -> Self {
        let resolve_grouping: Arc<dyn AnalyzerRule + Send + Sync> =
            Arc::new(ResolveGroupingFunction::new());
        let type_coercion: Arc<dyn AnalyzerRule + Send + Sync> =
            Arc::new(TypeCoercion::new());
        Self::with_rules(vec![resolve_grouping, type_coercion])
    }

    /// Creates an analyzer that runs exactly `rules`, in the given order,
    /// with an empty list of function rewrites.
    pub fn with_rules(rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>) -> Self {
        Self {
            function_rewrites: Vec::new(),
            rules,
        }
    }

    /// Appends `rewrite` to the end of the registered function rewrites.
    pub fn add_function_rewrite(
        &mut self,
        rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
    ) {
        self.function_rewrites.push(rewrite);
    }

    /// Returns the registered function rewrites, in registration order.
    pub fn function_rewrites(&self) -> &[Arc<dyn FunctionRewrite + Send + Sync>] {
        self.function_rewrites.as_slice()
    }

    /// Runs every analyzer rule over `plan` and returns the resulting plan.
    ///
    /// Steps, in order:
    ///
    /// 1. `plan` is checked against `InvariantLevel::Always`; a failure is
    ///    returned with the context "Invalid input plan passed to Analyzer".
    /// 2. If any function rewrites are registered, an `ApplyFunctionRewrites`
    ///    rule built from them runs first.
    /// 3. Each rule in `self.rules` runs in order. After every rule (including
    ///    the one from step 2) the plan is logged under the rule's name and
    ///    `observer` is called with the new plan and the rule.
    /// 4. The final plan is checked against `InvariantLevel::Executable`.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered: a failed invariant check, or an
    /// error from a rule's `analyze`, which is wrapped with that rule's name
    /// as context. Rules after the failing one are not run.
    pub fn execute_and_check<F>(
        &self,
        plan: LogicalPlan,
        config: &ConfigOptions,
        mut observer: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
    {
        // Reject malformed input before doing any work.
        if let Err(err) = plan.check_invariants(InvariantLevel::Always) {
            return Err(err.context("Invalid input plan passed to Analyzer"));
        }

        // The timer starts after the input check, so the logged duration
        // covers the rule passes and the final invariant check only.
        let start_time = Instant::now();
        let mut current = plan;

        // Only materialise the function-rewrite pass when there is something
        // for it to apply.
        let function_rewrite_pass = (!self.function_rewrites.is_empty()).then(|| {
            Arc::new(ApplyFunctionRewrites::new(self.function_rewrites.clone()))
                as Arc<dyn AnalyzerRule + Send + Sync>
        });

        // The optional function-rewrite pass goes ahead of the regular rules.
        for rule in function_rewrite_pass.iter().chain(&self.rules) {
            current = match rule.analyze(current, config) {
                Ok(rewritten) => rewritten,
                Err(err) => return Err(err.context(rule.name())),
            };
            log_plan(rule.name(), &current);
            observer(&current, rule.as_ref());
        }

        // The output of the last pass must satisfy the stricter level.
        if let Err(err) = current.check_invariants(InvariantLevel::Executable) {
            return Err(err.context("Invalid (non-executable) plan after Analyzer"));
        }

        log_plan("Final analyzed plan", &current);
        debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
        Ok(current)
    }
}