use std::sync::Arc;
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_common::config::{ConfigOptions, OptimizerOptions};
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
use crate::PhysicalOptimizerRule;
use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
use itertools::izip;
/// Physical optimizer rule that rejects plans which cannot be executed:
/// it checks each node's boundedness/pipeline behavior (via
/// `check_finiteness_requirements`) and verifies that every child satisfies
/// its parent's ordering and distribution requirements (via
/// `check_plan_sanity`). The rule never rewrites the plan; it only validates.
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}
impl SanityCheckPlan {
    /// Creates a new [`SanityCheckPlan`] rule.
    ///
    /// The rule is stateless, so this is equivalent to `Self::default()`.
    pub fn new() -> Self {
        Self {}
    }
}
impl PhysicalOptimizerRule for SanityCheckPlan {
    /// Validates every node of `plan` bottom-up; returns the plan unchanged
    /// on success, or the first sanity-check error encountered.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let optimizer_options = &config.optimizer;
        plan.transform_up(|node| check_plan_sanity(node, optimizer_options))
            .data()
    }

    /// Name of this rule, as reported by the optimizer framework.
    fn name(&self) -> &str {
        "SanityCheckPlan"
    }

    /// The rule never rewrites the plan, so schema invariance always holds;
    /// opting in to the check is harmless.
    fn schema_check(&self) -> bool {
        true
    }
}
/// Checks whether `input` can actually be executed given its boundedness:
/// rejects symmetric hash joins on non-prunable unbounded streams (unless the
/// `allow_symmetric_joins_without_pruning` option is set) and any operator
/// that either requires infinite memory or breaks the pipeline (unbounded
/// input combined with `EmissionType::Final`).
pub fn check_finiteness_requirements(
    input: Arc<dyn ExecutionPlan>,
    optimizer_options: &OptimizerOptions,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // Symmetric hash joins need order information and prunable filters to
    // bound their state; otherwise they are only legal when explicitly
    // allowed by configuration. Note the flag is tested first so the
    // fallible order-information check is skipped when pruning is allowed.
    if let Some(join) = input.as_any().downcast_ref::<SymmetricHashJoinExec>() {
        let join_allowed = optimizer_options.allow_symmetric_joins_without_pruning
            || (join.check_if_order_information_available()? && is_prunable(join));
        if !join_allowed {
            return plan_err!("Join operation cannot operate on a non-prunable stream without enabling \
                 the 'allow_symmetric_joins_without_pruning' configuration flag");
        }
    }

    // An operator is unexecutable if it would need unbounded memory, or if it
    // only emits its result at the end of an unbounded (never-ending) input.
    let needs_infinite_memory = matches!(
        input.boundedness(),
        Boundedness::Unbounded {
            requires_infinite_memory: true
        }
    );
    if needs_infinite_memory
        || (input.boundedness().is_unbounded()
            && input.pipeline_behavior() == EmissionType::Final)
    {
        return plan_err!(
            "Cannot execute pipeline breaking queries, operator: {:?}",
            input
        );
    }

    Ok(Transformed::no(input))
}
/// Returns `true` when the join's filter allows its internal state to be
/// pruned: the filter expression must be interval-analysis-supported against
/// the join schema, and every field in the filter schema must have a
/// supported data type. A join with no filter is never prunable.
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
    match join.filter() {
        Some(filter) => {
            let expr_supported = check_support(filter.expression(), &join.schema());
            expr_supported
                && filter
                    .schema()
                    .fields()
                    .iter()
                    .all(|field| is_datatype_supported(field.data_type()))
        }
        None => false,
    }
}
/// Validates a single plan node: first applies the finiteness checks to the
/// node itself, then verifies that each child satisfies the node's required
/// input ordering and distribution. Returns the node unchanged when all
/// checks pass, otherwise a descriptive planning error.
pub fn check_plan_sanity(
    plan: Arc<dyn ExecutionPlan>,
    optimizer_options: &OptimizerOptions,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // Boundedness / pruning checks for the node itself; only the error
    // matters here, the Transformed wrapper is discarded.
    check_finiteness_requirements(Arc::clone(&plan), optimizer_options)?;

    let children = plan.children();
    let ordering_reqs = plan.required_input_ordering();
    let distribution_reqs = plan.required_input_distribution();

    for (child_idx, (child, ordering_req, distribution_req)) in
        izip!(children, ordering_reqs, distribution_reqs).enumerate()
    {
        let child_eq_props = child.equivalence_properties();

        // Ordering requirement (if any) must be satisfied by the child's
        // equivalence properties.
        if let Some(ordering_req) = ordering_req {
            if !child_eq_props.ordering_satisfy_requirement(&ordering_req) {
                return plan_err!(
                    "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
                    get_plan_string(&plan),
                    format_physical_sort_requirement_list(&ordering_req),
                    child_idx,
                    child_eq_props.oeq_class()
                );
            }
        }

        // The child's output partitioning must satisfy the required
        // distribution for this input.
        if !child
            .output_partitioning()
            .satisfy(&distribution_req, child_eq_props)
        {
            return plan_err!(
                "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
                get_plan_string(&plan),
                distribution_req,
                child_idx,
                child.output_partitioning()
            );
        }
    }

    Ok(Transformed::no(plan))
}