Skip to main content

datafusion_physical_optimizer/
sanity_checker.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The [SanityCheckPlan] rule ensures that a given plan can
19//! accommodate its infinite sources, if there are any. It will reject
20//! non-runnable query plans that use pipeline-breaking operators on
21//! infinite input(s). In addition, it will check if all order and
22//! distribution requirements of a plan are satisfied by its children.
23
24use std::sync::Arc;
25
26use datafusion_common::Result;
27use datafusion_physical_plan::ExecutionPlan;
28
29use datafusion_common::config::{ConfigOptions, OptimizerOptions};
30use datafusion_common::plan_err;
31use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
32use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
33use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion_physical_plan::joins::SymmetricHashJoinExec;
35use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string};
36
37use crate::PhysicalOptimizerRule;
38use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
39use itertools::izip;
40
/// The `SanityCheckPlan` rule rejects the following query plans:
/// 1. Invalid plans containing nodes whose order and/or distribution requirements
///    are not satisfied by their children.
/// 2. Plans that use pipeline-breaking operators on infinite input(s). Such
///    queries are impossible to execute: they will never generate output nor
///    finish.
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

impl SanityCheckPlan {
    /// Creates a new `SanityCheckPlan` rule.
    ///
    /// The rule has no fields, so this is equivalent to
    /// `SanityCheckPlan::default()`.
    pub fn new() -> Self {
        Self {}
    }
}
55
56impl PhysicalOptimizerRule for SanityCheckPlan {
57    fn optimize(
58        &self,
59        plan: Arc<dyn ExecutionPlan>,
60        config: &ConfigOptions,
61    ) -> Result<Arc<dyn ExecutionPlan>> {
62        check_plan_sanity_recursive(&plan, &config.optimizer)?;
63        Ok(plan)
64    }
65
66    fn name(&self) -> &str {
67        "SanityCheckPlan"
68    }
69
70    fn schema_check(&self) -> bool {
71        true
72    }
73}
74
75/// Bottom-up (post-order) read-only traversal that checks plan sanity.
76#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
77fn check_plan_sanity_recursive(
78    plan: &Arc<dyn ExecutionPlan>,
79    optimizer_options: &OptimizerOptions,
80) -> Result<TreeNodeRecursion> {
81    plan.apply_children(|child| check_plan_sanity_recursive(child, optimizer_options))?;
82    check_plan_sanity(plan, optimizer_options)?;
83    Ok(TreeNodeRecursion::Continue)
84}
85
86/// This function propagates finiteness information and rejects any plan with
87/// pipeline-breaking operators acting on infinite inputs.
88pub fn check_finiteness_requirements(
89    input: &dyn ExecutionPlan,
90    optimizer_options: &OptimizerOptions,
91) -> Result<()> {
92    if let Some(exec) = input.downcast_ref::<SymmetricHashJoinExec>()
93        && !(optimizer_options.allow_symmetric_joins_without_pruning
94            || (exec.check_if_order_information_available()? && is_prunable(exec)))
95    {
96        return plan_err!(
97            "Join operation cannot operate on a non-prunable stream without enabling \
98                              the 'allow_symmetric_joins_without_pruning' configuration flag"
99        );
100    }
101
102    if matches!(
103        input.boundedness(),
104        Boundedness::Unbounded {
105            requires_infinite_memory: true
106        }
107    ) || (input.boundedness().is_unbounded()
108        && input.pipeline_behavior() == EmissionType::Final)
109    {
110        plan_err!(
111            "Cannot execute pipeline breaking queries, operator: {:?}",
112            input
113        )
114    } else {
115        Ok(())
116    }
117}
118
119/// This function returns whether a given symmetric hash join is amenable to
120/// data pruning. For this to be possible, it needs to have a filter where
121/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
122/// interval calculations.
123///
124/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
125/// [`Operator`]: datafusion_expr::Operator
126fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
127    join.filter().is_some_and(|filter| {
128        check_support(filter.expression(), &join.schema())
129            && filter
130                .schema()
131                .fields()
132                .iter()
133                .all(|f| is_datatype_supported(f.data_type()))
134    })
135}
136
137/// Ensures that the plan is pipeline friendly and the order and
138/// distribution requirements from its children are satisfied.
139pub fn check_plan_sanity(
140    plan: &Arc<dyn ExecutionPlan>,
141    optimizer_options: &OptimizerOptions,
142) -> Result<()> {
143    check_finiteness_requirements(plan.as_ref(), optimizer_options)?;
144
145    for ((idx, child), sort_req, dist_req) in izip!(
146        plan.children().into_iter().enumerate(),
147        plan.required_input_ordering(),
148        plan.required_input_distribution(),
149    ) {
150        let child_eq_props = child.equivalence_properties();
151        if let Some(sort_req) = sort_req {
152            let sort_req = sort_req.into_single();
153            if !child_eq_props.ordering_satisfy_requirement(sort_req.clone())? {
154                let plan_str = get_plan_string(plan);
155                return plan_err!(
156                    "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
157                    plan_str,
158                    format_physical_sort_requirement_list(&sort_req),
159                    idx,
160                    child_eq_props.oeq_class()
161                );
162            }
163        }
164
165        if !child
166            .output_partitioning()
167            .satisfaction(&dist_req, child_eq_props, true)
168            .is_satisfied()
169        {
170            let plan_str = get_plan_string(plan);
171            return plan_err!(
172                "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
173                plan_str,
174                dist_req,
175                idx,
176                child.output_partitioning()
177            );
178        }
179    }
180
181    Ok(())
182}
183
184// See tests in datafusion/core/tests/physical_optimizer