// datafusion_physical_optimizer/sanity_checker.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! The [`SanityCheckPlan`] rule ensures that a given plan can
//! accommodate its infinite sources, if there are any. It rejects
//! non-runnable query plans that use pipeline-breaking operators on
//! infinite input(s). In addition, it checks that all order and
//! distribution requirements of a plan are satisfied by its children.
23
24use std::sync::Arc;
25
26use datafusion_common::Result;
27use datafusion_physical_plan::ExecutionPlan;
28
29use datafusion_common::config::{ConfigOptions, OptimizerOptions};
30use datafusion_common::plan_err;
31use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
32use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
33use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion_physical_plan::joins::SymmetricHashJoinExec;
35use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string};
36
37use crate::PhysicalOptimizerRule;
38use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
39use itertools::izip;
40
/// The SanityCheckPlan rule rejects the following query plans:
/// 1. Invalid plans containing nodes whose order and/or distribution requirements
///    are not satisfied by their children.
/// 2. Plans that use pipeline-breaking operators on infinite input(s). Such
///    queries cannot be executed: they would never produce output nor finish.
/// 3. Plans containing a symmetric hash join whose inputs cannot be pruned,
///    unless the `allow_symmetric_joins_without_pruning` optimizer option is set.
///
/// The rule only validates; it never rewrites the plan.
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

impl SanityCheckPlan {
    /// Creates a new [`SanityCheckPlan`] rule.
    ///
    /// The rule carries no state, so this is equivalent to
    /// [`SanityCheckPlan::default`].
    pub fn new() -> Self {
        Self {}
    }
}
55
56impl PhysicalOptimizerRule for SanityCheckPlan {
57    fn optimize(
58        &self,
59        plan: Arc<dyn ExecutionPlan>,
60        config: &ConfigOptions,
61    ) -> Result<Arc<dyn ExecutionPlan>> {
62        plan.transform_up(|p| check_plan_sanity(p, &config.optimizer))
63            .data()
64    }
65
66    fn name(&self) -> &str {
67        "SanityCheckPlan"
68    }
69
70    fn schema_check(&self) -> bool {
71        true
72    }
73}
74
75/// This function propagates finiteness information and rejects any plan with
76/// pipeline-breaking operators acting on infinite inputs.
77pub fn check_finiteness_requirements(
78    input: Arc<dyn ExecutionPlan>,
79    optimizer_options: &OptimizerOptions,
80) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
81    if let Some(exec) = input.as_any().downcast_ref::<SymmetricHashJoinExec>()
82        && !(optimizer_options.allow_symmetric_joins_without_pruning
83            || (exec.check_if_order_information_available()? && is_prunable(exec)))
84    {
85        return plan_err!(
86            "Join operation cannot operate on a non-prunable stream without enabling \
87                              the 'allow_symmetric_joins_without_pruning' configuration flag"
88        );
89    }
90
91    if matches!(
92        input.boundedness(),
93        Boundedness::Unbounded {
94            requires_infinite_memory: true
95        }
96    ) || (input.boundedness().is_unbounded()
97        && input.pipeline_behavior() == EmissionType::Final)
98    {
99        plan_err!(
100            "Cannot execute pipeline breaking queries, operator: {:?}",
101            input
102        )
103    } else {
104        Ok(Transformed::no(input))
105    }
106}
107
108/// This function returns whether a given symmetric hash join is amenable to
109/// data pruning. For this to be possible, it needs to have a filter where
110/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
111/// interval calculations.
112///
113/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
114/// [`Operator`]: datafusion_expr::Operator
115fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
116    join.filter().is_some_and(|filter| {
117        check_support(filter.expression(), &join.schema())
118            && filter
119                .schema()
120                .fields()
121                .iter()
122                .all(|f| is_datatype_supported(f.data_type()))
123    })
124}
125
126/// Ensures that the plan is pipeline friendly and the order and
127/// distribution requirements from its children are satisfied.
128pub fn check_plan_sanity(
129    plan: Arc<dyn ExecutionPlan>,
130    optimizer_options: &OptimizerOptions,
131) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
132    check_finiteness_requirements(Arc::clone(&plan), optimizer_options)?;
133
134    for ((idx, child), sort_req, dist_req) in izip!(
135        plan.children().into_iter().enumerate(),
136        plan.required_input_ordering(),
137        plan.required_input_distribution(),
138    ) {
139        let child_eq_props = child.equivalence_properties();
140        if let Some(sort_req) = sort_req {
141            let sort_req = sort_req.into_single();
142            if !child_eq_props.ordering_satisfy_requirement(sort_req.clone())? {
143                let plan_str = get_plan_string(&plan);
144                return plan_err!(
145                    "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
146                    plan_str,
147                    format_physical_sort_requirement_list(&sort_req),
148                    idx,
149                    child_eq_props.oeq_class()
150                );
151            }
152        }
153
154        if !child
155            .output_partitioning()
156            .satisfaction(&dist_req, child_eq_props, true)
157            .is_satisfied()
158        {
159            let plan_str = get_plan_string(&plan);
160            return plan_err!(
161                "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
162                plan_str,
163                dist_req,
164                idx,
165                child.output_partitioning()
166            );
167        }
168    }
169
170    Ok(Transformed::no(plan))
171}
172
// See tests in datafusion/core/tests/physical_optimizer