use std::sync::Arc;
use log::debug;
use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{Expr, LogicalPlan};
use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::subquery::check_subquery_expr;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::utils::log_plan;
use self::function_rewrite::ApplyFunctionRewrites;
pub mod count_wildcard_rule;
pub mod function_rewrite;
pub mod inline_table_scan;
pub mod subquery;
pub mod type_coercion;
/// A single rewrite pass run by [`Analyzer`] over a [`LogicalPlan`].
///
/// [`Analyzer::execute_and_check`] applies its rules one after another, feeding
/// each rule the plan produced by the previous one.
pub trait AnalyzerRule {
    /// Human-readable identifier of this rule.
    ///
    /// The analyzer uses it to label the plan it logs after this rule runs. It
    /// also uses it as the context attached to any error returned from
    /// [`AnalyzerRule::analyze`].
    fn name(&self) -> &str;

    /// Consumes `plan` and returns the rewritten plan.
    ///
    /// `config` is the session configuration passed through by the analyzer.
    ///
    /// # Errors
    /// Any error returned here aborts the analyzer run.
    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
}
/// Runs a sequence of [`AnalyzerRule`]s over a [`LogicalPlan`] and then
/// validates the result (see [`Analyzer::execute_and_check`]).
#[derive(Clone)]
pub struct Analyzer {
    /// Expression rewrites applied in a dedicated pass that runs before
    /// everything in `rules`.
    pub function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
    /// Rules applied in order, each receiving the previous rule's output.
    pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
}
impl Default for Analyzer {
fn default() -> Self {
Self::new()
}
}
impl Analyzer {
    /// Creates an analyzer with the built-in rules.
    ///
    /// The rules are [`InlineTableScan`], [`TypeCoercion`] and
    /// [`CountWildcardRule`], applied in that order. No function rewrites are
    /// registered.
    pub fn new() -> Self {
        let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
            Arc::new(InlineTableScan::new()),
            Arc::new(TypeCoercion::new()),
            Arc::new(CountWildcardRule::new()),
        ];
        Self::with_rules(rules)
    }

    /// Creates an analyzer that applies exactly `rules`, in order, with no
    /// function rewrites registered.
    pub fn with_rules(rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>) -> Self {
        Self {
            function_rewrites: vec![],
            rules,
        }
    }

    /// Registers an additional expression rewrite.
    ///
    /// The new rewrite runs after the ones already registered, in the pass
    /// that runs before all `rules`.
    pub fn add_function_rewrite(
        &mut self,
        rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
    ) {
        self.function_rewrites.push(rewrite);
    }

    /// Returns the registered function rewrites, in registration order.
    pub fn function_rewrites(&self) -> &[Arc<dyn FunctionRewrite + Send + Sync>] {
        &self.function_rewrites
    }

    /// Runs all analyzer passes over `plan`, then validates the result.
    ///
    /// Pass order:
    /// 1. If any function rewrites are registered, an [`ApplyFunctionRewrites`]
    ///    pass built from them.
    /// 2. Each rule in `self.rules`, in order.
    ///
    /// After every pass the resulting plan is logged under the pass's name, and
    /// `observer` is invoked with that plan and the pass.
    ///
    /// # Errors
    /// - An error from a pass is wrapped in [`DataFusionError::Context`] using
    ///   the pass's name.
    /// - A failure of the final subquery validation is wrapped with the
    ///   context `"check_analyzed_plan"`.
    pub fn execute_and_check<F>(
        &self,
        plan: LogicalPlan,
        config: &ConfigOptions,
        mut observer: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
    {
        let start_time = Instant::now();
        let mut new_plan = plan;

        // The function-rewrite pass only applies `self.function_rewrites`, so
        // build it only when there is something to apply. This saves a full
        // pass over the plan in the common "no rewrites" case.
        // NOTE(review): relies on ApplyFunctionRewrites being a no-op for an
        // empty rewrite list; confirm in function_rewrite.rs.
        let expr_to_function: Option<Arc<dyn AnalyzerRule + Send + Sync>> =
            if self.function_rewrites.is_empty() {
                None
            } else {
                Some(Arc::new(ApplyFunctionRewrites::new(
                    self.function_rewrites.clone(),
                )))
            };
        let rules = expr_to_function.iter().chain(self.rules.iter());

        for rule in rules {
            new_plan = rule.analyze(new_plan, config).map_err(|e| {
                DataFusionError::Context(rule.name().to_string(), Box::new(e))
            })?;
            log_plan(rule.name(), &new_plan);
            observer(&new_plan, rule.as_ref());
        }

        check_plan(&new_plan).map_err(|e| {
            DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
        })?;
        log_plan("Final analyzed plan", &new_plan);
        debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
        Ok(new_plan)
    }
}
/// Validates every subquery expression in an analyzed plan.
///
/// The walk covers every plan node, including nodes inside subqueries, and
/// every sub-expression of each node's expressions. Each `EXISTS`, `IN
/// (subquery)` and scalar subquery expression is passed to
/// `check_subquery_expr` together with the node that contains it. The first
/// error aborts the walk and is returned.
fn check_plan(plan: &LogicalPlan) -> Result<()> {
    plan.apply_with_subqueries(|node: &LogicalPlan| {
        node.apply_expressions(|root_expr| {
            root_expr.apply(|current| {
                if let Expr::Exists(Exists { subquery, .. })
                | Expr::InSubquery(InSubquery { subquery, .. })
                | Expr::ScalarSubquery(subquery) = current
                {
                    check_subquery_expr(node, &subquery.subquery, current)?;
                }
                // Keep visiting children so nested subqueries are checked too.
                Ok(TreeNodeRecursion::Continue)
            })
        })
    })?;
    Ok(())
}