datafusion_physical_optimizer/enforce_sorting/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceSorting optimizer rule inspects the physical plan with respect
19//! to local sorting requirements and does the following:
20//! - Adds a [`SortExec`] when a requirement is not met,
21//! - Removes an already-existing [`SortExec`] if it is possible to prove
22//!   that this sort is unnecessary
23//!
24//! The rule can work on valid *and* invalid physical plans with respect to
25//! sorting requirements, but always produces a valid physical plan in this sense.
26//!
27//! A non-realistic but easy to follow example for sort removals: Assume that we
28//! somehow get the fragment
29//!
30//! ```text
31//! SortExec: expr=[nullable_col@0 ASC]
32//!   SortExec: expr=[non_nullable_col@1 ASC]
33//! ```
34//!
//! in the physical plan. The inner sort (on `non_nullable_col`) is unnecessary, since
//! its result is overwritten by the outer [`SortExec`]. Therefore, this rule removes it
//! from the physical plan.
37
38pub mod replace_with_order_preserving_variants;
39pub mod sort_pushdown;
40
41use std::sync::Arc;
42
43use crate::enforce_sorting::replace_with_order_preserving_variants::{
44    replace_with_order_preserving_variants, OrderPreservationContext,
45};
46use crate::enforce_sorting::sort_pushdown::{
47    assign_initial_requirements, pushdown_sorts, SortPushDown,
48};
49use crate::utils::{
50    add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
51    is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
52};
53use crate::PhysicalOptimizerRule;
54
55use datafusion_common::config::ConfigOptions;
56use datafusion_common::plan_err;
57use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
58use datafusion_common::Result;
59use datafusion_physical_expr::{Distribution, Partitioning};
60use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
61use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
62use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
63use datafusion_physical_plan::repartition::RepartitionExec;
64use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
65use datafusion_physical_plan::sorts::sort::SortExec;
66use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
67use datafusion_physical_plan::tree_node::PlanContext;
68use datafusion_physical_plan::windows::{
69    get_best_fitting_window, BoundedWindowAggExec, WindowAggExec,
70};
71use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode};
72
73use itertools::izip;
74
/// Physical optimizer rule that inspects the [`SortExec`]s in a physical plan
/// in order to remove unnecessary sorts, and to optimize sort performance
/// across the plan.
#[derive(Default, Debug)]
pub struct EnforceSorting {}

impl EnforceSorting {
    /// Creates a new instance of the rule (identical to [`Default::default`]).
    pub fn new() -> Self {
        Self::default()
    }
}
86
/// This context object is used within the [`EnforceSorting`] rule to track the closest
/// [`SortExec`] descendant(s) for every child of a plan. The `data` attribute
/// records whether the plan is a `SortExec` or is connected to a `SortExec`
/// via its children (through operators that propagate the ordering; a limit
/// breaks the connection).
pub type PlanWithCorrespondingSort = PlanContext<bool>;
92
93/// For a given node, update the [`PlanContext.data`] attribute.
94///
95/// If the node is a `SortExec`, or any of the node's children are a `SortExec`,
96/// then set the attribute to true.
97///
98/// This requires a bottom-up traversal was previously performed, updating the
99/// children previously.
100fn update_sort_ctx_children_data(
101    mut node_and_ctx: PlanWithCorrespondingSort,
102    data: bool,
103) -> Result<PlanWithCorrespondingSort> {
104    // Update `child.data` for all children.
105    for child_node in node_and_ctx.children.iter_mut() {
106        let child_plan = &child_node.plan;
107        child_node.data = if is_sort(child_plan) {
108            // child is sort
109            true
110        } else if is_limit(child_plan) {
111            // There is no sort linkage for this path, it starts at a limit.
112            false
113        } else {
114            // If a descendent is a sort, and the child maintains the sort.
115            let is_spm = is_sort_preserving_merge(child_plan);
116            let required_orderings = child_plan.required_input_ordering();
117            let flags = child_plan.maintains_input_order();
118            // Add parent node to the tree if there is at least one child with
119            // a sort connection:
120            izip!(flags, required_orderings).any(|(maintains, required_ordering)| {
121                let propagates_ordering =
122                    (maintains && required_ordering.is_none()) || is_spm;
123                // `connected_to_sort` only returns the correct answer with bottom-up traversal
124                let connected_to_sort =
125                    child_node.children.iter().any(|child| child.data);
126                propagates_ordering && connected_to_sort
127            })
128        }
129    }
130
131    // set data attribute on current node
132    node_and_ctx.data = data;
133
134    Ok(node_and_ctx)
135}
136
/// This object is used within the [`EnforceSorting`] rule to track the closest
/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The `data`
/// attribute stores whether the plan is a `CoalescePartitionsExec` or is
/// connected to a `CoalescePartitionsExec` via its children.
///
/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
///
/// This requires that a bottom-up traversal was previously performed, so that the
/// children have already been updated.
pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
147
148/// Discovers the linked Coalesce->Sort cascades.
149///
150/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively
151/// remove the linked coalesces in the subplan. Then afterwards, an SPM is added
152/// at the root of the subplan (just after the sort) in order to parallelize sorts.
153/// Refer to the [`parallelize_sorts`] for more details on sort parallelization.
154///
155/// Example of linked Coalesce->Sort:
156/// ```text
157/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan)
158///   ...nodes...   ctx.data=true (e.g. are linked in cascade)
159///     Coalesce  ctx.data=true (e.g. is a coalesce)
160/// ```
161///
162/// The link should not be continued (and the coalesce not removed) if the distribution
163/// is changed between the Coalesce->Sort cascade. Example:
164/// ```text
165/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan)
166///   AggregateExec  ctx.data=false, to stop the link
167///     ...nodes...   ctx.data=true (e.g. are linked in cascade)
168///       Coalesce  ctx.data=true (e.g. is a coalesce)
169/// ```
170fn update_coalesce_ctx_children(
171    coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
172) {
173    let children = &coalesce_context.children;
174    coalesce_context.data = if children.is_empty() {
175        // Plan has no children, it cannot be a `CoalescePartitionsExec`.
176        false
177    } else if is_coalesce_partitions(&coalesce_context.plan) {
178        // Initiate a connection:
179        true
180    } else {
181        children.iter().enumerate().any(|(idx, node)| {
182            // Only consider operators that don't require a single partition,
183            // and connected to some `CoalescePartitionsExec`:
184            node.data
185                && !matches!(
186                    coalesce_context.plan.required_input_distribution()[idx],
187                    Distribution::SinglePartition
188                )
189        })
190    };
191}
192
193/// Performs optimizations based upon a series of subrules.
194///
195/// Refer to each subrule for detailed descriptions of the optimizations performed:
196/// [`ensure_sorting`], [`parallelize_sorts`], [`replace_with_order_preserving_variants()`],
197/// and [`pushdown_sorts`].
198///
199/// Subrule application is ordering dependent.
200///
201/// The subrule `parallelize_sorts` is only applied if `repartition_sorts` is enabled.
202impl PhysicalOptimizerRule for EnforceSorting {
203    fn optimize(
204        &self,
205        plan: Arc<dyn ExecutionPlan>,
206        config: &ConfigOptions,
207    ) -> Result<Arc<dyn ExecutionPlan>> {
208        let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
209        // Execute a bottom-up traversal to enforce sorting requirements,
210        // remove unnecessary sorts, and optimize sort-sensitive operators:
211        let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
212        let new_plan = if config.optimizer.repartition_sorts {
213            let plan_with_coalesce_partitions =
214                PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
215            let parallel = plan_with_coalesce_partitions
216                .transform_up(parallelize_sorts)
217                .data()?;
218            parallel.plan
219        } else {
220            adjusted.plan
221        };
222
223        let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan);
224        let updated_plan = plan_with_pipeline_fixer
225            .transform_up(|plan_with_pipeline_fixer| {
226                replace_with_order_preserving_variants(
227                    plan_with_pipeline_fixer,
228                    false,
229                    true,
230                    config,
231                )
232            })
233            .data()?;
234        // Execute a top-down traversal to exploit sort push-down opportunities
235        // missed by the bottom-up traversal:
236        let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
237        assign_initial_requirements(&mut sort_pushdown);
238        let adjusted = pushdown_sorts(sort_pushdown)?;
239        adjusted
240            .plan
241            .transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
242            .data()
243    }
244
245    fn name(&self) -> &str {
246        "EnforceSorting"
247    }
248
249    fn schema_check(&self) -> bool {
250        true
251    }
252}
253
254fn replace_with_partial_sort(
255    plan: Arc<dyn ExecutionPlan>,
256) -> Result<Arc<dyn ExecutionPlan>> {
257    let plan_any = plan.as_any();
258    if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
259        let child = Arc::clone(sort_plan.children()[0]);
260        if !child.boundedness().is_unbounded() {
261            return Ok(plan);
262        }
263
264        // here we're trying to find the common prefix for sorted columns that is required for the
265        // sort and already satisfied by the given ordering
266        let child_eq_properties = child.equivalence_properties();
267        let sort_req = LexRequirement::from(sort_plan.expr().clone());
268
269        let mut common_prefix_length = 0;
270        while child_eq_properties.ordering_satisfy_requirement(&LexRequirement {
271            inner: sort_req[0..common_prefix_length + 1].to_vec(),
272        }) {
273            common_prefix_length += 1;
274        }
275        if common_prefix_length > 0 {
276            return Ok(Arc::new(
277                PartialSortExec::new(
278                    LexOrdering::new(sort_plan.expr().to_vec()),
279                    Arc::clone(sort_plan.input()),
280                    common_prefix_length,
281                )
282                .with_preserve_partitioning(sort_plan.preserve_partitioning())
283                .with_fetch(sort_plan.fetch()),
284            ));
285        }
286    }
287    Ok(plan)
288}
289
290/// Transform [`CoalescePartitionsExec`] + [`SortExec`] into
291/// [`SortExec`] + [`SortPreservingMergeExec`] as illustrated below:
292///
293/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
294/// combine the partitions first, and then sort:
295/// ```text
296///   ┌ ─ ─ ─ ─ ─ ┐                                                                                   
297///    ┌─┬─┬─┐                                                                                        
298///   ││B│A│D│... ├──┐                                                                                
299///    └─┴─┴─┘       │                                                                                
300///   └ ─ ─ ─ ─ ─ ┘  │  ┌────────────────────────┐   ┌ ─ ─ ─ ─ ─ ─ ┐   ┌────────┐    ┌ ─ ─ ─ ─ ─ ─ ─ ┐
301///    Partition 1   │  │        Coalesce        │    ┌─┬─┬─┬─┬─┐      │        │     ┌─┬─┬─┬─┬─┐     
302///                  ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶  Sort  ├───▶││A│B│C│D│E│... │
303///                  │  │                        │    └─┴─┴─┴─┴─┘      │        │     └─┴─┴─┴─┴─┘     
304///   ┌ ─ ─ ─ ─ ─ ┐  │  └────────────────────────┘   └ ─ ─ ─ ─ ─ ─ ┘   └────────┘    └ ─ ─ ─ ─ ─ ─ ─ ┘
305///    ┌─┬─┐         │                                 Partition                       Partition      
306///   ││E│C│ ...  ├──┘                                                                                
307///    └─┴─┘                                                                                          
308///   └ ─ ─ ─ ─ ─ ┘                                                                                   
309///    Partition 2                                                                                    
310/// ```                                                                                                 
311///
312///
313/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades
314/// sorts each partition first, then merge partitions while retaining the sort:
315/// ```text
316///   ┌ ─ ─ ─ ─ ─ ┐   ┌────────┐   ┌ ─ ─ ─ ─ ─ ┐                                                 
317///    ┌─┬─┬─┐        │        │    ┌─┬─┬─┐                                                      
318///   ││B│A│D│... │──▶│  Sort  │──▶││A│B│D│... │──┐                                              
319///    └─┴─┴─┘        │        │    └─┴─┴─┘       │                                              
320///   └ ─ ─ ─ ─ ─ ┘   └────────┘   └ ─ ─ ─ ─ ─ ┘  │  ┌─────────────────────┐    ┌ ─ ─ ─ ─ ─ ─ ─ ┐
321///    Partition 1                  Partition 1   │  │                     │     ┌─┬─┬─┬─┬─┐     
322///                                               ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │
323///                                               │  │                     │     └─┴─┴─┴─┴─┘     
324///   ┌ ─ ─ ─ ─ ─ ┐   ┌────────┐   ┌ ─ ─ ─ ─ ─ ┐  │  └─────────────────────┘    └ ─ ─ ─ ─ ─ ─ ─ ┘
325///    ┌─┬─┐          │        │    ┌─┬─┐         │                               Partition      
326///   ││E│C│ ...  │──▶│  Sort  ├──▶││C│E│ ...  │──┘                                              
327///    └─┴─┘          │        │    └─┴─┘                                                        
328///   └ ─ ─ ─ ─ ─ ┘   └────────┘   └ ─ ─ ─ ─ ─ ┘                                                 
329///    Partition 2                  Partition 2                                                  
330/// ```
331///
332/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the
333/// sort first on a per-partition basis, thereby parallelizing the sort.
334///
335///
336/// The outcome is that plans of the form
337/// ```text
338///      "SortExec: expr=\[a@0 ASC\]",
339///      "  ...nodes..."
340///      "    CoalescePartitionsExec",
341///      "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
342/// ```
343/// are transformed into
344/// ```text
345///      "SortPreservingMergeExec: \[a@0 ASC\]",
346///      "  SortExec: expr=\[a@0 ASC\]",
347///      "    ...nodes..."
348///      "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
349/// ```
350/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
351/// By performing sorting in parallel, we can increase performance in some scenarios.
352///
353/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
354/// which require single partitioning. Do not parallelize when the following scenario occurs:
355/// ```text
356///      "SortExec: expr=\[a@0 ASC\]",
357///      "  ...nodes requiring single partitioning..."
358///      "    CoalescePartitionsExec",
359///      "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
360/// ```
361pub fn parallelize_sorts(
362    mut requirements: PlanWithCorrespondingCoalescePartitions,
363) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
364    update_coalesce_ctx_children(&mut requirements);
365
366    if requirements.children.is_empty() || !requirements.children[0].data {
367        // We only take an action when the plan is either a `SortExec`, a
368        // `SortPreservingMergeExec` or a `CoalescePartitionsExec`, and they
369        // all have a single child. Therefore, if the first child has no
370        // connection, we can return immediately.
371        Ok(Transformed::no(requirements))
372    } else if (is_sort(&requirements.plan)
373        || is_sort_preserving_merge(&requirements.plan))
374        && requirements.plan.output_partitioning().partition_count() <= 1
375    {
376        // Take the initial sort expressions and requirements
377        let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
378        let sort_reqs = LexRequirement::from(sort_exprs.clone());
379        let sort_exprs = sort_exprs.clone();
380
381        // If there is a connection between a `CoalescePartitionsExec` and a
382        // global sort that satisfy the requirements (i.e. intermediate
383        // executors don't require single partition), then we can replace
384        // the `CoalescePartitionsExec` + `SortExec` cascade with a `SortExec`
385        // + `SortPreservingMergeExec` cascade to parallelize sorting.
386        requirements = remove_bottleneck_in_subplan(requirements)?;
387        // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan`
388        // deals with the children and their children and so on.
389        requirements = requirements.children.swap_remove(0);
390
391        requirements = add_sort_above_with_check(requirements, sort_reqs, fetch);
392
393        let spm =
394            SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan));
395        Ok(Transformed::yes(
396            PlanWithCorrespondingCoalescePartitions::new(
397                Arc::new(spm.with_fetch(fetch)),
398                false,
399                vec![requirements],
400            ),
401        ))
402    } else if is_coalesce_partitions(&requirements.plan) {
403        let fetch = requirements.plan.fetch();
404        // There is an unnecessary `CoalescePartitionsExec` in the plan.
405        // This will handle the recursive `CoalescePartitionsExec` plans.
406        requirements = remove_bottleneck_in_subplan(requirements)?;
407        // For the removal of self node which is also a `CoalescePartitionsExec`.
408        requirements = requirements.children.swap_remove(0);
409
410        Ok(Transformed::yes(
411            PlanWithCorrespondingCoalescePartitions::new(
412                Arc::new(
413                    CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
414                        .with_fetch(fetch),
415                ),
416                false,
417                vec![requirements],
418            ),
419        ))
420    } else {
421        Ok(Transformed::yes(requirements))
422    }
423}
424
425/// This function enforces sorting requirements and makes optimizations without
426/// violating these requirements whenever possible. Requires a bottom-up traversal.
427pub fn ensure_sorting(
428    mut requirements: PlanWithCorrespondingSort,
429) -> Result<Transformed<PlanWithCorrespondingSort>> {
430    requirements = update_sort_ctx_children_data(requirements, false)?;
431
432    // Perform naive analysis at the beginning -- remove already-satisfied sorts:
433    if requirements.children.is_empty() {
434        return Ok(Transformed::no(requirements));
435    }
436    let maybe_requirements = analyze_immediate_sort_removal(requirements);
437    requirements = if !maybe_requirements.transformed {
438        maybe_requirements.data
439    } else {
440        return Ok(maybe_requirements);
441    };
442
443    let plan = &requirements.plan;
444    let mut updated_children = vec![];
445    for (idx, (required_ordering, mut child)) in plan
446        .required_input_ordering()
447        .into_iter()
448        .zip(requirements.children.into_iter())
449        .enumerate()
450    {
451        let physical_ordering = child.plan.output_ordering();
452
453        if let Some(required) = required_ordering {
454            let eq_properties = child.plan.equivalence_properties();
455            if !eq_properties.ordering_satisfy_requirement(&required) {
456                // Make sure we preserve the ordering requirements:
457                if physical_ordering.is_some() {
458                    child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
459                }
460                child = add_sort_above(child, required, None);
461                child = update_sort_ctx_children_data(child, true)?;
462            }
463        } else if physical_ordering.is_none()
464            || !plan.maintains_input_order()[idx]
465            || is_union(plan)
466        {
467            // We have a `SortExec` whose effect may be neutralized by another
468            // order-imposing operator, remove this sort:
469            child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
470        }
471        updated_children.push(child);
472    }
473    requirements.children = updated_children;
474    requirements = requirements.update_plan_from_children()?;
475    // For window expressions, we can remove some sorts when we can
476    // calculate the result in reverse:
477    let child_node = &requirements.children[0];
478    if is_window(&requirements.plan) && child_node.data {
479        return adjust_window_sort_removal(requirements).map(Transformed::yes);
480    } else if is_sort_preserving_merge(&requirements.plan)
481        && child_node.plan.output_partitioning().partition_count() <= 1
482    {
483        // This `SortPreservingMergeExec` is unnecessary, input already has a
484        // single partition and no fetch is required.
485        let mut child_node = requirements.children.swap_remove(0);
486        if let Some(fetch) = requirements.plan.fetch() {
487            // Add the limit exec if the original SPM had a fetch:
488            child_node.plan =
489                Arc::new(LocalLimitExec::new(Arc::clone(&child_node.plan), fetch));
490        }
491        return Ok(Transformed::yes(child_node));
492    }
493    update_sort_ctx_children_data(requirements, false).map(Transformed::yes)
494}
495
496/// Analyzes a given [`SortExec`] (`plan`) to determine whether its input
497/// already has a finer ordering than it enforces.
498fn analyze_immediate_sort_removal(
499    mut node: PlanWithCorrespondingSort,
500) -> Transformed<PlanWithCorrespondingSort> {
501    if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
502        let sort_input = sort_exec.input();
503        // If this sort is unnecessary, we should remove it:
504        if sort_input.equivalence_properties().ordering_satisfy(
505            sort_exec
506                .properties()
507                .output_ordering()
508                .unwrap_or_else(|| LexOrdering::empty()),
509        ) {
510            node.plan = if !sort_exec.preserve_partitioning()
511                && sort_input.output_partitioning().partition_count() > 1
512            {
513                // Replace the sort with a sort-preserving merge:
514                let expr = LexOrdering::new(sort_exec.expr().to_vec());
515                Arc::new(
516                    SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
517                        .with_fetch(sort_exec.fetch()),
518                ) as _
519            } else {
520                // Remove the sort:
521                node.children = node.children.swap_remove(0).children;
522                if let Some(fetch) = sort_exec.fetch() {
523                    // If the sort has a fetch, we need to add a limit:
524                    if sort_exec
525                        .properties()
526                        .output_partitioning()
527                        .partition_count()
528                        == 1
529                    {
530                        Arc::new(GlobalLimitExec::new(
531                            Arc::clone(sort_input),
532                            0,
533                            Some(fetch),
534                        ))
535                    } else {
536                        Arc::new(LocalLimitExec::new(Arc::clone(sort_input), fetch))
537                    }
538                } else {
539                    Arc::clone(sort_input)
540                }
541            };
542            for child in node.children.iter_mut() {
543                child.data = false;
544            }
545            node.data = false;
546            return Transformed::yes(node);
547        }
548    }
549    Transformed::no(node)
550}
551
/// Adjusts a [`WindowAggExec`] or a [`BoundedWindowAggExec`] to determine
/// whether it may allow removing a sort.
///
/// The sorts linked to the window's single input are removed first (a merge is
/// re-added if the window requires a single partition). Then
/// [`get_best_fitting_window`] is asked for a window operator that can work with
/// the input as it is now ordered. If it finds none, a [`SortExec`] for the
/// window's original ordering requirement is added above the input, and the
/// window is rebuilt on top of it as a [`BoundedWindowAggExec`] (when every
/// window expression uses bounded memory) or a [`WindowAggExec`] otherwise.
///
/// # Errors
///
/// Returns a plan error if the node is neither a `WindowAggExec` nor a
/// `BoundedWindowAggExec`, and propagates errors from sort removal,
/// `get_best_fitting_window` and the window constructors.
fn adjust_window_sort_removal(
    mut window_tree: PlanWithCorrespondingSort,
) -> Result<PlanWithCorrespondingSort> {
    // Window operators have a single child we need to adjust:
    let child_node = remove_corresponding_sort_from_sub_plan(
        window_tree.children.swap_remove(0),
        matches!(
            window_tree.plan.required_input_distribution()[0],
            Distribution::SinglePartition
        ),
    )?;
    window_tree.children.push(child_node);

    let plan = window_tree.plan.as_any();
    let child_plan = &window_tree.children[0].plan;
    // `new_window` is `Some` when a window operator fitting the (now sort-free)
    // input could be built:
    let (window_expr, new_window) =
        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            let window_expr = exec.window_expr();
            let new_window =
                get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
            (window_expr, new_window)
        } else if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            let window_expr = exec.window_expr();
            let new_window =
                get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
            (window_expr, new_window)
        } else {
            return plan_err!("Expected WindowAggExec or BoundedWindowAggExec");
        };

    window_tree.plan = if let Some(new_window) = new_window {
        // We were able to change the window to accommodate the input, use it:
        new_window
    } else {
        // We were unable to change the window to accommodate the input, so we
        // will insert a sort.
        let reqs = window_tree
            .plan
            .required_input_ordering()
            .swap_remove(0)
            .unwrap_or_default();

        // Satisfy the ordering requirement so that the window can run:
        let mut child_node = window_tree.children.swap_remove(0);
        child_node = add_sort_above(child_node, reqs, None);
        let child_plan = Arc::clone(&child_node.plan);
        window_tree.children.push(child_node);

        // NOTE(review): `window_expr[0]` assumes the window has at least one
        // expression; it panics otherwise.
        if window_expr.iter().all(|e| e.uses_bounded_memory()) {
            Arc::new(BoundedWindowAggExec::try_new(
                window_expr.to_vec(),
                child_plan,
                InputOrderMode::Sorted,
                !window_expr[0].partition_by().is_empty(),
            )?) as _
        } else {
            Arc::new(WindowAggExec::try_new(
                window_expr.to_vec(),
                child_plan,
                !window_expr[0].partition_by().is_empty(),
            )?) as _
        }
    };

    // The sort linkage ends at this window operator.
    window_tree.data = false;
    Ok(window_tree)
}
621
/// Removes parallelization-reducing, avoidable [`CoalescePartitionsExec`]s from
/// the plan in `requirements`. After the removal of such `CoalescePartitionsExec`s from
/// the plan, some of the remaining `RepartitionExec`s might become unnecessary.
/// Removes such `RepartitionExec`s from the plan as well.
///
/// - If the first child is a `CoalescePartitionsExec`, it is replaced by its
///   input. While this node and that input are both [`RepartitionExec`]s with
///   the same output partitioning, the input repartition is skipped too. Other
///   children are left untouched in this case.
/// - Otherwise, every child linked to a coalesce (`data == true`) is processed
///   recursively.
///
/// Finally, if this node is a `RepartitionExec` that has become a no-op (its
/// input already has the requested partitioning, or it is a round-robin whose
/// target partition count equals the input's), it is removed as well.
fn remove_bottleneck_in_subplan(
    mut requirements: PlanWithCorrespondingCoalescePartitions,
) -> Result<PlanWithCorrespondingCoalescePartitions> {
    let plan = &requirements.plan;
    let children = &mut requirements.children;
    if is_coalesce_partitions(&children[0].plan) {
        // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
        let mut new_child_node = children[0].children.swap_remove(0);
        // Skip repartitions below the coalesce that this node (also a
        // repartition) would redo with the same partitioning:
        while new_child_node.plan.output_partitioning() == plan.output_partitioning()
            && is_repartition(&new_child_node.plan)
            && is_repartition(plan)
        {
            new_child_node = new_child_node.children.swap_remove(0)
        }
        children[0] = new_child_node;
    } else {
        requirements.children = requirements
            .children
            .into_iter()
            .map(|node| {
                if node.data {
                    remove_bottleneck_in_subplan(node)
                } else {
                    Ok(node)
                }
            })
            .collect::<Result<_>>()?;
    }
    let mut new_reqs = requirements.update_plan_from_children()?;
    if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::<RepartitionExec>() {
        let input_partitioning = repartition.input().output_partitioning();
        // We can remove this repartitioning operator if it is now a no-op:
        let mut can_remove = input_partitioning.eq(repartition.partitioning());
        // We can also remove it if we ended up with an ineffective RR:
        if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
            can_remove |= *n_out == input_partitioning.partition_count();
        }
        if can_remove {
            new_reqs = new_reqs.children.swap_remove(0)
        }
    }
    Ok(new_reqs)
}
669
670/// Updates child to remove the unnecessary sort below it.
671fn update_child_to_remove_unnecessary_sort(
672    child_idx: usize,
673    mut node: PlanWithCorrespondingSort,
674    parent: &Arc<dyn ExecutionPlan>,
675) -> Result<PlanWithCorrespondingSort> {
676    if node.data {
677        let requires_single_partition = matches!(
678            parent.required_input_distribution()[child_idx],
679            Distribution::SinglePartition
680        );
681        node = remove_corresponding_sort_from_sub_plan(node, requires_single_partition)?;
682    }
683    node.data = false;
684    Ok(node)
685}
686
/// Removes the sort(s) linked to `node` from the sub-plan rooted at `node`.
///
/// Links are the `data` flags maintained by [`update_sort_ctx_children_data`];
/// only children whose `data` is `true` are descended into.
///
/// - If `node` is a [`SortExec`] without a fetch, it is replaced by its input.
///   A `SortExec` with a fetch is kept.
/// - Otherwise, linked children are processed recursively, the plan is rebuilt
///   from the new children, and order-preserving operators are replaced with
///   variants that do not preserve order: a [`SortPreservingMergeExec`] is
///   replaced by its input, and a [`RepartitionExec`] is rebuilt over its new
///   input with the same output partitioning.
///
/// If `requires_single_partition` is set and the resulting node has more than
/// one output partition, a [`SortPreservingMergeExec`] (when the node still has
/// an output ordering; it carries over the node's fetch) or a
/// [`CoalescePartitionsExec`] is added on top, so the parent's distribution
/// requirement stays satisfied.
fn remove_corresponding_sort_from_sub_plan(
    mut node: PlanWithCorrespondingSort,
    requires_single_partition: bool,
) -> Result<PlanWithCorrespondingSort> {
    // A `SortExec` is always at the bottom of the tree (i.e. it is where the
    // linked chain being removed starts).
    if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
        // Do not remove sorts with fetch:
        if sort_exec.fetch().is_none() {
            node = node.children.swap_remove(0);
        }
    } else {
        let mut any_connection = false;
        let required_dist = node.plan.required_input_distribution();
        node.children = node
            .children
            .into_iter()
            .enumerate()
            .map(|(idx, child)| {
                if child.data {
                    any_connection = true;
                    // Pass down whether this node needs a single partition from
                    // this particular child:
                    remove_corresponding_sort_from_sub_plan(
                        child,
                        matches!(required_dist[idx], Distribution::SinglePartition),
                    )
                } else {
                    Ok(child)
                }
            })
            .collect::<Result<_>>()?;
        node = node.update_plan_from_children()?;
        // Refresh the children's link flags now that the sub-plan changed:
        if any_connection || node.children.is_empty() {
            node = update_sort_ctx_children_data(node, false)?;
        }

        // Replace with variants that do not preserve order.
        if is_sort_preserving_merge(&node.plan) {
            node.children = node.children.swap_remove(0).children;
            node.plan = Arc::clone(node.plan.children().swap_remove(0));
        } else if let Some(repartition) =
            node.plan.as_any().downcast_ref::<RepartitionExec>()
        {
            node.plan = Arc::new(RepartitionExec::try_new(
                Arc::clone(&node.children[0].plan),
                repartition.properties().output_partitioning().clone(),
            )?) as _;
        }
    };
    // Deleting a merging sort may invalidate distribution requirements.
    // Ensure that we stay compliant with such requirements:
    if requires_single_partition && node.plan.output_partitioning().partition_count() > 1
    {
        // If there is existing ordering, to preserve ordering use
        // `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`.
        let plan = Arc::clone(&node.plan);
        let fetch = plan.fetch();
        let plan = if let Some(ordering) = plan.output_ordering() {
            Arc::new(
                SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
                    .with_fetch(fetch),
            ) as _
        } else {
            Arc::new(CoalescePartitionsExec::new(plan)) as _
        };
        node = PlanWithCorrespondingSort::new(plan, false, vec![node]);
        node = update_sort_ctx_children_data(node, false)?;
    }
    Ok(node)
}
756
757/// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when possible.
758fn get_sort_exprs(
759    sort_any: &Arc<dyn ExecutionPlan>,
760) -> Result<(&LexOrdering, Option<usize>)> {
761    if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
762        Ok((sort_exec.expr(), sort_exec.fetch()))
763    } else if let Some(spm) = sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
764    {
765        Ok((spm.expr(), spm.fetch()))
766    } else {
767        plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec")
768    }
769}
770
771// Tests are in tests/cases/enforce_sorting.rs