pub mod count_wildcard_rule;
pub mod inline_table_scan;
pub mod subquery;
pub mod type_coercion;
use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::subquery::check_subquery_expr;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::utils::log_plan;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_expr::utils::inspect_expr_pre;
use datafusion_expr::{Expr, LogicalPlan};
use log::debug;
use std::sync::Arc;
use std::time::Instant;
pub trait AnalyzerRule {
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
fn name(&self) -> &str;
}
#[derive(Clone)]
pub struct Analyzer {
pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
}
impl Default for Analyzer {
fn default() -> Self {
Self::new()
}
}
impl Analyzer {
pub fn new() -> Self {
let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
];
Self::with_rules(rules)
}
pub fn with_rules(rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>) -> Self {
Self { rules }
}
pub fn execute_and_check<F>(
&self,
plan: &LogicalPlan,
config: &ConfigOptions,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
let start_time = Instant::now();
let mut new_plan = plan.clone();
for rule in &self.rules {
new_plan = rule.analyze(new_plan, config).map_err(|e| {
DataFusionError::Context(rule.name().to_string(), Box::new(e))
})?;
log_plan(rule.name(), &new_plan);
observer(&new_plan, rule.as_ref());
}
check_plan(&new_plan).map_err(|e| {
DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
})?;
log_plan("Final analyzed plan", &new_plan);
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}
fn check_plan(plan: &LogicalPlan) -> Result<()> {
plan.apply(&mut |plan: &LogicalPlan| {
for expr in plan.expressions().iter() {
inspect_expr_pre(expr, |expr| match expr {
Expr::Exists(Exists { subquery, .. })
| Expr::InSubquery(InSubquery { subquery, .. })
| Expr::ScalarSubquery(subquery) => {
check_subquery_expr(plan, &subquery.subquery, expr)
}
_ => Ok(()),
})?;
}
Ok(VisitRecursion::Continue)
})?;
Ok(())
}