datafusion_physical_optimizer/
sanity_checker.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! The [`SanityCheckPlan`] rule ensures that a given plan can
//! accommodate its infinite sources, if there are any. It rejects
//! non-runnable query plans that use pipeline-breaking operators on
//! infinite input(s). In addition, it checks whether all ordering and
//! distribution requirements of a plan are satisfied by its children.
23
24use std::sync::Arc;
25
26use datafusion_common::Result;
27use datafusion_physical_plan::ExecutionPlan;
28
29use datafusion_common::config::{ConfigOptions, OptimizerOptions};
30use datafusion_common::plan_err;
31use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
32use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
33use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion_physical_plan::joins::SymmetricHashJoinExec;
35use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
36
37use crate::PhysicalOptimizerRule;
38use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
39use itertools::izip;
40
/// The SanityCheckPlan rule rejects the following query plans:
/// 1. Invalid plans containing nodes whose order and/or distribution requirements
///    are not satisfied by their children.
/// 2. Plans that use pipeline-breaking operators on infinite input(s); such
///    queries can never be executed (they would never generate output nor finish).
///
/// The rule never rewrites a plan: it either hands the input plan back unchanged
/// or fails with a planning error.
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

impl SanityCheckPlan {
    /// Creates a new [`SanityCheckPlan`] rule.
    ///
    /// The rule carries no configuration of its own, so this is equivalent to
    /// [`SanityCheckPlan::default`].
    pub fn new() -> Self {
        Self {}
    }
}
55
56impl PhysicalOptimizerRule for SanityCheckPlan {
57    fn optimize(
58        &self,
59        plan: Arc<dyn ExecutionPlan>,
60        config: &ConfigOptions,
61    ) -> Result<Arc<dyn ExecutionPlan>> {
62        plan.transform_up(|p| check_plan_sanity(p, &config.optimizer))
63            .data()
64    }
65
66    fn name(&self) -> &str {
67        "SanityCheckPlan"
68    }
69
70    fn schema_check(&self) -> bool {
71        true
72    }
73}
74
75/// This function propagates finiteness information and rejects any plan with
76/// pipeline-breaking operators acting on infinite inputs.
77pub fn check_finiteness_requirements(
78    input: Arc<dyn ExecutionPlan>,
79    optimizer_options: &OptimizerOptions,
80) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
81    if let Some(exec) = input.as_any().downcast_ref::<SymmetricHashJoinExec>() {
82        if !(optimizer_options.allow_symmetric_joins_without_pruning
83            || (exec.check_if_order_information_available()? && is_prunable(exec)))
84        {
85            return plan_err!("Join operation cannot operate on a non-prunable stream without enabling \
86                              the 'allow_symmetric_joins_without_pruning' configuration flag");
87        }
88    }
89
90    if matches!(
91        input.boundedness(),
92        Boundedness::Unbounded {
93            requires_infinite_memory: true
94        }
95    ) || (input.boundedness().is_unbounded()
96        && input.pipeline_behavior() == EmissionType::Final)
97    {
98        plan_err!(
99            "Cannot execute pipeline breaking queries, operator: {:?}",
100            input
101        )
102    } else {
103        Ok(Transformed::no(input))
104    }
105}
106
107/// This function returns whether a given symmetric hash join is amenable to
108/// data pruning. For this to be possible, it needs to have a filter where
109/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
110/// interval calculations.
111///
112/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
113/// [`Operator`]: datafusion_expr::Operator
114fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
115    join.filter().is_some_and(|filter| {
116        check_support(filter.expression(), &join.schema())
117            && filter
118                .schema()
119                .fields()
120                .iter()
121                .all(|f| is_datatype_supported(f.data_type()))
122    })
123}
124
125/// Ensures that the plan is pipeline friendly and the order and
126/// distribution requirements from its children are satisfied.
127pub fn check_plan_sanity(
128    plan: Arc<dyn ExecutionPlan>,
129    optimizer_options: &OptimizerOptions,
130) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
131    check_finiteness_requirements(Arc::clone(&plan), optimizer_options)?;
132
133    for ((idx, child), sort_req, dist_req) in izip!(
134        plan.children().into_iter().enumerate(),
135        plan.required_input_ordering(),
136        plan.required_input_distribution(),
137    ) {
138        let child_eq_props = child.equivalence_properties();
139        if let Some(sort_req) = sort_req {
140            if !child_eq_props.ordering_satisfy_requirement(&sort_req) {
141                let plan_str = get_plan_string(&plan);
142                return plan_err!(
143                    "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
144                    plan_str,
145                    format_physical_sort_requirement_list(&sort_req),
146                    idx,
147                    child_eq_props.oeq_class()
148                );
149            }
150        }
151
152        if !child
153            .output_partitioning()
154            .satisfy(&dist_req, child_eq_props)
155        {
156            let plan_str = get_plan_string(&plan);
157            return plan_err!(
158                "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
159                plan_str,
160                dist_req,
161                idx,
162                child.output_partitioning()
163            );
164        }
165    }
166
167    Ok(Transformed::no(plan))
168}
169
// See tests in datafusion/core/tests/physical_optimizer