datafusion_physical_optimizer/
output_requirements.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! The [`OutputRequirements`] optimizer rule either:
19//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of global
20//!   ordering and distribution requirement across rules, or
21//! - Removes the auxiliary `OutputRequirementExec` operator from the physical plan.
22//!   Since the `OutputRequirementExec` operator is only a helper operator, it
23//!   shouldn't occur in the final plan (i.e. the executed plan).
24
25use std::sync::Arc;
26
27use crate::PhysicalOptimizerRule;
28
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31use datafusion_common::{Result, Statistics};
32use datafusion_execution::TaskContext;
33use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
34use datafusion_physical_plan::projection::{
35    make_with_child, update_expr, ProjectionExec,
36};
37use datafusion_physical_plan::sorts::sort::SortExec;
38use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
39use datafusion_physical_plan::{
40    DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
41};
42use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};
43
44/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
45/// plan according to its `mode` attribute, which is set by the constructors
46/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
47/// the global requirements (ordering and distribution) across rules.
48///
49/// The primary use case of this node and rule is to specify and preserve the desired output
50/// ordering and distribution the entire plan. When sending to a single client, a single partition may
51/// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be
52/// better.
53#[derive(Debug)]
54pub struct OutputRequirements {
55    mode: RuleMode,
56}
57
58impl OutputRequirements {
59    /// Create a new rule which works in `Add` mode; i.e. it simply adds a
60    /// top-level [`OutputRequirementExec`] into the physical plan to keep track
61    /// of global ordering and distribution requirements if there are any.
62    /// Note that this rule should run at the beginning.
63    pub fn new_add_mode() -> Self {
64        Self {
65            mode: RuleMode::Add,
66        }
67    }
68
69    /// Create a new rule which works in `Remove` mode; i.e. it simply removes
70    /// the top-level [`OutputRequirementExec`] from the physical plan if there is
71    /// any. We do this because a `OutputRequirementExec` is an ancillary,
72    /// non-executable operator whose sole purpose is to track global
73    /// requirements during optimization. Therefore, a
74    /// `OutputRequirementExec` should not appear in the final plan.
75    pub fn new_remove_mode() -> Self {
76        Self {
77            mode: RuleMode::Remove,
78        }
79    }
80}
81
/// Operating mode of the `OutputRequirements` optimizer rule.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum RuleMode {
    /// Insert the helper operator into the plan.
    Add,
    /// Strip the helper operator from the plan.
    Remove,
}
87
88/// An ancillary, non-executable operator whose sole purpose is to track global
89/// requirements during optimization. It imposes
90/// - the ordering requirement in its `order_requirement` attribute.
91/// - the distribution requirement in its `dist_requirement` attribute.
92///
93/// See [`OutputRequirements`] for more details
94#[derive(Debug)]
95pub struct OutputRequirementExec {
96    input: Arc<dyn ExecutionPlan>,
97    order_requirement: Option<LexRequirement>,
98    dist_requirement: Distribution,
99    cache: PlanProperties,
100}
101
102impl OutputRequirementExec {
103    pub fn new(
104        input: Arc<dyn ExecutionPlan>,
105        requirements: Option<LexRequirement>,
106        dist_requirement: Distribution,
107    ) -> Self {
108        let cache = Self::compute_properties(&input);
109        Self {
110            input,
111            order_requirement: requirements,
112            dist_requirement,
113            cache,
114        }
115    }
116
117    pub fn input(&self) -> Arc<dyn ExecutionPlan> {
118        Arc::clone(&self.input)
119    }
120
121    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
122    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
123        PlanProperties::new(
124            input.equivalence_properties().clone(), // Equivalence Properties
125            input.output_partitioning().clone(),    // Output Partitioning
126            input.pipeline_behavior(),              // Pipeline Behavior
127            input.boundedness(),                    // Boundedness
128        )
129    }
130}
131
132impl DisplayAs for OutputRequirementExec {
133    fn fmt_as(
134        &self,
135        t: DisplayFormatType,
136        f: &mut std::fmt::Formatter,
137    ) -> std::fmt::Result {
138        match t {
139            DisplayFormatType::Default | DisplayFormatType::Verbose => {
140                write!(f, "OutputRequirementExec")
141            }
142            DisplayFormatType::TreeRender => {
143                // TODO: collect info
144                write!(f, "")
145            }
146        }
147    }
148}
149
150impl ExecutionPlan for OutputRequirementExec {
151    fn name(&self) -> &'static str {
152        "OutputRequirementExec"
153    }
154
155    fn as_any(&self) -> &dyn std::any::Any {
156        self
157    }
158
159    fn properties(&self) -> &PlanProperties {
160        &self.cache
161    }
162
163    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
164        vec![false]
165    }
166
167    fn required_input_distribution(&self) -> Vec<Distribution> {
168        vec![self.dist_requirement.clone()]
169    }
170
171    fn maintains_input_order(&self) -> Vec<bool> {
172        vec![true]
173    }
174
175    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
176        vec![&self.input]
177    }
178
179    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
180        vec![self.order_requirement.clone()]
181    }
182
183    fn with_new_children(
184        self: Arc<Self>,
185        mut children: Vec<Arc<dyn ExecutionPlan>>,
186    ) -> Result<Arc<dyn ExecutionPlan>> {
187        Ok(Arc::new(Self::new(
188            children.remove(0), // has a single child
189            self.order_requirement.clone(),
190            self.dist_requirement.clone(),
191        )))
192    }
193
194    fn execute(
195        &self,
196        _partition: usize,
197        _context: Arc<TaskContext>,
198    ) -> Result<SendableRecordBatchStream> {
199        unreachable!();
200    }
201
202    fn statistics(&self) -> Result<Statistics> {
203        self.input.partition_statistics(None)
204    }
205
206    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
207        self.input.partition_statistics(partition)
208    }
209
210    fn try_swapping_with_projection(
211        &self,
212        projection: &ProjectionExec,
213    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
214        // If the projection does not narrow the schema, we should not try to push it down:
215        if projection.expr().len() >= projection.input().schema().fields().len() {
216            return Ok(None);
217        }
218
219        let mut updated_sort_reqs = LexRequirement::new(vec![]);
220        // None or empty_vec can be treated in the same way.
221        if let Some(reqs) = &self.required_input_ordering()[0] {
222            for req in &reqs.inner {
223                let Some(new_expr) = update_expr(&req.expr, projection.expr(), false)?
224                else {
225                    return Ok(None);
226                };
227                updated_sort_reqs.push(PhysicalSortRequirement {
228                    expr: new_expr,
229                    options: req.options,
230                });
231            }
232        }
233
234        let dist_req = match &self.required_input_distribution()[0] {
235            Distribution::HashPartitioned(exprs) => {
236                let mut updated_exprs = vec![];
237                for expr in exprs {
238                    let Some(new_expr) = update_expr(expr, projection.expr(), false)?
239                    else {
240                        return Ok(None);
241                    };
242                    updated_exprs.push(new_expr);
243                }
244                Distribution::HashPartitioned(updated_exprs)
245            }
246            dist => dist.clone(),
247        };
248
249        make_with_child(projection, &self.input())
250            .map(|input| {
251                OutputRequirementExec::new(
252                    input,
253                    (!updated_sort_reqs.is_empty()).then_some(updated_sort_reqs),
254                    dist_req,
255                )
256            })
257            .map(|e| Some(Arc::new(e) as _))
258    }
259}
260
261impl PhysicalOptimizerRule for OutputRequirements {
262    fn optimize(
263        &self,
264        plan: Arc<dyn ExecutionPlan>,
265        _config: &ConfigOptions,
266    ) -> Result<Arc<dyn ExecutionPlan>> {
267        match self.mode {
268            RuleMode::Add => require_top_ordering(plan),
269            RuleMode::Remove => plan
270                .transform_up(|plan| {
271                    if let Some(sort_req) =
272                        plan.as_any().downcast_ref::<OutputRequirementExec>()
273                    {
274                        Ok(Transformed::yes(sort_req.input()))
275                    } else {
276                        Ok(Transformed::no(plan))
277                    }
278                })
279                .data(),
280        }
281    }
282
283    fn name(&self) -> &str {
284        "OutputRequirements"
285    }
286
287    fn schema_check(&self) -> bool {
288        true
289    }
290}
291
292/// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that
293/// global requirements are not lost during optimization.
294fn require_top_ordering(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
295    let (new_plan, is_changed) = require_top_ordering_helper(plan)?;
296    if is_changed {
297        Ok(new_plan)
298    } else {
299        // Add `OutputRequirementExec` to the top, with no specified ordering and distribution requirement.
300        Ok(Arc::new(OutputRequirementExec::new(
301            new_plan,
302            // there is no ordering requirement
303            None,
304            Distribution::UnspecifiedDistribution,
305        )) as _)
306    }
307}
308
309/// Helper function that adds an ancillary `OutputRequirementExec` to the given plan.
310/// First entry in the tuple is resulting plan, second entry indicates whether any
311/// `OutputRequirementExec` is added to the plan.
312fn require_top_ordering_helper(
313    plan: Arc<dyn ExecutionPlan>,
314) -> Result<(Arc<dyn ExecutionPlan>, bool)> {
315    let mut children = plan.children();
316    // Global ordering defines desired ordering in the final result.
317    if children.len() != 1 {
318        Ok((plan, false))
319    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
320        // In case of constant columns, output ordering of SortExec would give an empty set.
321        // Therefore; we check the sort expression field of the SortExec to assign the requirements.
322        let req_ordering = sort_exec.expr();
323        let req_dist = sort_exec.required_input_distribution()[0].clone();
324        let reqs = LexRequirement::from(req_ordering.clone());
325        Ok((
326            Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _,
327            true,
328        ))
329    } else if let Some(spm) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
330        let reqs = LexRequirement::from(spm.expr().clone());
331        Ok((
332            Arc::new(OutputRequirementExec::new(
333                plan,
334                Some(reqs),
335                Distribution::SinglePartition,
336            )) as _,
337            true,
338        ))
339    } else if plan.maintains_input_order()[0]
340        && plan.required_input_ordering()[0].is_none()
341    {
342        // Keep searching for a `SortExec` as long as ordering is maintained,
343        // and on-the-way operators do not themselves require an ordering.
344        // When an operator requires an ordering, any `SortExec` below can not
345        // be responsible for (i.e. the originator of) the global ordering.
346        let (new_child, is_changed) =
347            require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?;
348        Ok((plan.with_new_children(vec![new_child])?, is_changed))
349    } else {
350        // Stop searching, there is no global ordering desired for the query.
351        Ok((plan, false))
352    }
353}
354
355// See tests in datafusion/core/tests/physical_optimizer