datafusion_physical_optimizer/
output_requirements.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! The `OutputRequirements` optimizer rule either:
//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of global
//!   ordering and distribution requirements across rules, or
//! - Removes the auxiliary `OutputRequirementExec` operator from the physical plan.
//!   Since the `OutputRequirementExec` operator is only a helper operator, it
//!   shouldn't occur in the final plan (i.e. the executed plan).
24
use std::sync::Arc;

use crate::PhysicalOptimizerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, Result, Statistics};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
use datafusion_physical_plan::projection::{
    make_with_child, update_expr, ProjectionExec,
};
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
};
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};
43
44/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
45/// plan according to its `mode` attribute, which is set by the constructors
46/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
47/// the global requirements (ordering and distribution) across rules.
48///
49/// The primary use case of this node and rule is to specify and preserve the desired output
50/// ordering and distribution the entire plan. When sending to a single client, a single partition may
51/// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be
52/// better.
53#[derive(Debug)]
54pub struct OutputRequirements {
55    mode: RuleMode,
56}
57
58impl OutputRequirements {
59    /// Create a new rule which works in `Add` mode; i.e. it simply adds a
60    /// top-level [`OutputRequirementExec`] into the physical plan to keep track
61    /// of global ordering and distribution requirements if there are any.
62    /// Note that this rule should run at the beginning.
63    pub fn new_add_mode() -> Self {
64        Self {
65            mode: RuleMode::Add,
66        }
67    }
68
69    /// Create a new rule which works in `Remove` mode; i.e. it simply removes
70    /// the top-level [`OutputRequirementExec`] from the physical plan if there is
71    /// any. We do this because a `OutputRequirementExec` is an ancillary,
72    /// non-executable operator whose sole purpose is to track global
73    /// requirements during optimization. Therefore, a
74    /// `OutputRequirementExec` should not appear in the final plan.
75    pub fn new_remove_mode() -> Self {
76        Self {
77            mode: RuleMode::Remove,
78        }
79    }
80}
81
/// Operating mode of the [`OutputRequirements`] rule.
///
/// A plain fieldless enum, so it is `Copy`: it can be passed around and
/// compared by value without borrowing the rule that owns it.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Hash)]
enum RuleMode {
    /// Insert an `OutputRequirementExec` to record the global requirements.
    Add,
    /// Strip every `OutputRequirementExec` from the plan.
    Remove,
}
87
88/// An ancillary, non-executable operator whose sole purpose is to track global
89/// requirements during optimization. It imposes
90/// - the ordering requirement in its `order_requirement` attribute.
91/// - the distribution requirement in its `dist_requirement` attribute.
92///
93/// See [`OutputRequirements`] for more details
94#[derive(Debug)]
95pub struct OutputRequirementExec {
96    input: Arc<dyn ExecutionPlan>,
97    order_requirement: Option<LexRequirement>,
98    dist_requirement: Distribution,
99    cache: PlanProperties,
100}
101
102impl OutputRequirementExec {
103    pub fn new(
104        input: Arc<dyn ExecutionPlan>,
105        requirements: Option<LexRequirement>,
106        dist_requirement: Distribution,
107    ) -> Self {
108        let cache = Self::compute_properties(&input);
109        Self {
110            input,
111            order_requirement: requirements,
112            dist_requirement,
113            cache,
114        }
115    }
116
117    pub fn input(&self) -> Arc<dyn ExecutionPlan> {
118        Arc::clone(&self.input)
119    }
120
121    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
122    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
123        PlanProperties::new(
124            input.equivalence_properties().clone(), // Equivalence Properties
125            input.output_partitioning().clone(),    // Output Partitioning
126            input.pipeline_behavior(),              // Pipeline Behavior
127            input.boundedness(),                    // Boundedness
128        )
129    }
130}
131
132impl DisplayAs for OutputRequirementExec {
133    fn fmt_as(
134        &self,
135        _t: DisplayFormatType,
136        f: &mut std::fmt::Formatter,
137    ) -> std::fmt::Result {
138        write!(f, "OutputRequirementExec")
139    }
140}
141
142impl ExecutionPlan for OutputRequirementExec {
143    fn name(&self) -> &'static str {
144        "OutputRequirementExec"
145    }
146
147    fn as_any(&self) -> &dyn std::any::Any {
148        self
149    }
150
151    fn properties(&self) -> &PlanProperties {
152        &self.cache
153    }
154
155    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
156        vec![false]
157    }
158
159    fn required_input_distribution(&self) -> Vec<Distribution> {
160        vec![self.dist_requirement.clone()]
161    }
162
163    fn maintains_input_order(&self) -> Vec<bool> {
164        vec![true]
165    }
166
167    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
168        vec![&self.input]
169    }
170
171    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
172        vec![self.order_requirement.clone()]
173    }
174
175    fn with_new_children(
176        self: Arc<Self>,
177        mut children: Vec<Arc<dyn ExecutionPlan>>,
178    ) -> Result<Arc<dyn ExecutionPlan>> {
179        Ok(Arc::new(Self::new(
180            children.remove(0), // has a single child
181            self.order_requirement.clone(),
182            self.dist_requirement.clone(),
183        )))
184    }
185
186    fn execute(
187        &self,
188        _partition: usize,
189        _context: Arc<TaskContext>,
190    ) -> Result<SendableRecordBatchStream> {
191        unreachable!();
192    }
193
194    fn statistics(&self) -> Result<Statistics> {
195        self.input.statistics()
196    }
197
198    fn try_swapping_with_projection(
199        &self,
200        projection: &ProjectionExec,
201    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
202        // If the projection does not narrow the schema, we should not try to push it down:
203        if projection.expr().len() >= projection.input().schema().fields().len() {
204            return Ok(None);
205        }
206
207        let mut updated_sort_reqs = LexRequirement::new(vec![]);
208        // None or empty_vec can be treated in the same way.
209        if let Some(reqs) = &self.required_input_ordering()[0] {
210            for req in &reqs.inner {
211                let Some(new_expr) = update_expr(&req.expr, projection.expr(), false)?
212                else {
213                    return Ok(None);
214                };
215                updated_sort_reqs.push(PhysicalSortRequirement {
216                    expr: new_expr,
217                    options: req.options,
218                });
219            }
220        }
221
222        let dist_req = match &self.required_input_distribution()[0] {
223            Distribution::HashPartitioned(exprs) => {
224                let mut updated_exprs = vec![];
225                for expr in exprs {
226                    let Some(new_expr) = update_expr(expr, projection.expr(), false)?
227                    else {
228                        return Ok(None);
229                    };
230                    updated_exprs.push(new_expr);
231                }
232                Distribution::HashPartitioned(updated_exprs)
233            }
234            dist => dist.clone(),
235        };
236
237        make_with_child(projection, &self.input())
238            .map(|input| {
239                OutputRequirementExec::new(
240                    input,
241                    (!updated_sort_reqs.is_empty()).then_some(updated_sort_reqs),
242                    dist_req,
243                )
244            })
245            .map(|e| Some(Arc::new(e) as _))
246    }
247}
248
249impl PhysicalOptimizerRule for OutputRequirements {
250    fn optimize(
251        &self,
252        plan: Arc<dyn ExecutionPlan>,
253        _config: &ConfigOptions,
254    ) -> Result<Arc<dyn ExecutionPlan>> {
255        match self.mode {
256            RuleMode::Add => require_top_ordering(plan),
257            RuleMode::Remove => plan
258                .transform_up(|plan| {
259                    if let Some(sort_req) =
260                        plan.as_any().downcast_ref::<OutputRequirementExec>()
261                    {
262                        Ok(Transformed::yes(sort_req.input()))
263                    } else {
264                        Ok(Transformed::no(plan))
265                    }
266                })
267                .data(),
268        }
269    }
270
271    fn name(&self) -> &str {
272        "OutputRequirements"
273    }
274
275    fn schema_check(&self) -> bool {
276        true
277    }
278}
279
280/// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that
281/// global requirements are not lost during optimization.
282fn require_top_ordering(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
283    let (new_plan, is_changed) = require_top_ordering_helper(plan)?;
284    if is_changed {
285        Ok(new_plan)
286    } else {
287        // Add `OutputRequirementExec` to the top, with no specified ordering and distribution requirement.
288        Ok(Arc::new(OutputRequirementExec::new(
289            new_plan,
290            // there is no ordering requirement
291            None,
292            Distribution::UnspecifiedDistribution,
293        )) as _)
294    }
295}
296
297/// Helper function that adds an ancillary `OutputRequirementExec` to the given plan.
298/// First entry in the tuple is resulting plan, second entry indicates whether any
299/// `OutputRequirementExec` is added to the plan.
300fn require_top_ordering_helper(
301    plan: Arc<dyn ExecutionPlan>,
302) -> Result<(Arc<dyn ExecutionPlan>, bool)> {
303    let mut children = plan.children();
304    // Global ordering defines desired ordering in the final result.
305    if children.len() != 1 {
306        Ok((plan, false))
307    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
308        // In case of constant columns, output ordering of SortExec would give an empty set.
309        // Therefore; we check the sort expression field of the SortExec to assign the requirements.
310        let req_ordering = sort_exec.expr();
311        let req_dist = sort_exec.required_input_distribution()[0].clone();
312        let reqs = LexRequirement::from(req_ordering.clone());
313        Ok((
314            Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _,
315            true,
316        ))
317    } else if let Some(spm) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
318        let reqs = LexRequirement::from(spm.expr().clone());
319        Ok((
320            Arc::new(OutputRequirementExec::new(
321                plan,
322                Some(reqs),
323                Distribution::SinglePartition,
324            )) as _,
325            true,
326        ))
327    } else if plan.maintains_input_order()[0]
328        && plan.required_input_ordering()[0].is_none()
329    {
330        // Keep searching for a `SortExec` as long as ordering is maintained,
331        // and on-the-way operators do not themselves require an ordering.
332        // When an operator requires an ordering, any `SortExec` below can not
333        // be responsible for (i.e. the originator of) the global ordering.
334        let (new_child, is_changed) =
335            require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?;
336        Ok((plan.with_new_children(vec![new_child])?, is_changed))
337    } else {
338        // Stop searching, there is no global ordering desired for the query.
339        Ok((plan, false))
340    }
341}
342
343// See tests in datafusion/core/tests/physical_optimizer