datafusion_physical_optimizer/
sanity_checker.rs1use std::sync::Arc;
25
26use datafusion_common::Result;
27use datafusion_physical_plan::ExecutionPlan;
28
29use datafusion_common::config::{ConfigOptions, OptimizerOptions};
30use datafusion_common::plan_err;
31use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
32use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
33use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion_physical_plan::joins::SymmetricHashJoinExec;
35use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string};
36
37use crate::PhysicalOptimizerRule;
38use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
39use itertools::izip;
40
/// Physical optimizer rule that validates, rather than rewrites, a plan.
///
/// Every node is inspected bottom-up (see [`check_plan_sanity`]), and the plan
/// is rejected with a planning error when:
///
/// 1. a node's required input ordering or distribution is not satisfied by the
///    corresponding child, or
/// 2. an operator cannot run to completion on unbounded input: it requires
///    infinite memory, it only emits output once all input is consumed, or it
///    is a [`SymmetricHashJoinExec`] that cannot prune its state while
///    `allow_symmetric_joins_without_pruning` is disabled.
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

impl SanityCheckPlan {
    /// Creates a new [`SanityCheckPlan`] rule; equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self {}
    }
}
55
56impl PhysicalOptimizerRule for SanityCheckPlan {
57 fn optimize(
58 &self,
59 plan: Arc<dyn ExecutionPlan>,
60 config: &ConfigOptions,
61 ) -> Result<Arc<dyn ExecutionPlan>> {
62 plan.transform_up(|p| check_plan_sanity(p, &config.optimizer))
63 .data()
64 }
65
66 fn name(&self) -> &str {
67 "SanityCheckPlan"
68 }
69
70 fn schema_check(&self) -> bool {
71 true
72 }
73}
74
75pub fn check_finiteness_requirements(
78 input: Arc<dyn ExecutionPlan>,
79 optimizer_options: &OptimizerOptions,
80) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
81 if let Some(exec) = input.as_any().downcast_ref::<SymmetricHashJoinExec>()
82 && !(optimizer_options.allow_symmetric_joins_without_pruning
83 || (exec.check_if_order_information_available()? && is_prunable(exec)))
84 {
85 return plan_err!(
86 "Join operation cannot operate on a non-prunable stream without enabling \
87 the 'allow_symmetric_joins_without_pruning' configuration flag"
88 );
89 }
90
91 if matches!(
92 input.boundedness(),
93 Boundedness::Unbounded {
94 requires_infinite_memory: true
95 }
96 ) || (input.boundedness().is_unbounded()
97 && input.pipeline_behavior() == EmissionType::Final)
98 {
99 plan_err!(
100 "Cannot execute pipeline breaking queries, operator: {:?}",
101 input
102 )
103 } else {
104 Ok(Transformed::no(input))
105 }
106}
107
108fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
116 join.filter().is_some_and(|filter| {
117 check_support(filter.expression(), &join.schema())
118 && filter
119 .schema()
120 .fields()
121 .iter()
122 .all(|f| is_datatype_supported(f.data_type()))
123 })
124}
125
126pub fn check_plan_sanity(
129 plan: Arc<dyn ExecutionPlan>,
130 optimizer_options: &OptimizerOptions,
131) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
132 check_finiteness_requirements(Arc::clone(&plan), optimizer_options)?;
133
134 for ((idx, child), sort_req, dist_req) in izip!(
135 plan.children().into_iter().enumerate(),
136 plan.required_input_ordering(),
137 plan.required_input_distribution(),
138 ) {
139 let child_eq_props = child.equivalence_properties();
140 if let Some(sort_req) = sort_req {
141 let sort_req = sort_req.into_single();
142 if !child_eq_props.ordering_satisfy_requirement(sort_req.clone())? {
143 let plan_str = get_plan_string(&plan);
144 return plan_err!(
145 "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
146 plan_str,
147 format_physical_sort_requirement_list(&sort_req),
148 idx,
149 child_eq_props.oeq_class()
150 );
151 }
152 }
153
154 if !child
155 .output_partitioning()
156 .satisfaction(&dist_req, child_eq_props, true)
157 .is_satisfied()
158 {
159 let plan_str = get_plan_string(&plan);
160 return plan_err!(
161 "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
162 plan_str,
163 dist_req,
164 idx,
165 child.output_partitioning()
166 );
167 }
168 }
169
170 Ok(Transformed::no(plan))
171}
172
173