datafusion_physical_optimizer/
sanity_checker.rs1use std::sync::Arc;
25
26use datafusion_common::Result;
27use datafusion_physical_plan::ExecutionPlan;
28
29use datafusion_common::config::{ConfigOptions, OptimizerOptions};
30use datafusion_common::plan_err;
31use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
32use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
33use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion_physical_plan::joins::SymmetricHashJoinExec;
35use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string};
36
37use crate::PhysicalOptimizerRule;
38use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
39use itertools::izip;
40
/// Physical optimizer rule that validates a plan rather than rewriting it.
///
/// When run through [`PhysicalOptimizerRule::optimize`], the plan is returned
/// unchanged if every node passes [`check_plan_sanity`]; otherwise a planning
/// error is returned. A node is rejected when:
///
/// 1. One of its children does not satisfy the ordering or distribution the
///    node requires from that input.
/// 2. It fails [`check_finiteness_requirements`], i.e. it is a symmetric hash
///    join that cannot be pruned (and the opt-in flag is off), or it is a
///    pipeline-breaking operator over unbounded input.
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

impl SanityCheckPlan {
    /// Creates a new [`SanityCheckPlan`] rule.
    ///
    /// The rule carries no state, so this is equivalent to
    /// [`SanityCheckPlan::default`].
    pub fn new() -> Self {
        Self {}
    }
}
55
56impl PhysicalOptimizerRule for SanityCheckPlan {
57 fn optimize(
58 &self,
59 plan: Arc<dyn ExecutionPlan>,
60 config: &ConfigOptions,
61 ) -> Result<Arc<dyn ExecutionPlan>> {
62 check_plan_sanity_recursive(&plan, &config.optimizer)?;
63 Ok(plan)
64 }
65
66 fn name(&self) -> &str {
67 "SanityCheckPlan"
68 }
69
70 fn schema_check(&self) -> bool {
71 true
72 }
73}
74
75#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
77fn check_plan_sanity_recursive(
78 plan: &Arc<dyn ExecutionPlan>,
79 optimizer_options: &OptimizerOptions,
80) -> Result<TreeNodeRecursion> {
81 plan.apply_children(|child| check_plan_sanity_recursive(child, optimizer_options))?;
82 check_plan_sanity(plan, optimizer_options)?;
83 Ok(TreeNodeRecursion::Continue)
84}
85
86pub fn check_finiteness_requirements(
89 input: &dyn ExecutionPlan,
90 optimizer_options: &OptimizerOptions,
91) -> Result<()> {
92 if let Some(exec) = input.downcast_ref::<SymmetricHashJoinExec>()
93 && !(optimizer_options.allow_symmetric_joins_without_pruning
94 || (exec.check_if_order_information_available()? && is_prunable(exec)))
95 {
96 return plan_err!(
97 "Join operation cannot operate on a non-prunable stream without enabling \
98 the 'allow_symmetric_joins_without_pruning' configuration flag"
99 );
100 }
101
102 if matches!(
103 input.boundedness(),
104 Boundedness::Unbounded {
105 requires_infinite_memory: true
106 }
107 ) || (input.boundedness().is_unbounded()
108 && input.pipeline_behavior() == EmissionType::Final)
109 {
110 plan_err!(
111 "Cannot execute pipeline breaking queries, operator: {:?}",
112 input
113 )
114 } else {
115 Ok(())
116 }
117}
118
119fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
127 join.filter().is_some_and(|filter| {
128 check_support(filter.expression(), &join.schema())
129 && filter
130 .schema()
131 .fields()
132 .iter()
133 .all(|f| is_datatype_supported(f.data_type()))
134 })
135}
136
137pub fn check_plan_sanity(
140 plan: &Arc<dyn ExecutionPlan>,
141 optimizer_options: &OptimizerOptions,
142) -> Result<()> {
143 check_finiteness_requirements(plan.as_ref(), optimizer_options)?;
144
145 for ((idx, child), sort_req, dist_req) in izip!(
146 plan.children().into_iter().enumerate(),
147 plan.required_input_ordering(),
148 plan.required_input_distribution(),
149 ) {
150 let child_eq_props = child.equivalence_properties();
151 if let Some(sort_req) = sort_req {
152 let sort_req = sort_req.into_single();
153 if !child_eq_props.ordering_satisfy_requirement(sort_req.clone())? {
154 let plan_str = get_plan_string(plan);
155 return plan_err!(
156 "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
157 plan_str,
158 format_physical_sort_requirement_list(&sort_req),
159 idx,
160 child_eq_props.oeq_class()
161 );
162 }
163 }
164
165 if !child
166 .output_partitioning()
167 .satisfaction(&dist_req, child_eq_props, true)
168 .is_satisfied()
169 {
170 let plan_str = get_plan_string(plan);
171 return plan_err!(
172 "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
173 plan_str,
174 dist_req,
175 idx,
176 child.output_partitioning()
177 );
178 }
179 }
180
181 Ok(())
182}
183
184