datafusion_physical_optimizer/
sanity_checker.rs1use std::sync::Arc;
25
26use datafusion_common::Result;
27use datafusion_physical_plan::ExecutionPlan;
28
29use datafusion_common::config::{ConfigOptions, OptimizerOptions};
30use datafusion_common::plan_err;
31use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
32use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
33use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion_physical_plan::joins::SymmetricHashJoinExec;
35use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
36
37use crate::PhysicalOptimizerRule;
38use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
39use itertools::izip;
40
/// Physical optimizer rule that validates an execution plan without
/// rewriting it.
///
/// Its [`PhysicalOptimizerRule::optimize`] implementation visits the plan
/// bottom-up and runs [`check_plan_sanity`] on every node; an error returned
/// by any of those checks becomes the result of `optimize`.
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

impl SanityCheckPlan {
    /// Creates a new `SanityCheckPlan` rule.
    ///
    /// The rule has no fields, so this is equivalent to
    /// [`SanityCheckPlan::default`].
    pub fn new() -> Self {
        Self {}
    }
}
55
56impl PhysicalOptimizerRule for SanityCheckPlan {
57 fn optimize(
58 &self,
59 plan: Arc<dyn ExecutionPlan>,
60 config: &ConfigOptions,
61 ) -> Result<Arc<dyn ExecutionPlan>> {
62 plan.transform_up(|p| check_plan_sanity(p, &config.optimizer))
63 .data()
64 }
65
66 fn name(&self) -> &str {
67 "SanityCheckPlan"
68 }
69
70 fn schema_check(&self) -> bool {
71 true
72 }
73}
74
75pub fn check_finiteness_requirements(
78 input: Arc<dyn ExecutionPlan>,
79 optimizer_options: &OptimizerOptions,
80) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
81 if let Some(exec) = input.as_any().downcast_ref::<SymmetricHashJoinExec>() {
82 if !(optimizer_options.allow_symmetric_joins_without_pruning
83 || (exec.check_if_order_information_available()? && is_prunable(exec)))
84 {
85 return plan_err!("Join operation cannot operate on a non-prunable stream without enabling \
86 the 'allow_symmetric_joins_without_pruning' configuration flag");
87 }
88 }
89
90 if matches!(
91 input.boundedness(),
92 Boundedness::Unbounded {
93 requires_infinite_memory: true
94 }
95 ) || (input.boundedness().is_unbounded()
96 && input.pipeline_behavior() == EmissionType::Final)
97 {
98 plan_err!(
99 "Cannot execute pipeline breaking queries, operator: {:?}",
100 input
101 )
102 } else {
103 Ok(Transformed::no(input))
104 }
105}
106
107fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
115 join.filter().is_some_and(|filter| {
116 check_support(filter.expression(), &join.schema())
117 && filter
118 .schema()
119 .fields()
120 .iter()
121 .all(|f| is_datatype_supported(f.data_type()))
122 })
123}
124
125pub fn check_plan_sanity(
128 plan: Arc<dyn ExecutionPlan>,
129 optimizer_options: &OptimizerOptions,
130) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
131 check_finiteness_requirements(Arc::clone(&plan), optimizer_options)?;
132
133 for ((idx, child), sort_req, dist_req) in izip!(
134 plan.children().into_iter().enumerate(),
135 plan.required_input_ordering(),
136 plan.required_input_distribution(),
137 ) {
138 let child_eq_props = child.equivalence_properties();
139 if let Some(sort_req) = sort_req {
140 if !child_eq_props.ordering_satisfy_requirement(&sort_req) {
141 let plan_str = get_plan_string(&plan);
142 return plan_err!(
143 "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
144 plan_str,
145 format_physical_sort_requirement_list(&sort_req),
146 idx,
147 child_eq_props.oeq_class()
148 );
149 }
150 }
151
152 if !child
153 .output_partitioning()
154 .satisfy(&dist_req, child_eq_props)
155 {
156 let plan_str = get_plan_string(&plan);
157 return plan_err!(
158 "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
159 plan_str,
160 dist_req,
161 idx,
162 child.output_partitioning()
163 );
164 }
165 }
166
167 Ok(Transformed::no(plan))
168}
169
170