datafusion_physical_optimizer/
enforce_distribution.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceDistribution optimizer rule inspects the physical plan with respect
19//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them
20//! when necessary. If increasing parallelism is beneficial (and also desirable
21//! according to the configuration), this rule increases partition counts in
22//! the physical plan.
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use crate::optimizer::PhysicalOptimizerRule;
28use crate::output_requirements::OutputRequirementExec;
29use crate::utils::{
30    add_sort_above_with_check, is_coalesce_partitions, is_repartition,
31    is_sort_preserving_merge,
32};
33
34use arrow::compute::SortOptions;
35use datafusion_common::config::ConfigOptions;
36use datafusion_common::error::Result;
37use datafusion_common::stats::Precision;
38use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
39use datafusion_expr::logical_plan::JoinType;
40use datafusion_physical_expr::expressions::{Column, NoOp};
41use datafusion_physical_expr::utils::map_columns_before_projection;
42use datafusion_physical_expr::{
43    physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
44};
45use datafusion_physical_plan::aggregates::{
46    AggregateExec, AggregateMode, PhysicalGroupBy,
47};
48use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
49use datafusion_physical_plan::execution_plan::EmissionType;
50use datafusion_physical_plan::joins::{
51    CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
52};
53use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
54use datafusion_physical_plan::repartition::RepartitionExec;
55use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
56use datafusion_physical_plan::tree_node::PlanContext;
57use datafusion_physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
58use datafusion_physical_plan::windows::WindowAggExec;
59use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
60use datafusion_physical_plan::ExecutionPlanProperties;
61use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
62
63use itertools::izip;
64
65/// The `EnforceDistribution` rule ensures that distribution requirements are
66/// met. In doing so, this rule will increase the parallelism in the plan by
67/// introducing repartitioning operators to the physical plan.
68///
69/// For example, given an input such as:
70///
71///
72/// ```text
73/// ┌─────────────────────────────────┐
74/// │                                 │
75/// │          ExecutionPlan          │
76/// │                                 │
77/// └─────────────────────────────────┘
78///             ▲         ▲
79///             │         │
80///       ┌─────┘         └─────┐
81///       │                     │
82///       │                     │
83///       │                     │
84/// ┌───────────┐         ┌───────────┐
85/// │           │         │           │
86/// │ batch A1  │         │ batch B1  │
87/// │           │         │           │
88/// ├───────────┤         ├───────────┤
89/// │           │         │           │
90/// │ batch A2  │         │ batch B2  │
91/// │           │         │           │
92/// ├───────────┤         ├───────────┤
93/// │           │         │           │
94/// │ batch A3  │         │ batch B3  │
95/// │           │         │           │
96/// └───────────┘         └───────────┘
97///
98///      Input                 Input
99///        A                     B
100/// ```
101///
102/// This rule will attempt to add a `RepartitionExec` to increase parallelism
103/// (to 3, in this case) and create the following arrangement:
104///
105/// ```text
106///     ┌─────────────────────────────────┐
107///     │                                 │
108///     │          ExecutionPlan          │
109///     │                                 │
110///     └─────────────────────────────────┘
111///               ▲      ▲       ▲            Input now has 3
112///               │      │       │             partitions
113///       ┌───────┘      │       └───────┐
114///       │              │               │
115///       │              │               │
116/// ┌───────────┐  ┌───────────┐   ┌───────────┐
117/// │           │  │           │   │           │
118/// │ batch A1  │  │ batch A3  │   │ batch B3  │
119/// │           │  │           │   │           │
120/// ├───────────┤  ├───────────┤   ├───────────┤
121/// │           │  │           │   │           │
122/// │ batch B2  │  │ batch B1  │   │ batch A2  │
123/// │           │  │           │   │           │
124/// └───────────┘  └───────────┘   └───────────┘
125///       ▲              ▲               ▲
126///       │              │               │
127///       └─────────┐    │    ┌──────────┘
128///                 │    │    │
129///                 │    │    │
130///     ┌─────────────────────────────────┐   batches are
131///     │       RepartitionExec(3)        │   repartitioned
132///     │           RoundRobin            │
133///     │                                 │
134///     └─────────────────────────────────┘
135///                 ▲         ▲
136///                 │         │
137///           ┌─────┘         └─────┐
138///           │                     │
139///           │                     │
140///           │                     │
141///     ┌───────────┐         ┌───────────┐
142///     │           │         │           │
143///     │ batch A1  │         │ batch B1  │
144///     │           │         │           │
145///     ├───────────┤         ├───────────┤
146///     │           │         │           │
147///     │ batch A2  │         │ batch B2  │
148///     │           │         │           │
149///     ├───────────┤         ├───────────┤
150///     │           │         │           │
151///     │ batch A3  │         │ batch B3  │
152///     │           │         │           │
153///     └───────────┘         └───────────┘
154///
155///
156///      Input                 Input
157///        A                     B
158/// ```
159///
160/// The `EnforceDistribution` rule
161/// - is idempotent; i.e. it can be applied multiple times, each time producing
162///   the same result.
163/// - always produces a valid plan in terms of distribution requirements. Its
164///   input plan can be valid or invalid with respect to distribution requirements,
165///   but the output plan will always be valid.
166/// - produces a valid plan in terms of ordering requirements, *if* its input is
167///   a valid plan in terms of ordering requirements. If the input plan is invalid,
168///   this rule does not attempt to fix it as doing so is the responsibility of the
169///   `EnforceSorting` rule.
170///
171/// Note that distribution requirements are met in the strictest way. This may
172/// result in more than strictly necessary [`RepartitionExec`]s in the plan, but
173/// meeting the requirements in the strictest way may help avoid possible data
174/// skew in joins.
175///
176/// For example for a hash join with keys (a, b, c), the required Distribution(a, b, c)
177/// can be satisfied by several alternative partitioning ways: (a, b, c), (a, b),
178/// (a, c), (b, c), (a), (b), (c) and ( ).
179///
180/// This rule only chooses the exact match and satisfies the Distribution(a, b, c)
181/// by a HashPartition(a, b, c).
182#[derive(Default, Debug)]
183pub struct EnforceDistribution {}
184
185impl EnforceDistribution {
186    #[allow(missing_docs)]
187    pub fn new() -> Self {
188        Self {}
189    }
190}
191
192impl PhysicalOptimizerRule for EnforceDistribution {
193    fn optimize(
194        &self,
195        plan: Arc<dyn ExecutionPlan>,
196        config: &ConfigOptions,
197    ) -> Result<Arc<dyn ExecutionPlan>> {
198        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
199
200        let adjusted = if top_down_join_key_reordering {
201            // Run a top-down process to adjust input key ordering recursively
202            let plan_requirements = PlanWithKeyRequirements::new_default(plan);
203            let adjusted = plan_requirements
204                .transform_down(adjust_input_keys_ordering)
205                .data()?;
206            adjusted.plan
207        } else {
208            // Run a bottom-up process
209            plan.transform_up(|plan| {
210                Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
211            })
212            .data()?
213        };
214
215        let distribution_context = DistributionContext::new_default(adjusted);
216        // Distribution enforcement needs to be applied bottom-up.
217        let distribution_context = distribution_context
218            .transform_up(|distribution_context| {
219                ensure_distribution(distribution_context, config)
220            })
221            .data()?;
222        Ok(distribution_context.plan)
223    }
224
225    fn name(&self) -> &str {
226        "EnforceDistribution"
227    }
228
229    fn schema_check(&self) -> bool {
230        true
231    }
232}
233
234#[derive(Debug, Clone)]
235struct JoinKeyPairs {
236    left_keys: Vec<Arc<dyn PhysicalExpr>>,
237    right_keys: Vec<Arc<dyn PhysicalExpr>>,
238}
239
240/// Keeps track of parent required key orderings.
241pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
242
243/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
244/// That might not match with the output partitioning of the join node's children
245/// A Top-Down process will use this method to adjust children's output partitioning based on the parent key reordering requirements:
246///
247/// Example:
248///     TopJoin on (a, b, c)
249///         bottom left join on(b, a, c)
250///         bottom right join on(c, b, a)
251///
252///  Will be adjusted to:
253///     TopJoin on (a, b, c)
254///         bottom left join on(a, b, c)
255///         bottom right join on(a, b, c)
256///
257/// Example:
258///     TopJoin on (a, b, c)
259///         Agg1 group by (b, a, c)
260///         Agg2 group by (c, b, a)
261///
262/// Will be adjusted to:
263///     TopJoin on (a, b, c)
264///          Projection(b, a, c)
265///             Agg1 group by (a, b, c)
266///          Projection(c, b, a)
267///             Agg2 group by (a, b, c)
268///
269/// Following is the explanation of the reordering process:
270///
271/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
272///    Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
273///    Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
274///    Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
275///
276/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
277///    Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
278///    Requirements is already satisfied, clear all the requirements, return the unchanged plan.
279///    Requirements can be satisfied by adjusting keys ordering, clear all the requirements, return the changed plan.
280///
281/// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements, return the unchanged plan
282/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
283/// 5) For other types of operators, by default, pushdown the parent requirements to children.
284///
285pub fn adjust_input_keys_ordering(
286    mut requirements: PlanWithKeyRequirements,
287) -> Result<Transformed<PlanWithKeyRequirements>> {
288    let plan = Arc::clone(&requirements.plan);
289
290    if let Some(HashJoinExec {
291        left,
292        right,
293        on,
294        filter,
295        join_type,
296        projection,
297        mode,
298        null_equality,
299        ..
300    }) = plan.as_any().downcast_ref::<HashJoinExec>()
301    {
302        match mode {
303            PartitionMode::Partitioned => {
304                let join_constructor = |new_conditions: (
305                    Vec<(PhysicalExprRef, PhysicalExprRef)>,
306                    Vec<SortOptions>,
307                )| {
308                    HashJoinExec::try_new(
309                        Arc::clone(left),
310                        Arc::clone(right),
311                        new_conditions.0,
312                        filter.clone(),
313                        join_type,
314                        // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
315                        projection.clone(),
316                        PartitionMode::Partitioned,
317                        *null_equality,
318                    )
319                    .map(|e| Arc::new(e) as _)
320                };
321                return reorder_partitioned_join_keys(
322                    requirements,
323                    on,
324                    &[],
325                    &join_constructor,
326                )
327                .map(Transformed::yes);
328            }
329            PartitionMode::CollectLeft => {
330                // Push down requirements to the right side
331                requirements.children[1].data = match join_type {
332                    JoinType::Inner | JoinType::Right => shift_right_required(
333                        &requirements.data,
334                        left.schema().fields().len(),
335                    )
336                    .unwrap_or_default(),
337                    JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
338                        requirements.data.clone()
339                    }
340                    JoinType::Left
341                    | JoinType::LeftSemi
342                    | JoinType::LeftAnti
343                    | JoinType::Full
344                    | JoinType::LeftMark => vec![],
345                };
346            }
347            PartitionMode::Auto => {
348                // Can not satisfy, clear the current requirements and generate new empty requirements
349                requirements.data.clear();
350            }
351        }
352    } else if let Some(CrossJoinExec { left, .. }) =
353        plan.as_any().downcast_ref::<CrossJoinExec>()
354    {
355        let left_columns_len = left.schema().fields().len();
356        // Push down requirements to the right side
357        requirements.children[1].data =
358            shift_right_required(&requirements.data, left_columns_len)
359                .unwrap_or_default();
360    } else if let Some(SortMergeJoinExec {
361        left,
362        right,
363        on,
364        filter,
365        join_type,
366        sort_options,
367        null_equality,
368        ..
369    }) = plan.as_any().downcast_ref::<SortMergeJoinExec>()
370    {
371        let join_constructor = |new_conditions: (
372            Vec<(PhysicalExprRef, PhysicalExprRef)>,
373            Vec<SortOptions>,
374        )| {
375            SortMergeJoinExec::try_new(
376                Arc::clone(left),
377                Arc::clone(right),
378                new_conditions.0,
379                filter.clone(),
380                *join_type,
381                new_conditions.1,
382                *null_equality,
383            )
384            .map(|e| Arc::new(e) as _)
385        };
386        return reorder_partitioned_join_keys(
387            requirements,
388            on,
389            sort_options,
390            &join_constructor,
391        )
392        .map(Transformed::yes);
393    } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
394        if !requirements.data.is_empty() {
395            if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
396                return reorder_aggregate_keys(requirements, aggregate_exec)
397                    .map(Transformed::yes);
398            } else {
399                requirements.data.clear();
400            }
401        } else {
402            // Keep everything unchanged
403            return Ok(Transformed::no(requirements));
404        }
405    } else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() {
406        let expr = proj.expr();
407        // For Projection, we need to transform the requirements to the columns before the Projection
408        // And then to push down the requirements
409        // Construct a mapping from new name to the original Column
410        let proj_exprs: Vec<_> = expr
411            .iter()
412            .map(|p| (Arc::clone(&p.expr), p.alias.clone()))
413            .collect();
414        let new_required = map_columns_before_projection(&requirements.data, &proj_exprs);
415        if new_required.len() == requirements.data.len() {
416            requirements.children[0].data = new_required;
417        } else {
418            // Can not satisfy, clear the current requirements and generate new empty requirements
419            requirements.data.clear();
420        }
421    } else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
422        || plan
423            .as_any()
424            .downcast_ref::<CoalescePartitionsExec>()
425            .is_some()
426        || plan.as_any().downcast_ref::<WindowAggExec>().is_some()
427    {
428        requirements.data.clear();
429    } else {
430        // By default, push down the parent requirements to children
431        for child in requirements.children.iter_mut() {
432            child.data.clone_from(&requirements.data);
433        }
434    }
435    Ok(Transformed::yes(requirements))
436}
437
438pub fn reorder_partitioned_join_keys<F>(
439    mut join_plan: PlanWithKeyRequirements,
440    on: &[(PhysicalExprRef, PhysicalExprRef)],
441    sort_options: &[SortOptions],
442    join_constructor: &F,
443) -> Result<PlanWithKeyRequirements>
444where
445    F: Fn(
446        (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
447    ) -> Result<Arc<dyn ExecutionPlan>>,
448{
449    let parent_required = &join_plan.data;
450    let join_key_pairs = extract_join_keys(on);
451    let eq_properties = join_plan.plan.equivalence_properties();
452
453    let (
454        JoinKeyPairs {
455            left_keys,
456            right_keys,
457        },
458        positions,
459    ) = try_reorder(join_key_pairs, parent_required, eq_properties);
460
461    if let Some(positions) = positions {
462        if !positions.is_empty() {
463            let new_join_on = new_join_conditions(&left_keys, &right_keys);
464            let new_sort_options = (0..sort_options.len())
465                .map(|idx| sort_options[positions[idx]])
466                .collect();
467            join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
468        }
469    }
470
471    join_plan.children[0].data = left_keys;
472    join_plan.children[1].data = right_keys;
473    Ok(join_plan)
474}
475
476pub fn reorder_aggregate_keys(
477    mut agg_node: PlanWithKeyRequirements,
478    agg_exec: &AggregateExec,
479) -> Result<PlanWithKeyRequirements> {
480    let parent_required = &agg_node.data;
481    let output_columns = agg_exec
482        .group_expr()
483        .expr()
484        .iter()
485        .enumerate()
486        .map(|(index, (_, name))| Column::new(name, index))
487        .collect::<Vec<_>>();
488
489    let output_exprs = output_columns
490        .iter()
491        .map(|c| Arc::new(c.clone()) as _)
492        .collect::<Vec<_>>();
493
494    if parent_required.len() == output_exprs.len()
495        && agg_exec.group_expr().null_expr().is_empty()
496        && !physical_exprs_equal(&output_exprs, parent_required)
497    {
498        if let Some(positions) = expected_expr_positions(&output_exprs, parent_required) {
499            if let Some(agg_exec) =
500                agg_exec.input().as_any().downcast_ref::<AggregateExec>()
501            {
502                if matches!(agg_exec.mode(), &AggregateMode::Partial) {
503                    let group_exprs = agg_exec.group_expr().expr();
504                    let new_group_exprs = positions
505                        .into_iter()
506                        .map(|idx| group_exprs[idx].clone())
507                        .collect();
508                    let partial_agg = Arc::new(AggregateExec::try_new(
509                        AggregateMode::Partial,
510                        PhysicalGroupBy::new_single(new_group_exprs),
511                        agg_exec.aggr_expr().to_vec(),
512                        agg_exec.filter_expr().to_vec(),
513                        Arc::clone(agg_exec.input()),
514                        Arc::clone(&agg_exec.input_schema),
515                    )?);
516                    // Build new group expressions that correspond to the output
517                    // of the "reordered" aggregator:
518                    let group_exprs = partial_agg.group_expr().expr();
519                    let new_group_by = PhysicalGroupBy::new_single(
520                        partial_agg
521                            .output_group_expr()
522                            .into_iter()
523                            .enumerate()
524                            .map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
525                            .collect(),
526                    );
527                    let new_final_agg = Arc::new(AggregateExec::try_new(
528                        AggregateMode::FinalPartitioned,
529                        new_group_by,
530                        agg_exec.aggr_expr().to_vec(),
531                        agg_exec.filter_expr().to_vec(),
532                        Arc::clone(&partial_agg) as _,
533                        agg_exec.input_schema(),
534                    )?);
535
536                    agg_node.plan = Arc::clone(&new_final_agg) as _;
537                    agg_node.data.clear();
538                    agg_node.children = vec![PlanWithKeyRequirements::new(
539                        partial_agg as _,
540                        vec![],
541                        agg_node.children.swap_remove(0).children,
542                    )];
543
544                    // Need to create a new projection to change the expr ordering back
545                    let agg_schema = new_final_agg.schema();
546                    let mut proj_exprs = output_columns
547                        .iter()
548                        .map(|col| {
549                            let name = col.name();
550                            let index = agg_schema.index_of(name)?;
551                            Ok(ProjectionExpr {
552                                expr: Arc::new(Column::new(name, index)) as _,
553                                alias: name.to_owned(),
554                            })
555                        })
556                        .collect::<Result<Vec<_>>>()?;
557                    let agg_fields = agg_schema.fields();
558                    for (idx, field) in
559                        agg_fields.iter().enumerate().skip(output_columns.len())
560                    {
561                        let name = field.name();
562                        let plan = Arc::new(Column::new(name, idx)) as _;
563                        proj_exprs.push(ProjectionExpr {
564                            expr: plan,
565                            alias: name.clone(),
566                        })
567                    }
568                    return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| {
569                        PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node])
570                    });
571                }
572            }
573        }
574    }
575    Ok(agg_node)
576}
577
578fn shift_right_required(
579    parent_required: &[Arc<dyn PhysicalExpr>],
580    left_columns_len: usize,
581) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
582    let new_right_required = parent_required
583        .iter()
584        .filter_map(|r| {
585            r.as_any().downcast_ref::<Column>().and_then(|col| {
586                col.index()
587                    .checked_sub(left_columns_len)
588                    .map(|index| Arc::new(Column::new(col.name(), index)) as _)
589            })
590        })
591        .collect::<Vec<_>>();
592
593    // if the parent required are all coming from the right side, the requirements can be pushdown
594    (new_right_required.len() == parent_required.len()).then_some(new_right_required)
595}
596
597/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
598/// That might not match with the output partitioning of the join node's children
599/// This method will try to change the ordering of the join keys to match with the
600/// partitioning of the join nodes' children. If it can not match with both sides, it will try to
601/// match with one, either the left side or the right side.
602///
603/// Example:
604///     TopJoin on (a, b, c)
605///         bottom left join on(b, a, c)
606///         bottom right join on(c, b, a)
607///
608///  Will be adjusted to:
609///     TopJoin on (b, a, c)
610///         bottom left join on(b, a, c)
611///         bottom right join on(c, b, a)
612///
613/// Compared to the Top-Down reordering process, this Bottom-Up approach is much simpler, but might not reach a best result.
614/// The Bottom-Up approach will be useful in future if we plan to support storage partition-wised Joins.
615/// In that case, the datasources/tables might be pre-partitioned and we can't adjust the key ordering of the datasources
616/// and then can't apply the Top-Down reordering process.
617pub fn reorder_join_keys_to_inputs(
618    plan: Arc<dyn ExecutionPlan>,
619) -> Result<Arc<dyn ExecutionPlan>> {
620    let plan_any = plan.as_any();
621    if let Some(HashJoinExec {
622        left,
623        right,
624        on,
625        filter,
626        join_type,
627        projection,
628        mode,
629        null_equality,
630        ..
631    }) = plan_any.downcast_ref::<HashJoinExec>()
632    {
633        if matches!(mode, PartitionMode::Partitioned) {
634            let (join_keys, positions) = reorder_current_join_keys(
635                extract_join_keys(on),
636                Some(left.output_partitioning()),
637                Some(right.output_partitioning()),
638                left.equivalence_properties(),
639                right.equivalence_properties(),
640            );
641            if positions.is_some_and(|idxs| !idxs.is_empty()) {
642                let JoinKeyPairs {
643                    left_keys,
644                    right_keys,
645                } = join_keys;
646                let new_join_on = new_join_conditions(&left_keys, &right_keys);
647                return Ok(Arc::new(HashJoinExec::try_new(
648                    Arc::clone(left),
649                    Arc::clone(right),
650                    new_join_on,
651                    filter.clone(),
652                    join_type,
653                    projection.clone(),
654                    PartitionMode::Partitioned,
655                    *null_equality,
656                )?));
657            }
658        }
659    } else if let Some(SortMergeJoinExec {
660        left,
661        right,
662        on,
663        filter,
664        join_type,
665        sort_options,
666        null_equality,
667        ..
668    }) = plan_any.downcast_ref::<SortMergeJoinExec>()
669    {
670        let (join_keys, positions) = reorder_current_join_keys(
671            extract_join_keys(on),
672            Some(left.output_partitioning()),
673            Some(right.output_partitioning()),
674            left.equivalence_properties(),
675            right.equivalence_properties(),
676        );
677        if let Some(positions) = positions {
678            if !positions.is_empty() {
679                let JoinKeyPairs {
680                    left_keys,
681                    right_keys,
682                } = join_keys;
683                let new_join_on = new_join_conditions(&left_keys, &right_keys);
684                let new_sort_options = (0..sort_options.len())
685                    .map(|idx| sort_options[positions[idx]])
686                    .collect();
687                return SortMergeJoinExec::try_new(
688                    Arc::clone(left),
689                    Arc::clone(right),
690                    new_join_on,
691                    filter.clone(),
692                    *join_type,
693                    new_sort_options,
694                    *null_equality,
695                )
696                .map(|smj| Arc::new(smj) as _);
697            }
698        }
699    }
700    Ok(plan)
701}
702
703/// Reorder the current join keys ordering based on either left partition or right partition
704fn reorder_current_join_keys(
705    join_keys: JoinKeyPairs,
706    left_partition: Option<&Partitioning>,
707    right_partition: Option<&Partitioning>,
708    left_equivalence_properties: &EquivalenceProperties,
709    right_equivalence_properties: &EquivalenceProperties,
710) -> (JoinKeyPairs, Option<Vec<usize>>) {
711    match (left_partition, right_partition) {
712        (Some(Partitioning::Hash(left_exprs, _)), _) => {
713            match try_reorder(join_keys, left_exprs, left_equivalence_properties) {
714                (join_keys, None) => reorder_current_join_keys(
715                    join_keys,
716                    None,
717                    right_partition,
718                    left_equivalence_properties,
719                    right_equivalence_properties,
720                ),
721                result => result,
722            }
723        }
724        (_, Some(Partitioning::Hash(right_exprs, _))) => {
725            try_reorder(join_keys, right_exprs, right_equivalence_properties)
726        }
727        _ => (join_keys, None),
728    }
729}
730
731fn try_reorder(
732    join_keys: JoinKeyPairs,
733    expected: &[Arc<dyn PhysicalExpr>],
734    equivalence_properties: &EquivalenceProperties,
735) -> (JoinKeyPairs, Option<Vec<usize>>) {
736    let eq_groups = equivalence_properties.eq_group();
737    let mut normalized_expected = vec![];
738    let mut normalized_left_keys = vec![];
739    let mut normalized_right_keys = vec![];
740    if join_keys.left_keys.len() != expected.len() {
741        return (join_keys, None);
742    }
743    if physical_exprs_equal(expected, &join_keys.left_keys)
744        || physical_exprs_equal(expected, &join_keys.right_keys)
745    {
746        return (join_keys, Some(vec![]));
747    } else if !equivalence_properties.eq_group().is_empty() {
748        normalized_expected = expected
749            .iter()
750            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
751            .collect();
752
753        normalized_left_keys = join_keys
754            .left_keys
755            .iter()
756            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
757            .collect();
758
759        normalized_right_keys = join_keys
760            .right_keys
761            .iter()
762            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
763            .collect();
764
765        if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
766            || physical_exprs_equal(&normalized_expected, &normalized_right_keys)
767        {
768            return (join_keys, Some(vec![]));
769        }
770    }
771
772    let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected)
773        .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
774        .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
775        .or_else(|| {
776            expected_expr_positions(&normalized_right_keys, &normalized_expected)
777        })
778    else {
779        return (join_keys, None);
780    };
781
782    let mut new_left_keys = vec![];
783    let mut new_right_keys = vec![];
784    for pos in positions.iter() {
785        new_left_keys.push(Arc::clone(&join_keys.left_keys[*pos]));
786        new_right_keys.push(Arc::clone(&join_keys.right_keys[*pos]));
787    }
788    let pairs = JoinKeyPairs {
789        left_keys: new_left_keys,
790        right_keys: new_right_keys,
791    };
792
793    (pairs, Some(positions))
794}
795
796/// Return the expected expressions positions.
797/// For example, the current expressions are ['c', 'a', 'a', b'], the expected expressions are ['b', 'c', 'a', 'a'],
798///
799/// This method will return a Vec [3, 0, 1, 2]
800fn expected_expr_positions(
801    current: &[Arc<dyn PhysicalExpr>],
802    expected: &[Arc<dyn PhysicalExpr>],
803) -> Option<Vec<usize>> {
804    if current.is_empty() || expected.is_empty() {
805        return None;
806    }
807    let mut indexes: Vec<usize> = vec![];
808    let mut current = current.to_vec();
809    for expr in expected.iter() {
810        // Find the position of the expected expr in the current expressions
811        if let Some(expected_position) = current.iter().position(|e| e.eq(expr)) {
812            current[expected_position] = Arc::new(NoOp::new());
813            indexes.push(expected_position);
814        } else {
815            return None;
816        }
817    }
818    Some(indexes)
819}
820
821fn extract_join_keys(on: &[(PhysicalExprRef, PhysicalExprRef)]) -> JoinKeyPairs {
822    let (left_keys, right_keys) = on
823        .iter()
824        .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
825        .unzip();
826    JoinKeyPairs {
827        left_keys,
828        right_keys,
829    }
830}
831
832fn new_join_conditions(
833    new_left_keys: &[Arc<dyn PhysicalExpr>],
834    new_right_keys: &[Arc<dyn PhysicalExpr>],
835) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
836    new_left_keys
837        .iter()
838        .zip(new_right_keys.iter())
839        .map(|(l_key, r_key)| (Arc::clone(l_key), Arc::clone(r_key)))
840        .collect()
841}
842
843/// Adds RoundRobin repartition operator to the plan increase parallelism.
844///
845/// # Arguments
846///
847/// * `input`: Current node.
848/// * `n_target`: desired target partition number, if partition number of the
849///   current executor is less than this value. Partition number will be increased.
850///
851/// # Returns
852///
853/// A [`Result`] object that contains new execution plan where the desired
854/// partition number is achieved by adding a RoundRobin repartition.
855fn add_roundrobin_on_top(
856    input: DistributionContext,
857    n_target: usize,
858) -> Result<DistributionContext> {
859    // Adding repartition is helpful:
860    if input.plan.output_partitioning().partition_count() < n_target {
861        // When there is an existing ordering, we preserve ordering
862        // during repartition. This will be un-done in the future
863        // If any of the following conditions is true
864        // - Preserving ordering is not helpful in terms of satisfying ordering requirements
865        // - Usage of order preserving variants is not desirable
866        // (determined by flag `config.optimizer.prefer_existing_sort`)
867        let partitioning = Partitioning::RoundRobinBatch(n_target);
868        let repartition =
869            RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
870                .with_preserve_order();
871
872        let new_plan = Arc::new(repartition) as _;
873
874        Ok(DistributionContext::new(new_plan, true, vec![input]))
875    } else {
876        // Partition is not helpful, we already have desired number of partitions.
877        Ok(input)
878    }
879}
880
881/// Adds a hash repartition operator:
882/// - to increase parallelism, and/or
883/// - to satisfy requirements of the subsequent operators.
884///
885/// Repartition(Hash) is added on top of operator `input`.
886///
887/// # Arguments
888///
889/// * `input`: Current node.
890/// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
891/// * `n_target`: desired target partition number, if partition number of the
892///   current executor is less than this value. Partition number will be increased.
893///
894/// # Returns
895///
896/// A [`Result`] object that contains new execution plan where the desired
897/// distribution is satisfied by adding a Hash repartition.
898fn add_hash_on_top(
899    input: DistributionContext,
900    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
901    n_target: usize,
902) -> Result<DistributionContext> {
903    // Early return if hash repartition is unnecessary
904    // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
905    if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
906        return Ok(input);
907    }
908
909    let dist = Distribution::HashPartitioned(hash_exprs);
910    let satisfied = input
911        .plan
912        .output_partitioning()
913        .satisfy(&dist, input.plan.equivalence_properties());
914
915    // Add hash repartitioning when:
916    // - The hash distribution requirement is not satisfied, or
917    // - We can increase parallelism by adding hash partitioning.
918    if !satisfied || n_target > input.plan.output_partitioning().partition_count() {
919        // When there is an existing ordering, we preserve ordering during
920        // repartition. This will be rolled back in the future if any of the
921        // following conditions is true:
922        // - Preserving ordering is not helpful in terms of satisfying ordering
923        //   requirements.
924        // - Usage of order preserving variants is not desirable (per the flag
925        //   `config.optimizer.prefer_existing_sort`).
926        let partitioning = dist.create_partitioning(n_target);
927        let repartition =
928            RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
929                .with_preserve_order();
930        let plan = Arc::new(repartition) as _;
931
932        return Ok(DistributionContext::new(plan, true, vec![input]));
933    }
934
935    Ok(input)
936}
937
938/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
939/// on top of the given plan node to satisfy a single partition requirement
940/// while preserving ordering constraints.
941///
942/// # Parameters
943///
944/// * `input`: Current node.
945///
946/// # Returns
947///
948/// Updated node with an execution plan, where the desired single distribution
949/// requirement is satisfied.
950fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
951    // Apply only when the partition count is larger than one.
952    if input.plan.output_partitioning().partition_count() > 1 {
953        // When there is an existing ordering, we preserve ordering
954        // when decreasing partitions. This will be un-done in the future
955        // if any of the following conditions is true
956        // - Preserving ordering is not helpful in terms of satisfying ordering requirements
957        // - Usage of order preserving variants is not desirable
958        // (determined by flag `config.optimizer.prefer_existing_sort`)
959        let new_plan = if let Some(req) = input.plan.output_ordering() {
960            Arc::new(SortPreservingMergeExec::new(
961                req.clone(),
962                Arc::clone(&input.plan),
963            )) as _
964        } else {
965            // If there is no input order, we can simply coalesce partitions:
966            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
967        };
968
969        DistributionContext::new(new_plan, true, vec![input])
970    } else {
971        input
972    }
973}
974
975/// Updates the physical plan inside [`DistributionContext`] so that distribution
976/// changing operators are removed from the top. If they are necessary, they will
977/// be added in subsequent stages.
978///
979/// Assume that following plan is given:
980/// ```text
981/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
982/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
983/// "    DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
984/// ```
985///
986/// Since `RepartitionExec`s change the distribution, this function removes
987/// them and returns following plan:
988///
989/// ```text
990/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
991/// ```
992fn remove_dist_changing_operators(
993    mut distribution_context: DistributionContext,
994) -> Result<DistributionContext> {
995    while is_repartition(&distribution_context.plan)
996        || is_coalesce_partitions(&distribution_context.plan)
997        || is_sort_preserving_merge(&distribution_context.plan)
998    {
999        // All of above operators have a single child. First child is only child.
1000        // Remove any distribution changing operators at the beginning:
1001        distribution_context = distribution_context.children.swap_remove(0);
1002        // Note that they will be re-inserted later on if necessary or helpful.
1003    }
1004
1005    Ok(distribution_context)
1006}
1007
1008/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
1009///
1010/// Assume that following plan is given:
1011/// ```text
1012/// "SortPreservingMergeExec: \[a@0 ASC]"
1013/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true",
1014/// "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true",
1015/// "      DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1016/// ```
1017///
1018/// This function converts plan above to the following:
1019///
1020/// ```text
1021/// "CoalescePartitionsExec"
1022/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
1023/// "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
1024/// "      DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1025/// ```
1026pub fn replace_order_preserving_variants(
1027    mut context: DistributionContext,
1028) -> Result<DistributionContext> {
1029    context.children = context
1030        .children
1031        .into_iter()
1032        .map(|child| {
1033            if child.data {
1034                replace_order_preserving_variants(child)
1035            } else {
1036                Ok(child)
1037            }
1038        })
1039        .collect::<Result<Vec<_>>>()?;
1040
1041    if is_sort_preserving_merge(&context.plan) {
1042        let child_plan = Arc::clone(&context.children[0].plan);
1043        context.plan = Arc::new(
1044            CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
1045        );
1046        return Ok(context);
1047    } else if let Some(repartition) =
1048        context.plan.as_any().downcast_ref::<RepartitionExec>()
1049    {
1050        if repartition.preserve_order() {
1051            context.plan = Arc::new(RepartitionExec::try_new(
1052                Arc::clone(&context.children[0].plan),
1053                repartition.partitioning().clone(),
1054            )?);
1055            return Ok(context);
1056        }
1057    }
1058
1059    context.update_plan_from_children()
1060}
1061
1062/// A struct to keep track of repartition requirements for each child node.
1063struct RepartitionRequirementStatus {
1064    /// The distribution requirement for the node.
1065    requirement: Distribution,
1066    /// Designates whether round robin partitioning is theoretically beneficial;
1067    /// i.e. the operator can actually utilize parallelism.
1068    roundrobin_beneficial: bool,
1069    /// Designates whether round robin partitioning is beneficial according to
1070    /// the statistical information we have on the number of rows.
1071    roundrobin_beneficial_stats: bool,
1072    /// Designates whether hash partitioning is necessary.
1073    hash_necessary: bool,
1074}
1075
1076/// Calculates the `RepartitionRequirementStatus` for each children to generate
1077/// consistent and sensible (in terms of performance) distribution requirements.
1078/// As an example, a hash join's left (build) child might produce
1079///
1080/// ```text
1081/// RepartitionRequirementStatus {
1082///     ..,
1083///     hash_necessary: true
1084/// }
1085/// ```
1086///
1087/// while its right (probe) child might have very few rows and produce:
1088///
1089/// ```text
1090/// RepartitionRequirementStatus {
1091///     ..,
1092///     hash_necessary: false
1093/// }
1094/// ```
1095///
1096/// These statuses are not consistent as all children should agree on hash
1097/// partitioning. This function aligns the statuses to generate consistent
1098/// hash partitions for each children. After alignment, the right child's
1099/// status would turn into:
1100///
1101/// ```text
1102/// RepartitionRequirementStatus {
1103///     ..,
1104///     hash_necessary: true
1105/// }
1106/// ```
1107fn get_repartition_requirement_status(
1108    plan: &Arc<dyn ExecutionPlan>,
1109    batch_size: usize,
1110    should_use_estimates: bool,
1111) -> Result<Vec<RepartitionRequirementStatus>> {
1112    let mut needs_alignment = false;
1113    let children = plan.children();
1114    let rr_beneficial = plan.benefits_from_input_partitioning();
1115    let requirements = plan.required_input_distribution();
1116    let mut repartition_status_flags = vec![];
1117    for (child, requirement, roundrobin_beneficial) in
1118        izip!(children.into_iter(), requirements, rr_beneficial)
1119    {
1120        // Decide whether adding a round robin is beneficial depending on
1121        // the statistical information we have on the number of rows:
1122        let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows
1123        {
1124            Precision::Exact(n_rows) => n_rows > batch_size,
1125            Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
1126            Precision::Absent => true,
1127        };
1128        let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
1129        // Hash re-partitioning is necessary when the input has more than one
1130        // partitions:
1131        let multi_partitions = child.output_partitioning().partition_count() > 1;
1132        let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
1133        needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
1134        repartition_status_flags.push((
1135            is_hash,
1136            RepartitionRequirementStatus {
1137                requirement,
1138                roundrobin_beneficial,
1139                roundrobin_beneficial_stats,
1140                hash_necessary: is_hash && multi_partitions,
1141            },
1142        ));
1143    }
1144    // Align hash necessary flags for hash partitions to generate consistent
1145    // hash partitions at each children:
1146    if needs_alignment {
1147        // When there is at least one hash requirement that is necessary or
1148        // beneficial according to statistics, make all children require hash
1149        // repartitioning:
1150        for (is_hash, status) in &mut repartition_status_flags {
1151            if *is_hash {
1152                status.hash_necessary = true;
1153            }
1154        }
1155    }
1156    Ok(repartition_status_flags
1157        .into_iter()
1158        .map(|(_, status)| status)
1159        .collect())
1160}
1161
1162/// This function checks whether we need to add additional data exchange
1163/// operators to satisfy distribution requirements. Since this function
1164/// takes care of such requirements, we should avoid manually adding data
1165/// exchange operators in other places.
1166///
1167/// This function is intended to be used in a bottom up traversal, as it
1168/// can first repartition (or newly partition) at the datasources -- these
1169/// source partitions may be later repartitioned with additional data exchange operators.
1170pub fn ensure_distribution(
1171    dist_context: DistributionContext,
1172    config: &ConfigOptions,
1173) -> Result<Transformed<DistributionContext>> {
1174    let dist_context = update_children(dist_context)?;
1175
1176    if dist_context.plan.children().is_empty() {
1177        return Ok(Transformed::no(dist_context));
1178    }
1179
1180    let target_partitions = config.execution.target_partitions;
1181    // When `false`, round robin repartition will not be added to increase parallelism
1182    let enable_round_robin = config.optimizer.enable_round_robin_repartition;
1183    let repartition_file_scans = config.optimizer.repartition_file_scans;
1184    let batch_size = config.execution.batch_size;
1185    let should_use_estimates = config
1186        .execution
1187        .use_row_number_estimates_to_optimize_partitioning;
1188    let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
1189        && matches!(
1190            dist_context.plan.pipeline_behavior(),
1191            EmissionType::Incremental | EmissionType::Both
1192        );
1193    // Use order preserving variants either of the conditions true
1194    // - it is desired according to config
1195    // - when plan is unbounded
1196    // - when it is pipeline friendly (can incrementally produce results)
1197    let order_preserving_variants_desirable =
1198        unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
1199
1200    // Remove unnecessary repartition from the physical plan if any
1201    let DistributionContext {
1202        mut plan,
1203        data,
1204        children,
1205    } = remove_dist_changing_operators(dist_context)?;
1206
1207    if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
1208        if let Some(updated_window) = get_best_fitting_window(
1209            exec.window_expr(),
1210            exec.input(),
1211            &exec.partition_keys(),
1212        )? {
1213            plan = updated_window;
1214        }
1215    } else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
1216        if let Some(updated_window) = get_best_fitting_window(
1217            exec.window_expr(),
1218            exec.input(),
1219            &exec.partition_keys(),
1220        )? {
1221            plan = updated_window;
1222        }
1223    };
1224
1225    let repartition_status_flags =
1226        get_repartition_requirement_status(&plan, batch_size, should_use_estimates)?;
1227    // This loop iterates over all the children to:
1228    // - Increase parallelism for every child if it is beneficial.
1229    // - Satisfy the distribution requirements of every child, if it is not
1230    //   already satisfied.
1231    // We store the updated children in `new_children`.
1232    let children = izip!(
1233        children.into_iter(),
1234        plan.required_input_ordering(),
1235        plan.maintains_input_order(),
1236        repartition_status_flags.into_iter()
1237    )
1238    .map(
1239        |(
1240            mut child,
1241            required_input_ordering,
1242            maintains,
1243            RepartitionRequirementStatus {
1244                requirement,
1245                roundrobin_beneficial,
1246                roundrobin_beneficial_stats,
1247                hash_necessary,
1248            },
1249        )| {
1250            let add_roundrobin = enable_round_robin
1251                // Operator benefits from partitioning (e.g. filter):
1252                && roundrobin_beneficial
1253                && roundrobin_beneficial_stats
1254                // Unless partitioning increases the partition count, it is not beneficial:
1255                && child.plan.output_partitioning().partition_count() < target_partitions;
1256
1257            // When `repartition_file_scans` is set, attempt to increase
1258            // parallelism at the source.
1259            //
1260            // If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`)
1261            // then no repartitioning will have occurred. As the default implementation returns None, it is only
1262            // specific physical plan nodes, such as certain datasources, which are repartitioned.
1263            if repartition_file_scans && roundrobin_beneficial_stats {
1264                if let Some(new_child) =
1265                    child.plan.repartitioned(target_partitions, config)?
1266                {
1267                    child.plan = new_child;
1268                }
1269            }
1270
1271            // Satisfy the distribution requirement if it is unmet.
1272            match &requirement {
1273                Distribution::SinglePartition => {
1274                    child = add_merge_on_top(child);
1275                }
1276                Distribution::HashPartitioned(exprs) => {
1277                    if add_roundrobin {
1278                        // Add round-robin repartitioning on top of the operator
1279                        // to increase parallelism.
1280                        child = add_roundrobin_on_top(child, target_partitions)?;
1281                    }
1282                    // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
1283                    if hash_necessary {
1284                        child =
1285                            add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
1286                    }
1287                }
1288                Distribution::UnspecifiedDistribution => {
1289                    if add_roundrobin {
1290                        // Add round-robin repartitioning on top of the operator
1291                        // to increase parallelism.
1292                        child = add_roundrobin_on_top(child, target_partitions)?;
1293                    }
1294                }
1295            };
1296
1297            // There is an ordering requirement of the operator:
1298            if let Some(required_input_ordering) = required_input_ordering {
1299                // Either:
1300                // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
1301                // - using order preserving variant is not desirable.
1302                let sort_req = required_input_ordering.into_single();
1303                let ordering_satisfied = child
1304                    .plan
1305                    .equivalence_properties()
1306                    .ordering_satisfy_requirement(sort_req.clone())?;
1307
1308                if (!ordering_satisfied || !order_preserving_variants_desirable)
1309                    && child.data
1310                {
1311                    child = replace_order_preserving_variants(child)?;
1312                    // If ordering requirements were satisfied before repartitioning,
1313                    // make sure ordering requirements are still satisfied after.
1314                    if ordering_satisfied {
1315                        // Make sure to satisfy ordering requirement:
1316                        child = add_sort_above_with_check(
1317                            child,
1318                            sort_req,
1319                            plan.as_any()
1320                                .downcast_ref::<OutputRequirementExec>()
1321                                .map(|output| output.fetch())
1322                                .unwrap_or(None),
1323                        )?;
1324                    }
1325                }
1326                // Stop tracking distribution changing operators
1327                child.data = false;
1328            } else {
1329                // no ordering requirement
1330                match requirement {
1331                    // Operator requires specific distribution.
1332                    Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
1333                        // Since there is no ordering requirement, preserving ordering is pointless
1334                        child = replace_order_preserving_variants(child)?;
1335                    }
1336                    Distribution::UnspecifiedDistribution => {
1337                        // Since ordering is lost, trying to preserve ordering is pointless
1338                        if !maintains || plan.as_any().is::<OutputRequirementExec>() {
1339                            child = replace_order_preserving_variants(child)?;
1340                        }
1341                    }
1342                }
1343            }
1344            Ok(child)
1345        },
1346    )
1347    .collect::<Result<Vec<_>>>()?;
1348
1349    let children_plans = children
1350        .iter()
1351        .map(|c| Arc::clone(&c.plan))
1352        .collect::<Vec<_>>();
1353
1354    plan = if plan.as_any().is::<UnionExec>()
1355        && !config.optimizer.prefer_existing_union
1356        && can_interleave(children_plans.iter())
1357    {
1358        // Add a special case for [`UnionExec`] since we want to "bubble up"
1359        // hash-partitioned data. So instead of
1360        //
1361        // Agg:
1362        //   Repartition (hash):
1363        //     Union:
1364        //       - Agg:
1365        //           Repartition (hash):
1366        //             Data
1367        //       - Agg:
1368        //           Repartition (hash):
1369        //             Data
1370        //
1371        // we can use:
1372        //
1373        // Agg:
1374        //   Interleave:
1375        //     - Agg:
1376        //         Repartition (hash):
1377        //           Data
1378        //     - Agg:
1379        //         Repartition (hash):
1380        //           Data
1381        Arc::new(InterleaveExec::try_new(children_plans)?)
1382    } else {
1383        plan.with_new_children(children_plans)?
1384    };
1385
1386    Ok(Transformed::yes(DistributionContext::new(
1387        plan, data, children,
1388    )))
1389}
1390
1391/// Keeps track of distribution changing operators (like `RepartitionExec`,
1392/// `SortPreservingMergeExec`, `CoalescePartitionsExec`) and their ancestors.
1393/// Using this information, we can optimize distribution of the plan if/when
1394/// necessary.
1395pub type DistributionContext = PlanContext<bool>;
1396
1397fn update_children(mut dist_context: DistributionContext) -> Result<DistributionContext> {
1398    for child_context in dist_context.children.iter_mut() {
1399        let child_plan_any = child_context.plan.as_any();
1400        child_context.data =
1401            if let Some(repartition) = child_plan_any.downcast_ref::<RepartitionExec>() {
1402                !matches!(
1403                    repartition.partitioning(),
1404                    Partitioning::UnknownPartitioning(_)
1405                )
1406            } else {
1407                child_plan_any.is::<SortPreservingMergeExec>()
1408                    || child_plan_any.is::<CoalescePartitionsExec>()
1409                    || child_context.plan.children().is_empty()
1410                    || child_context.children[0].data
1411                    || child_context
1412                        .plan
1413                        .required_input_distribution()
1414                        .iter()
1415                        .zip(child_context.children.iter())
1416                        .any(|(required_dist, child_context)| {
1417                            child_context.data
1418                                && matches!(
1419                                    required_dist,
1420                                    Distribution::UnspecifiedDistribution
1421                                )
1422                        })
1423            }
1424    }
1425
1426    dist_context.data = false;
1427    Ok(dist_context)
1428}
1429
1430// See tests in datafusion/core/tests/physical_optimizer