Skip to main content

datafusion_physical_optimizer/
output_requirements.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! The [`OutputRequirements`] optimizer rule either:
//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of global
//!   ordering and distribution requirements across rules, or
//! - Removes the auxiliary `OutputRequirementExec` operator from the physical plan.
//!   Since the `OutputRequirementExec` operator is only a helper operator, it
//!   shouldn't occur in the final plan (i.e. the executed plan).
24
25use std::sync::Arc;
26
27use crate::PhysicalOptimizerRule;
28
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31use datafusion_common::{Result, Statistics};
32use datafusion_execution::TaskContext;
33use datafusion_physical_expr::Distribution;
34use datafusion_physical_expr_common::sort_expr::OrderingRequirements;
35use datafusion_physical_plan::execution_plan::Boundedness;
36use datafusion_physical_plan::projection::{
37    ProjectionExec, make_with_child, update_expr, update_ordering_requirement,
38};
39use datafusion_physical_plan::sorts::sort::SortExec;
40use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
41use datafusion_physical_plan::{
42    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
43    SendableRecordBatchStream,
44};
45
46/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
47/// plan according to its `mode` attribute, which is set by the constructors
48/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
49/// the global requirements (ordering and distribution) across rules.
50///
51/// The primary use case of this node and rule is to specify and preserve the desired output
52/// ordering and distribution the entire plan. When sending to a single client, a single partition may
53/// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be
54/// better.
55#[derive(Debug)]
56pub struct OutputRequirements {
57    mode: RuleMode,
58}
59
60impl OutputRequirements {
61    /// Create a new rule which works in `Add` mode; i.e. it simply adds a
62    /// top-level [`OutputRequirementExec`] into the physical plan to keep track
63    /// of global ordering and distribution requirements if there are any.
64    /// Note that this rule should run at the beginning.
65    pub fn new_add_mode() -> Self {
66        Self {
67            mode: RuleMode::Add,
68        }
69    }
70
71    /// Create a new rule which works in `Remove` mode; i.e. it simply removes
72    /// the top-level [`OutputRequirementExec`] from the physical plan if there is
73    /// any. We do this because a `OutputRequirementExec` is an ancillary,
74    /// non-executable operator whose sole purpose is to track global
75    /// requirements during optimization. Therefore, a
76    /// `OutputRequirementExec` should not appear in the final plan.
77    pub fn new_remove_mode() -> Self {
78        Self {
79            mode: RuleMode::Remove,
80        }
81    }
82}
83
/// Selects the direction in which the `OutputRequirements` rule operates.
///
/// The enum is fieldless, so it is cheap to copy; the derived ordering
/// follows declaration order (`Add` sorts before `Remove`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum RuleMode {
    /// Insert an `OutputRequirementExec` into the plan.
    Add,
    /// Strip `OutputRequirementExec` nodes from the plan.
    Remove,
}
89
90/// An ancillary, non-executable operator whose sole purpose is to track global
91/// requirements during optimization. It imposes
92/// - the ordering requirement in its `order_requirement` attribute.
93/// - the distribution requirement in its `dist_requirement` attribute.
94///
95/// See [`OutputRequirements`] for more details
96#[derive(Debug)]
97pub struct OutputRequirementExec {
98    input: Arc<dyn ExecutionPlan>,
99    order_requirement: Option<OrderingRequirements>,
100    dist_requirement: Distribution,
101    cache: Arc<PlanProperties>,
102    fetch: Option<usize>,
103}
104
105impl OutputRequirementExec {
106    pub fn new(
107        input: Arc<dyn ExecutionPlan>,
108        requirements: Option<OrderingRequirements>,
109        dist_requirement: Distribution,
110        fetch: Option<usize>,
111    ) -> Self {
112        let cache = Self::compute_properties(&input, &fetch);
113        Self {
114            input,
115            order_requirement: requirements,
116            dist_requirement,
117            cache: Arc::new(cache),
118            fetch,
119        }
120    }
121
122    pub fn input(&self) -> Arc<dyn ExecutionPlan> {
123        Arc::clone(&self.input)
124    }
125
126    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
127    fn compute_properties(
128        input: &Arc<dyn ExecutionPlan>,
129        fetch: &Option<usize>,
130    ) -> PlanProperties {
131        let boundedness = if fetch.is_some() {
132            Boundedness::Bounded
133        } else {
134            input.boundedness()
135        };
136
137        PlanProperties::new(
138            input.equivalence_properties().clone(), // Equivalence Properties
139            input.output_partitioning().clone(),    // Output Partitioning
140            input.pipeline_behavior(),              // Pipeline Behavior
141            boundedness,                            // Boundedness
142        )
143    }
144
145    /// Get fetch
146    pub fn fetch(&self) -> Option<usize> {
147        self.fetch
148    }
149}
150
151impl DisplayAs for OutputRequirementExec {
152    fn fmt_as(
153        &self,
154        t: DisplayFormatType,
155        f: &mut std::fmt::Formatter,
156    ) -> std::fmt::Result {
157        match t {
158            DisplayFormatType::Default | DisplayFormatType::Verbose => {
159                let order_cols = self
160                    .order_requirement
161                    .as_ref()
162                    .map(|reqs| reqs.first())
163                    .map(|lex| {
164                        let pairs: Vec<String> = lex
165                            .iter()
166                            .map(|req| {
167                                let direction = req
168                                    .options
169                                    .as_ref()
170                                    .map(
171                                        |opt| if opt.descending { "desc" } else { "asc" },
172                                    )
173                                    .unwrap_or("unspecified");
174                                format!("({}, {direction})", req.expr)
175                            })
176                            .collect();
177                        format!("[{}]", pairs.join(", "))
178                    })
179                    .unwrap_or_else(|| "[]".to_string());
180
181                write!(
182                    f,
183                    "OutputRequirementExec: order_by={}, dist_by={}",
184                    order_cols, self.dist_requirement
185                )
186            }
187            DisplayFormatType::TreeRender => {
188                write!(f, "")
189            }
190        }
191    }
192}
193
194impl ExecutionPlan for OutputRequirementExec {
195    fn name(&self) -> &'static str {
196        "OutputRequirementExec"
197    }
198
199    fn properties(&self) -> &Arc<PlanProperties> {
200        &self.cache
201    }
202
203    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
204        vec![false]
205    }
206
207    fn required_input_distribution(&self) -> Vec<Distribution> {
208        vec![self.dist_requirement.clone()]
209    }
210
211    fn maintains_input_order(&self) -> Vec<bool> {
212        vec![true]
213    }
214
215    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
216        vec![&self.input]
217    }
218
219    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
220        vec![self.order_requirement.clone()]
221    }
222
223    fn with_new_children(
224        self: Arc<Self>,
225        mut children: Vec<Arc<dyn ExecutionPlan>>,
226    ) -> Result<Arc<dyn ExecutionPlan>> {
227        Ok(Arc::new(Self::new(
228            children.remove(0), // has a single child
229            self.order_requirement.clone(),
230            self.dist_requirement.clone(),
231            self.fetch,
232        )))
233    }
234
235    fn execute(
236        &self,
237        _partition: usize,
238        _context: Arc<TaskContext>,
239    ) -> Result<SendableRecordBatchStream> {
240        unreachable!();
241    }
242
243    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
244        self.input.partition_statistics(partition)
245    }
246
247    fn try_swapping_with_projection(
248        &self,
249        projection: &ProjectionExec,
250    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
251        // If the projection does not narrow the schema, we should not try to push it down:
252        let proj_exprs = projection.expr();
253        if proj_exprs.len() >= projection.input().schema().fields().len() {
254            return Ok(None);
255        }
256
257        let mut requirements = self.required_input_ordering().swap_remove(0);
258        if let Some(reqs) = requirements {
259            let mut updated_reqs = vec![];
260            let (lexes, soft) = reqs.into_alternatives();
261            for lex in lexes.into_iter() {
262                let Some(updated_lex) = update_ordering_requirement(lex, proj_exprs)?
263                else {
264                    return Ok(None);
265                };
266                updated_reqs.push(updated_lex);
267            }
268            requirements = OrderingRequirements::new_alternatives(updated_reqs, soft);
269        }
270
271        let dist_req = match &self.required_input_distribution()[0] {
272            Distribution::HashPartitioned(exprs) => {
273                let mut updated_exprs = vec![];
274                for expr in exprs {
275                    let Some(new_expr) = update_expr(expr, projection.expr(), false)?
276                    else {
277                        return Ok(None);
278                    };
279                    updated_exprs.push(new_expr);
280                }
281                Distribution::HashPartitioned(updated_exprs)
282            }
283            dist => dist.clone(),
284        };
285
286        make_with_child(projection, &self.input()).map(|input| {
287            let e = OutputRequirementExec::new(input, requirements, dist_req, self.fetch);
288            Some(Arc::new(e) as _)
289        })
290    }
291
292    fn fetch(&self) -> Option<usize> {
293        self.fetch
294    }
295}
296
297impl PhysicalOptimizerRule for OutputRequirements {
298    fn optimize(
299        &self,
300        plan: Arc<dyn ExecutionPlan>,
301        _config: &ConfigOptions,
302    ) -> Result<Arc<dyn ExecutionPlan>> {
303        match self.mode {
304            RuleMode::Add => require_top_ordering(plan),
305            RuleMode::Remove => plan
306                .transform_up(|plan| {
307                    if let Some(sort_req) = plan.downcast_ref::<OutputRequirementExec>() {
308                        Ok(Transformed::yes(sort_req.input()))
309                    } else {
310                        Ok(Transformed::no(plan))
311                    }
312                })
313                .data(),
314        }
315    }
316
317    fn name(&self) -> &str {
318        "OutputRequirements"
319    }
320
321    fn schema_check(&self) -> bool {
322        true
323    }
324}
325
326/// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that
327/// global requirements are not lost during optimization.
328fn require_top_ordering(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
329    let (new_plan, is_changed) = require_top_ordering_helper(plan)?;
330    if is_changed {
331        Ok(new_plan)
332    } else {
333        // Add `OutputRequirementExec` to the top, with no specified ordering and distribution requirement.
334        Ok(Arc::new(OutputRequirementExec::new(
335            new_plan,
336            // there is no ordering requirement
337            None,
338            Distribution::UnspecifiedDistribution,
339            None,
340        )) as _)
341    }
342}
343
344/// Helper function that adds an ancillary `OutputRequirementExec` to the given plan.
345/// First entry in the tuple is resulting plan, second entry indicates whether any
346/// `OutputRequirementExec` is added to the plan.
347fn require_top_ordering_helper(
348    plan: Arc<dyn ExecutionPlan>,
349) -> Result<(Arc<dyn ExecutionPlan>, bool)> {
350    let mut children = plan.children();
351    // Global ordering defines desired ordering in the final result.
352    if children.len() != 1 {
353        Ok((plan, false))
354    } else if let Some(sort_exec) = plan.downcast_ref::<SortExec>() {
355        // In case of constant columns, output ordering of the `SortExec` would
356        // be an empty set. Therefore; we check the sort expression field to
357        // assign the requirements.
358        let req_dist = sort_exec.required_input_distribution().swap_remove(0);
359        let req_ordering = sort_exec.expr();
360        let reqs = OrderingRequirements::from(req_ordering.clone());
361        let fetch = sort_exec.fetch();
362
363        Ok((
364            Arc::new(OutputRequirementExec::new(
365                plan,
366                Some(reqs),
367                req_dist,
368                fetch,
369            )) as _,
370            true,
371        ))
372    } else if let Some(spm) = plan.downcast_ref::<SortPreservingMergeExec>() {
373        let reqs = OrderingRequirements::from(spm.expr().clone());
374        let fetch = spm.fetch();
375        Ok((
376            Arc::new(OutputRequirementExec::new(
377                plan,
378                Some(reqs),
379                Distribution::SinglePartition,
380                fetch,
381            )) as _,
382            true,
383        ))
384    } else if plan.maintains_input_order()[0]
385        && (plan.required_input_ordering()[0]
386            .as_ref()
387            .is_none_or(|o| matches!(o, OrderingRequirements::Soft(_))))
388    {
389        // Keep searching for a `SortExec` as long as ordering is maintained,
390        // and on-the-way operators do not themselves require an ordering.
391        // When an operator requires an ordering, any `SortExec` below can not
392        // be responsible for (i.e. the originator of) the global ordering.
393        let (new_child, is_changed) =
394            require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?;
395
396        let plan = if is_changed {
397            plan.with_new_children(vec![new_child])?
398        } else {
399            plan
400        };
401
402        Ok((plan, is_changed))
403    } else {
404        // Stop searching, there is no global ordering desired for the query.
405        Ok((plan, false))
406    }
407}
408
409// See tests in datafusion/core/tests/physical_optimizer