Skip to main content

datafusion_physical_optimizer/
enforce_distribution.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceDistribution optimizer rule inspects the physical plan with respect
19//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them
20//! when necessary. If increasing parallelism is beneficial (and also desirable
21//! according to the configuration), this rule increases partition counts in
22//! the physical plan.
23
24use std::any::Any;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28use crate::optimizer::PhysicalOptimizerRule;
29use crate::output_requirements::OutputRequirementExec;
30use crate::utils::{
31    add_sort_above_with_check, is_coalesce_partitions, is_repartition,
32    is_sort_preserving_merge,
33};
34
35use arrow::compute::SortOptions;
36use datafusion_common::config::ConfigOptions;
37use datafusion_common::error::Result;
38use datafusion_common::stats::Precision;
39use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
40use datafusion_expr::logical_plan::{Aggregate, JoinType};
41use datafusion_physical_expr::expressions::{Column, NoOp};
42use datafusion_physical_expr::utils::map_columns_before_projection;
43use datafusion_physical_expr::{
44    EquivalenceProperties, PhysicalExpr, PhysicalExprRef, physical_exprs_equal,
45};
46use datafusion_physical_plan::ExecutionPlanProperties;
47use datafusion_physical_plan::aggregates::{
48    AggregateExec, AggregateMode, PhysicalGroupBy,
49};
50use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
51use datafusion_physical_plan::execution_plan::EmissionType;
52use datafusion_physical_plan::joins::{
53    CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
54};
55use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
56use datafusion_physical_plan::repartition::RepartitionExec;
57use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
58use datafusion_physical_plan::tree_node::PlanContext;
59use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave};
60use datafusion_physical_plan::windows::WindowAggExec;
61use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window};
62use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
63
64use itertools::izip;
65
66/// The `EnforceDistribution` rule ensures that distribution requirements are
67/// met. In doing so, this rule will increase the parallelism in the plan by
68/// introducing repartitioning operators to the physical plan.
69///
70/// For example, given an input such as:
71///
72///
73/// ```text
74/// ┌─────────────────────────────────┐
75/// │                                 │
76/// │          ExecutionPlan          │
77/// │                                 │
78/// └─────────────────────────────────┘
79///             ▲         ▲
80///             │         │
81///       ┌─────┘         └─────┐
82///       │                     │
83///       │                     │
84///       │                     │
85/// ┌───────────┐         ┌───────────┐
86/// │           │         │           │
87/// │ batch A1  │         │ batch B1  │
88/// │           │         │           │
89/// ├───────────┤         ├───────────┤
90/// │           │         │           │
91/// │ batch A2  │         │ batch B2  │
92/// │           │         │           │
93/// ├───────────┤         ├───────────┤
94/// │           │         │           │
95/// │ batch A3  │         │ batch B3  │
96/// │           │         │           │
97/// └───────────┘         └───────────┘
98///
99///      Input                 Input
100///        A                     B
101/// ```
102///
103/// This rule will attempt to add a `RepartitionExec` to increase parallelism
104/// (to 3, in this case) and create the following arrangement:
105///
106/// ```text
107///     ┌─────────────────────────────────┐
108///     │                                 │
109///     │          ExecutionPlan          │
110///     │                                 │
111///     └─────────────────────────────────┘
112///               ▲      ▲       ▲            Input now has 3
113///               │      │       │             partitions
114///       ┌───────┘      │       └───────┐
115///       │              │               │
116///       │              │               │
117/// ┌───────────┐  ┌───────────┐   ┌───────────┐
118/// │           │  │           │   │           │
119/// │ batch A1  │  │ batch A3  │   │ batch B3  │
120/// │           │  │           │   │           │
121/// ├───────────┤  ├───────────┤   ├───────────┤
122/// │           │  │           │   │           │
123/// │ batch B2  │  │ batch B1  │   │ batch A2  │
124/// │           │  │           │   │           │
125/// └───────────┘  └───────────┘   └───────────┘
126///       ▲              ▲               ▲
127///       │              │               │
128///       └─────────┐    │    ┌──────────┘
129///                 │    │    │
130///                 │    │    │
131///     ┌─────────────────────────────────┐   batches are
132///     │       RepartitionExec(3)        │   repartitioned
133///     │           RoundRobin            │
134///     │                                 │
135///     └─────────────────────────────────┘
136///                 ▲         ▲
137///                 │         │
138///           ┌─────┘         └─────┐
139///           │                     │
140///           │                     │
141///           │                     │
142///     ┌───────────┐         ┌───────────┐
143///     │           │         │           │
144///     │ batch A1  │         │ batch B1  │
145///     │           │         │           │
146///     ├───────────┤         ├───────────┤
147///     │           │         │           │
148///     │ batch A2  │         │ batch B2  │
149///     │           │         │           │
150///     ├───────────┤         ├───────────┤
151///     │           │         │           │
152///     │ batch A3  │         │ batch B3  │
153///     │           │         │           │
154///     └───────────┘         └───────────┘
155///
156///
157///      Input                 Input
158///        A                     B
159/// ```
160///
161/// The `EnforceDistribution` rule
162/// - is idempotent; i.e. it can be applied multiple times, each time producing
163///   the same result.
164/// - always produces a valid plan in terms of distribution requirements. Its
165///   input plan can be valid or invalid with respect to distribution requirements,
166///   but the output plan will always be valid.
167/// - produces a valid plan in terms of ordering requirements, *if* its input is
168///   a valid plan in terms of ordering requirements. If the input plan is invalid,
169///   this rule does not attempt to fix it as doing so is the responsibility of the
170///   `EnforceSorting` rule.
171///
172/// Note that distribution requirements are met in the strictest way. This may
173/// result in more than strictly necessary [`RepartitionExec`]s in the plan, but
174/// meeting the requirements in the strictest way may help avoid possible data
175/// skew in joins.
176///
177/// For example for a hash join with keys (a, b, c), the required Distribution(a, b, c)
178/// can be satisfied by several alternative partitioning ways: (a, b, c), (a, b),
179/// (a, c), (b, c), (a), (b), (c) and ( ).
180///
181/// This rule only chooses the exact match and satisfies the Distribution(a, b, c)
182/// by a HashPartition(a, b, c).
183#[derive(Default, Debug)]
184pub struct EnforceDistribution {}
185
186impl EnforceDistribution {
187    #[expect(missing_docs)]
188    pub fn new() -> Self {
189        Self {}
190    }
191}
192
193impl PhysicalOptimizerRule for EnforceDistribution {
194    fn optimize(
195        &self,
196        plan: Arc<dyn ExecutionPlan>,
197        config: &ConfigOptions,
198    ) -> Result<Arc<dyn ExecutionPlan>> {
199        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
200
201        let adjusted = if top_down_join_key_reordering {
202            // Run a top-down process to adjust input key ordering recursively
203            let plan_requirements = PlanWithKeyRequirements::new_default(plan);
204            let adjusted = plan_requirements
205                .transform_down(adjust_input_keys_ordering)
206                .data()?;
207            adjusted.plan
208        } else {
209            // Run a bottom-up process
210            plan.transform_up(|plan| {
211                Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
212            })
213            .data()?
214        };
215
216        let distribution_context = DistributionContext::new_default(adjusted);
217        // Distribution enforcement needs to be applied bottom-up.
218        let distribution_context = distribution_context
219            .transform_up(|distribution_context| {
220                ensure_distribution(distribution_context, config)
221            })
222            .data()?;
223        Ok(distribution_context.plan)
224    }
225
226    fn name(&self) -> &str {
227        "EnforceDistribution"
228    }
229
230    fn schema_check(&self) -> bool {
231        true
232    }
233}
234
235#[derive(Debug, Clone)]
236struct JoinKeyPairs {
237    left_keys: Vec<Arc<dyn PhysicalExpr>>,
238    right_keys: Vec<Arc<dyn PhysicalExpr>>,
239}
240
241/// Keeps track of parent required key orderings.
242pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
243
244/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
245/// That might not match with the output partitioning of the join node's children
246/// A Top-Down process will use this method to adjust children's output partitioning based on the parent key reordering requirements:
247///
248/// Example:
249///     TopJoin on (a, b, c)
250///         bottom left join on(b, a, c)
251///         bottom right join on(c, b, a)
252///
253///  Will be adjusted to:
254///     TopJoin on (a, b, c)
255///         bottom left join on(a, b, c)
256///         bottom right join on(a, b, c)
257///
258/// Example:
259///     TopJoin on (a, b, c)
260///         Agg1 group by (b, a, c)
261///         Agg2 group by (c, b, a)
262///
263/// Will be adjusted to:
264///     TopJoin on (a, b, c)
265///          Projection(b, a, c)
266///             Agg1 group by (a, b, c)
267///          Projection(c, b, a)
268///             Agg2 group by (a, b, c)
269///
270/// Following is the explanation of the reordering process:
271///
272/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
273///    Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
274///    Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
275///    Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
276///
277/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
278///    Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
279///    Requirements is already satisfied, clear all the requirements, return the unchanged plan.
280///    Requirements can be satisfied by adjusting keys ordering, clear all the requirements, return the changed plan.
281///
282/// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements, return the unchanged plan
283/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
284/// 5) For other types of operators, by default, pushdown the parent requirements to children.
285pub fn adjust_input_keys_ordering(
286    mut requirements: PlanWithKeyRequirements,
287) -> Result<Transformed<PlanWithKeyRequirements>> {
288    let plan = Arc::clone(&requirements.plan);
289
290    if let Some(
291        exec @ HashJoinExec {
292            left,
293            on,
294            join_type,
295            mode,
296            ..
297        },
298    ) = plan.downcast_ref::<HashJoinExec>()
299    {
300        match mode {
301            PartitionMode::Partitioned => {
302                let join_constructor = |new_conditions: (
303                    Vec<(PhysicalExprRef, PhysicalExprRef)>,
304                    Vec<SortOptions>,
305                )| {
306                    exec.builder()
307                        .with_partition_mode(PartitionMode::Partitioned)
308                        .with_on(new_conditions.0)
309                        .build_exec()
310                };
311                return reorder_partitioned_join_keys(
312                    requirements,
313                    on,
314                    &[],
315                    &join_constructor,
316                )
317                .map(Transformed::yes);
318            }
319            PartitionMode::CollectLeft => {
320                // Push down requirements to the right side
321                requirements.children[1].data = match join_type {
322                    JoinType::Inner | JoinType::Right => shift_right_required(
323                        &requirements.data,
324                        left.schema().fields().len(),
325                    )
326                    .unwrap_or_default(),
327                    JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
328                        requirements.data.clone()
329                    }
330                    JoinType::Left
331                    | JoinType::LeftSemi
332                    | JoinType::LeftAnti
333                    | JoinType::Full
334                    | JoinType::LeftMark => vec![],
335                };
336            }
337            PartitionMode::Auto => {
338                // Can not satisfy, clear the current requirements and generate new empty requirements
339                requirements.data.clear();
340            }
341        }
342    } else if let Some(CrossJoinExec { left, .. }) = plan.downcast_ref::<CrossJoinExec>()
343    {
344        let left_columns_len = left.schema().fields().len();
345        // Push down requirements to the right side
346        requirements.children[1].data =
347            shift_right_required(&requirements.data, left_columns_len)
348                .unwrap_or_default();
349    } else if let Some(SortMergeJoinExec {
350        left,
351        right,
352        on,
353        filter,
354        join_type,
355        sort_options,
356        null_equality,
357        ..
358    }) = plan.downcast_ref::<SortMergeJoinExec>()
359    {
360        let join_constructor = |new_conditions: (
361            Vec<(PhysicalExprRef, PhysicalExprRef)>,
362            Vec<SortOptions>,
363        )| {
364            SortMergeJoinExec::try_new(
365                Arc::clone(left),
366                Arc::clone(right),
367                new_conditions.0,
368                filter.clone(),
369                *join_type,
370                new_conditions.1,
371                *null_equality,
372            )
373            .map(|e| Arc::new(e) as _)
374        };
375        return reorder_partitioned_join_keys(
376            requirements,
377            on,
378            sort_options,
379            &join_constructor,
380        )
381        .map(Transformed::yes);
382    } else if let Some(aggregate_exec) = plan.downcast_ref::<AggregateExec>() {
383        if !requirements.data.is_empty() {
384            if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
385                return reorder_aggregate_keys(requirements, aggregate_exec)
386                    .map(Transformed::yes);
387            } else {
388                requirements.data.clear();
389            }
390        } else {
391            // Keep everything unchanged
392            return Ok(Transformed::no(requirements));
393        }
394    } else if let Some(proj) = plan.downcast_ref::<ProjectionExec>() {
395        let expr = proj.expr();
396        // For Projection, we need to transform the requirements to the columns before the Projection
397        // And then to push down the requirements
398        // Construct a mapping from new name to the original Column
399        let proj_exprs: Vec<_> = expr
400            .iter()
401            .map(|p| (Arc::clone(&p.expr), p.alias.clone()))
402            .collect();
403        let new_required = map_columns_before_projection(&requirements.data, &proj_exprs);
404        if new_required.len() == requirements.data.len() {
405            requirements.children[0].data = new_required;
406        } else {
407            // Can not satisfy, clear the current requirements and generate new empty requirements
408            requirements.data.clear();
409        }
410    } else if plan.is::<RepartitionExec>()
411        || plan.is::<CoalescePartitionsExec>()
412        || plan.is::<WindowAggExec>()
413    {
414        requirements.data.clear();
415    } else if requirements.data.is_empty() {
416        // No requirements to push down and no plan changes — skip rebuild.
417        return Ok(Transformed::no(requirements));
418    } else {
419        // By default, push down the parent requirements to children
420        for child in requirements.children.iter_mut() {
421            child.data.clone_from(&requirements.data);
422        }
423    }
424    Ok(Transformed::yes(requirements))
425}
426
427pub fn reorder_partitioned_join_keys<F>(
428    mut join_plan: PlanWithKeyRequirements,
429    on: &[(PhysicalExprRef, PhysicalExprRef)],
430    sort_options: &[SortOptions],
431    join_constructor: &F,
432) -> Result<PlanWithKeyRequirements>
433where
434    F: Fn(
435        (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
436    ) -> Result<Arc<dyn ExecutionPlan>>,
437{
438    let parent_required = &join_plan.data;
439    let join_key_pairs = extract_join_keys(on);
440    let eq_properties = join_plan.plan.equivalence_properties();
441
442    let (
443        JoinKeyPairs {
444            left_keys,
445            right_keys,
446        },
447        positions,
448    ) = try_reorder(join_key_pairs, parent_required, eq_properties);
449
450    if let Some(positions) = positions
451        && !positions.is_empty()
452    {
453        let new_join_on = new_join_conditions(&left_keys, &right_keys);
454        let new_sort_options = (0..sort_options.len())
455            .map(|idx| sort_options[positions[idx]])
456            .collect();
457        join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
458    }
459
460    join_plan.children[0].data = left_keys;
461    join_plan.children[1].data = right_keys;
462    Ok(join_plan)
463}
464
465pub fn reorder_aggregate_keys(
466    mut agg_node: PlanWithKeyRequirements,
467    agg_exec: &AggregateExec,
468) -> Result<PlanWithKeyRequirements> {
469    let parent_required = &agg_node.data;
470    let output_columns = agg_exec
471        .group_expr()
472        .expr()
473        .iter()
474        .enumerate()
475        .map(|(index, (_, name))| Column::new(name, index))
476        .collect::<Vec<_>>();
477
478    let output_exprs = output_columns
479        .iter()
480        .map(|c| Arc::new(c.clone()) as _)
481        .collect::<Vec<_>>();
482
483    if parent_required.len() == output_exprs.len()
484        && agg_exec.group_expr().null_expr().is_empty()
485        && !physical_exprs_equal(&output_exprs, parent_required)
486        && let Some(positions) = expected_expr_positions(&output_exprs, parent_required)
487        && let Some(agg_exec) = agg_exec.input().downcast_ref::<AggregateExec>()
488        && *agg_exec.mode() == AggregateMode::Partial
489    {
490        let group_exprs = agg_exec.group_expr().expr();
491        let new_group_exprs = positions
492            .into_iter()
493            .map(|idx| group_exprs[idx].clone())
494            .collect();
495        let partial_agg = Arc::new(AggregateExec::try_new(
496            AggregateMode::Partial,
497            PhysicalGroupBy::new_single(new_group_exprs),
498            agg_exec.aggr_expr().to_vec(),
499            agg_exec.filter_expr().to_vec(),
500            Arc::clone(agg_exec.input()),
501            Arc::clone(&agg_exec.input_schema),
502        )?);
503        // Build new group expressions that correspond to the output
504        // of the "reordered" aggregator:
505        let group_exprs = partial_agg.group_expr().expr();
506        let new_group_by = PhysicalGroupBy::new_single(
507            partial_agg
508                .output_group_expr()
509                .into_iter()
510                .enumerate()
511                .map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
512                .collect(),
513        );
514        let new_final_agg = Arc::new(AggregateExec::try_new(
515            AggregateMode::FinalPartitioned,
516            new_group_by,
517            agg_exec.aggr_expr().to_vec(),
518            agg_exec.filter_expr().to_vec(),
519            Arc::clone(&partial_agg) as _,
520            agg_exec.input_schema(),
521        )?);
522
523        agg_node.plan = Arc::clone(&new_final_agg) as _;
524        agg_node.data.clear();
525        agg_node.children = vec![PlanWithKeyRequirements::new(
526            partial_agg as _,
527            vec![],
528            agg_node.children.swap_remove(0).children,
529        )];
530
531        // Need to create a new projection to change the expr ordering back
532        let agg_schema = new_final_agg.schema();
533        let mut proj_exprs = output_columns
534            .iter()
535            .map(|col| {
536                let name = col.name();
537                let index = agg_schema.index_of(name)?;
538                Ok(ProjectionExpr {
539                    expr: Arc::new(Column::new(name, index)) as _,
540                    alias: name.to_owned(),
541                })
542            })
543            .collect::<Result<Vec<_>>>()?;
544        let agg_fields = agg_schema.fields();
545        for (idx, field) in agg_fields.iter().enumerate().skip(output_columns.len()) {
546            let name = field.name();
547            let plan = Arc::new(Column::new(name, idx)) as _;
548            proj_exprs.push(ProjectionExpr {
549                expr: plan,
550                alias: name.clone(),
551            })
552        }
553        return ProjectionExec::try_new(proj_exprs, new_final_agg)
554            .map(|p| PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node]));
555    }
556    Ok(agg_node)
557}
558
559fn shift_right_required(
560    parent_required: &[Arc<dyn PhysicalExpr>],
561    left_columns_len: usize,
562) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
563    let new_right_required = parent_required
564        .iter()
565        .filter_map(|r| {
566            (r.as_ref() as &dyn Any)
567                .downcast_ref::<Column>()
568                .and_then(|col| {
569                    col.index()
570                        .checked_sub(left_columns_len)
571                        .map(|index| Arc::new(Column::new(col.name(), index)) as _)
572                })
573        })
574        .collect::<Vec<_>>();
575
576    // if the parent required are all coming from the right side, the requirements can be pushdown
577    (new_right_required.len() == parent_required.len()).then_some(new_right_required)
578}
579
580/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
581/// That might not match with the output partitioning of the join node's children
582/// This method will try to change the ordering of the join keys to match with the
583/// partitioning of the join nodes' children. If it can not match with both sides, it will try to
584/// match with one, either the left side or the right side.
585///
586/// Example:
587///     TopJoin on (a, b, c)
588///         bottom left join on(b, a, c)
589///         bottom right join on(c, b, a)
590///
591///  Will be adjusted to:
592///     TopJoin on (b, a, c)
593///         bottom left join on(b, a, c)
594///         bottom right join on(c, b, a)
595///
596/// Compared to the Top-Down reordering process, this Bottom-Up approach is much simpler, but might not reach a best result.
597/// The Bottom-Up approach will be useful in future if we plan to support storage partition-wised Joins.
598/// In that case, the datasources/tables might be pre-partitioned and we can't adjust the key ordering of the datasources
599/// and then can't apply the Top-Down reordering process.
600pub fn reorder_join_keys_to_inputs(
601    plan: Arc<dyn ExecutionPlan>,
602) -> Result<Arc<dyn ExecutionPlan>> {
603    if let Some(
604        exec @ HashJoinExec {
605            left,
606            right,
607            on,
608            mode,
609            ..
610        },
611    ) = plan.downcast_ref::<HashJoinExec>()
612    {
613        if *mode == PartitionMode::Partitioned {
614            let (join_keys, positions) = reorder_current_join_keys(
615                extract_join_keys(on),
616                Some(left.output_partitioning()),
617                Some(right.output_partitioning()),
618                left.equivalence_properties(),
619                right.equivalence_properties(),
620            );
621            if positions.is_some_and(|idxs| !idxs.is_empty()) {
622                let JoinKeyPairs {
623                    left_keys,
624                    right_keys,
625                } = join_keys;
626                let new_join_on = new_join_conditions(&left_keys, &right_keys);
627                return exec
628                    .builder()
629                    .with_partition_mode(PartitionMode::Partitioned)
630                    .with_on(new_join_on)
631                    .build_exec();
632            }
633        }
634    } else if let Some(SortMergeJoinExec {
635        left,
636        right,
637        on,
638        filter,
639        join_type,
640        sort_options,
641        null_equality,
642        ..
643    }) = plan.downcast_ref::<SortMergeJoinExec>()
644    {
645        let (join_keys, positions) = reorder_current_join_keys(
646            extract_join_keys(on),
647            Some(left.output_partitioning()),
648            Some(right.output_partitioning()),
649            left.equivalence_properties(),
650            right.equivalence_properties(),
651        );
652        if let Some(positions) = positions
653            && !positions.is_empty()
654        {
655            let JoinKeyPairs {
656                left_keys,
657                right_keys,
658            } = join_keys;
659            let new_join_on = new_join_conditions(&left_keys, &right_keys);
660            let new_sort_options = (0..sort_options.len())
661                .map(|idx| sort_options[positions[idx]])
662                .collect();
663            return SortMergeJoinExec::try_new(
664                Arc::clone(left),
665                Arc::clone(right),
666                new_join_on,
667                filter.clone(),
668                *join_type,
669                new_sort_options,
670                *null_equality,
671            )
672            .map(|smj| Arc::new(smj) as _);
673        }
674    }
675    Ok(plan)
676}
677
678/// Reorder the current join keys ordering based on either left partition or right partition
679fn reorder_current_join_keys(
680    join_keys: JoinKeyPairs,
681    left_partition: Option<&Partitioning>,
682    right_partition: Option<&Partitioning>,
683    left_equivalence_properties: &EquivalenceProperties,
684    right_equivalence_properties: &EquivalenceProperties,
685) -> (JoinKeyPairs, Option<Vec<usize>>) {
686    match (left_partition, right_partition) {
687        (Some(Partitioning::Hash(left_exprs, _)), _) => {
688            match try_reorder(join_keys, left_exprs, left_equivalence_properties) {
689                (join_keys, None) => reorder_current_join_keys(
690                    join_keys,
691                    None,
692                    right_partition,
693                    left_equivalence_properties,
694                    right_equivalence_properties,
695                ),
696                result => result,
697            }
698        }
699        (_, Some(Partitioning::Hash(right_exprs, _))) => {
700            try_reorder(join_keys, right_exprs, right_equivalence_properties)
701        }
702        _ => (join_keys, None),
703    }
704}
705
706fn try_reorder(
707    join_keys: JoinKeyPairs,
708    expected: &[Arc<dyn PhysicalExpr>],
709    equivalence_properties: &EquivalenceProperties,
710) -> (JoinKeyPairs, Option<Vec<usize>>) {
711    let eq_groups = equivalence_properties.eq_group();
712    let mut normalized_expected = vec![];
713    let mut normalized_left_keys = vec![];
714    let mut normalized_right_keys = vec![];
715    if join_keys.left_keys.len() != expected.len() {
716        return (join_keys, None);
717    }
718    if physical_exprs_equal(expected, &join_keys.left_keys)
719        || physical_exprs_equal(expected, &join_keys.right_keys)
720    {
721        return (join_keys, Some(vec![]));
722    } else if !equivalence_properties.eq_group().is_empty() {
723        normalized_expected = expected
724            .iter()
725            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
726            .collect();
727
728        normalized_left_keys = join_keys
729            .left_keys
730            .iter()
731            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
732            .collect();
733
734        normalized_right_keys = join_keys
735            .right_keys
736            .iter()
737            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
738            .collect();
739
740        if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
741            || physical_exprs_equal(&normalized_expected, &normalized_right_keys)
742        {
743            return (join_keys, Some(vec![]));
744        }
745    }
746
747    let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected)
748        .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
749        .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
750        .or_else(|| {
751            expected_expr_positions(&normalized_right_keys, &normalized_expected)
752        })
753    else {
754        return (join_keys, None);
755    };
756
757    let mut new_left_keys = vec![];
758    let mut new_right_keys = vec![];
759    for pos in positions.iter() {
760        new_left_keys.push(Arc::clone(&join_keys.left_keys[*pos]));
761        new_right_keys.push(Arc::clone(&join_keys.right_keys[*pos]));
762    }
763    let pairs = JoinKeyPairs {
764        left_keys: new_left_keys,
765        right_keys: new_right_keys,
766    };
767
768    (pairs, Some(positions))
769}
770
771/// Return the expected expressions positions.
772/// For example, the current expressions are ['c', 'a', 'a', b'], the expected expressions are ['b', 'c', 'a', 'a'],
773///
774/// This method will return a Vec [3, 0, 1, 2]
775fn expected_expr_positions(
776    current: &[Arc<dyn PhysicalExpr>],
777    expected: &[Arc<dyn PhysicalExpr>],
778) -> Option<Vec<usize>> {
779    if current.is_empty() || expected.is_empty() {
780        return None;
781    }
782    let mut indexes: Vec<usize> = vec![];
783    let mut current = current.to_vec();
784    for expr in expected.iter() {
785        // Find the position of the expected expr in the current expressions
786        if let Some(expected_position) = current.iter().position(|e| e.eq(expr)) {
787            current[expected_position] = Arc::new(NoOp::new());
788            indexes.push(expected_position);
789        } else {
790            return None;
791        }
792    }
793    Some(indexes)
794}
795
796fn extract_join_keys(on: &[(PhysicalExprRef, PhysicalExprRef)]) -> JoinKeyPairs {
797    let (left_keys, right_keys) = on
798        .iter()
799        .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
800        .unzip();
801    JoinKeyPairs {
802        left_keys,
803        right_keys,
804    }
805}
806
807fn new_join_conditions(
808    new_left_keys: &[Arc<dyn PhysicalExpr>],
809    new_right_keys: &[Arc<dyn PhysicalExpr>],
810) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
811    new_left_keys
812        .iter()
813        .zip(new_right_keys.iter())
814        .map(|(l_key, r_key)| (Arc::clone(l_key), Arc::clone(r_key)))
815        .collect()
816}
817
818/// Adds RoundRobin repartition operator to the plan increase parallelism.
819///
820/// # Arguments
821///
822/// * `input`: Current node.
823/// * `n_target`: desired target partition number, if partition number of the
824///   current executor is less than this value. Partition number will be increased.
825///
826/// # Returns
827///
828/// A [`Result`] object that contains new execution plan where the desired
829/// partition number is achieved by adding a RoundRobin repartition.
830fn add_roundrobin_on_top(
831    input: DistributionContext,
832    n_target: usize,
833) -> Result<DistributionContext> {
834    // Adding repartition is helpful:
835    if input.plan.output_partitioning().partition_count() < n_target {
836        // When there is an existing ordering, we preserve ordering
837        // during repartition. This will be un-done in the future
838        // If any of the following conditions is true
839        // - Preserving ordering is not helpful in terms of satisfying ordering requirements
840        // - Usage of order preserving variants is not desirable
841        // (determined by flag `config.optimizer.prefer_existing_sort`)
842        let partitioning = Partitioning::RoundRobinBatch(n_target);
843        let repartition =
844            RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
845                .with_preserve_order();
846
847        let new_plan = Arc::new(repartition) as _;
848
849        Ok(DistributionContext::new(new_plan, true, vec![input]))
850    } else {
851        // Partition is not helpful, we already have desired number of partitions.
852        Ok(input)
853    }
854}
855
856/// Adds a hash repartition operator:
857/// - to increase parallelism, and/or
858/// - to satisfy requirements of the subsequent operators.
859///
860/// Repartition(Hash) is added on top of operator `input`.
861///
862/// # Arguments
863///
864/// * `input`: Current node.
865/// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
866/// * `n_target`: desired target partition number, if partition number of the
867///   current executor is less than this value. Partition number will be increased.
868/// * `allow_subset_satisfy_partitioning`: Whether to allow subset partitioning logic in satisfaction checks.
869///   Set to `false` for partitioned hash joins to ensure exact hash matching.
870///
871/// # Returns
872///
873/// A [`Result`] object that contains new execution plan where the desired
874/// distribution is satisfied by adding a Hash repartition.
875fn add_hash_on_top(
876    input: DistributionContext,
877    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
878    n_target: usize,
879    allow_subset_satisfy_partitioning: bool,
880) -> Result<DistributionContext> {
881    // Early return if hash repartition is unnecessary
882    // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
883    if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
884        return Ok(input);
885    }
886
887    let dist = Distribution::HashPartitioned(hash_exprs);
888    let satisfaction = input.plan.output_partitioning().satisfaction(
889        &dist,
890        input.plan.equivalence_properties(),
891        allow_subset_satisfy_partitioning,
892    );
893
894    // Add hash repartitioning when:
895    // - When subset satisfaction is enabled (current >= threshold): only repartition if not satisfied
896    // - When below threshold (current < threshold): repartition if expressions don't match OR to increase parallelism
897    let needs_repartition = if allow_subset_satisfy_partitioning {
898        !satisfaction.is_satisfied()
899    } else {
900        !satisfaction.is_satisfied()
901            || n_target > input.plan.output_partitioning().partition_count()
902    };
903
904    if needs_repartition {
905        // When there is an existing ordering, we preserve ordering during
906        // repartition. This will be rolled back in the future if any of the
907        // following conditions is true:
908        // - Preserving ordering is not helpful in terms of satisfying ordering
909        //   requirements.
910        // - Usage of order preserving variants is not desirable (per the flag
911        //   `config.optimizer.prefer_existing_sort`).
912        let partitioning = dist.create_partitioning(n_target);
913        let repartition =
914            RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
915                .with_preserve_order();
916        let plan = Arc::new(repartition) as _;
917
918        return Ok(DistributionContext::new(plan, true, vec![input]));
919    }
920
921    Ok(input)
922}
923
924/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
925/// on top of the given plan node to satisfy a single partition requirement
926/// while preserving ordering constraints.
927///
928/// # Parameters
929///
930/// * `input`: Current node.
931///
932/// Checks whether preserving the child's ordering enables the parent to
933/// run in streaming mode. Compares the parent's pipeline behavior with
934/// the ordered child vs. an unordered (coalesced) child. If removing the
935/// ordering would cause the parent to switch from streaming to blocking,
936/// keeping the order-preserving variant is beneficial.
937///
938/// Only applicable to single-child operators; returns `Ok(false)` for
939/// multi-child operators (e.g. joins) where child substitution semantics are
940/// ambiguous.
941fn preserving_order_enables_streaming(
942    parent: &Arc<dyn ExecutionPlan>,
943    ordered_child: &Arc<dyn ExecutionPlan>,
944) -> Result<bool> {
945    // Only applicable to single-child operators that maintain input order
946    // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
947    // maintain input order (e.g. SortExec) handle ordering themselves —
948    // preserving SPM for them is unnecessary.
949    if parent.children().len() != 1 {
950        return Ok(false);
951    }
952    if !parent.maintains_input_order()[0] {
953        return Ok(false);
954    }
955    // Build parent with the ordered child
956    let with_ordered =
957        Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
958    if with_ordered.pipeline_behavior() == EmissionType::Final {
959        // Parent is blocking even with ordering — no benefit
960        return Ok(false);
961    }
962    // Build parent with an unordered child via CoalescePartitionsExec.
963    let unordered_child: Arc<dyn ExecutionPlan> =
964        Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
965    let without_ordered = Arc::clone(parent).with_new_children(vec![unordered_child])?;
966    Ok(without_ordered.pipeline_behavior() == EmissionType::Final)
967}
968
969/// # Returns
970///
971/// Updated node with an execution plan, where the desired single distribution
972/// requirement is satisfied.
973fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
974    // Apply only when the partition count is larger than one.
975    if input.plan.output_partitioning().partition_count() > 1 {
976        // When there is an existing ordering, we preserve ordering
977        // when decreasing partitions. This will be un-done in the future
978        // if any of the following conditions is true
979        // - Preserving ordering is not helpful in terms of satisfying ordering requirements
980        // - Usage of order preserving variants is not desirable
981        // (determined by flag `config.optimizer.prefer_existing_sort`)
982        let new_plan = if let Some(req) = input.plan.output_ordering() {
983            Arc::new(SortPreservingMergeExec::new(
984                req.clone(),
985                Arc::clone(&input.plan),
986            )) as _
987        } else {
988            // If there is no input order, we can simply coalesce partitions:
989            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
990        };
991
992        DistributionContext::new(new_plan, true, vec![input])
993    } else {
994        input
995    }
996}
997
998/// Updates the physical plan inside [`DistributionContext`] so that distribution
999/// changing operators are removed from the top. If they are necessary, they will
1000/// be added in subsequent stages.
1001///
1002/// Assume that following plan is given:
1003/// ```text
1004/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
1005/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
1006/// "    DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1007/// ```
1008///
1009/// Since `RepartitionExec`s change the distribution, this function removes
1010/// them and returns following plan:
1011///
1012/// ```text
1013/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1014/// ```
1015fn remove_dist_changing_operators(
1016    mut distribution_context: DistributionContext,
1017) -> Result<DistributionContext> {
1018    while is_repartition(&distribution_context.plan)
1019        || is_coalesce_partitions(&distribution_context.plan)
1020        || is_sort_preserving_merge(&distribution_context.plan)
1021    {
1022        // All of above operators have a single child. First child is only child.
1023        // Remove any distribution changing operators at the beginning:
1024        distribution_context = distribution_context.children.swap_remove(0);
1025        // Note that they will be re-inserted later on if necessary or helpful.
1026    }
1027
1028    Ok(distribution_context)
1029}
1030
1031/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
1032///
1033/// Assume that following plan is given:
1034/// ```text
1035/// "SortPreservingMergeExec: \[a@0 ASC]"
1036/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true",
1037/// "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true",
1038/// "      DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1039/// ```
1040///
1041/// This function converts plan above to the following:
1042///
1043/// ```text
1044/// "CoalescePartitionsExec"
1045/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
1046/// "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
1047/// "      DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1048/// ```
1049pub fn replace_order_preserving_variants(
1050    mut context: DistributionContext,
1051) -> Result<DistributionContext> {
1052    context.children = context
1053        .children
1054        .into_iter()
1055        .map(|child| {
1056            if child.data {
1057                replace_order_preserving_variants(child)
1058            } else {
1059                Ok(child)
1060            }
1061        })
1062        .collect::<Result<Vec<_>>>()?;
1063
1064    if is_sort_preserving_merge(&context.plan) {
1065        let child_plan = Arc::clone(&context.children[0].plan);
1066        context.plan = Arc::new(
1067            CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
1068        );
1069        return Ok(context);
1070    } else if let Some(repartition) = context.plan.downcast_ref::<RepartitionExec>()
1071        && repartition.preserve_order()
1072    {
1073        context.plan = Arc::new(RepartitionExec::try_new(
1074            Arc::clone(&context.children[0].plan),
1075            repartition.partitioning().clone(),
1076        )?);
1077        return Ok(context);
1078    }
1079
1080    context.update_plan_from_children()
1081}
1082
1083/// A struct to keep track of repartition requirements for each child node.
1084struct RepartitionRequirementStatus {
1085    /// The distribution requirement for the node.
1086    requirement: Distribution,
1087    /// Designates whether round robin partitioning is theoretically beneficial;
1088    /// i.e. the operator can actually utilize parallelism.
1089    roundrobin_beneficial: bool,
1090    /// Designates whether round robin partitioning is beneficial according to
1091    /// the statistical information we have on the number of rows.
1092    roundrobin_beneficial_stats: bool,
1093    /// Designates whether hash partitioning is necessary.
1094    hash_necessary: bool,
1095}
1096
1097/// Calculates the `RepartitionRequirementStatus` for each children to generate
1098/// consistent and sensible (in terms of performance) distribution requirements.
1099/// As an example, a hash join's left (build) child might produce
1100///
1101/// ```text
1102/// RepartitionRequirementStatus {
1103///     ..,
1104///     hash_necessary: true
1105/// }
1106/// ```
1107///
1108/// while its right (probe) child might have very few rows and produce:
1109///
1110/// ```text
1111/// RepartitionRequirementStatus {
1112///     ..,
1113///     hash_necessary: false
1114/// }
1115/// ```
1116///
1117/// These statuses are not consistent as all children should agree on hash
1118/// partitioning. This function aligns the statuses to generate consistent
1119/// hash partitions for each children. After alignment, the right child's
1120/// status would turn into:
1121///
1122/// ```text
1123/// RepartitionRequirementStatus {
1124///     ..,
1125///     hash_necessary: true
1126/// }
1127/// ```
1128fn get_repartition_requirement_status(
1129    plan: &Arc<dyn ExecutionPlan>,
1130    batch_size: usize,
1131    should_use_estimates: bool,
1132) -> Result<Vec<RepartitionRequirementStatus>> {
1133    let mut needs_alignment = false;
1134    let children = plan.children();
1135    let rr_beneficial = plan.benefits_from_input_partitioning();
1136    let requirements = plan.required_input_distribution();
1137    let mut repartition_status_flags = vec![];
1138    for (child, requirement, roundrobin_beneficial) in
1139        izip!(children.into_iter(), requirements, rr_beneficial)
1140    {
1141        // Decide whether adding a round robin is beneficial depending on
1142        // the statistical information we have on the number of rows:
1143        let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows
1144        {
1145            Precision::Exact(n_rows) => n_rows > batch_size,
1146            Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
1147            Precision::Absent => true,
1148        };
1149        let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
1150        // Hash re-partitioning is necessary when the input has more than one
1151        // partitions:
1152        let multi_partitions = child.output_partitioning().partition_count() > 1;
1153        let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
1154        needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
1155        repartition_status_flags.push((
1156            is_hash,
1157            RepartitionRequirementStatus {
1158                requirement,
1159                roundrobin_beneficial,
1160                roundrobin_beneficial_stats,
1161                hash_necessary: is_hash && multi_partitions,
1162            },
1163        ));
1164    }
1165    // Align hash necessary flags for hash partitions to generate consistent
1166    // hash partitions at each children:
1167    if needs_alignment {
1168        // When there is at least one hash requirement that is necessary or
1169        // beneficial according to statistics, make all children require hash
1170        // repartitioning:
1171        for (is_hash, status) in &mut repartition_status_flags {
1172            if *is_hash {
1173                status.hash_necessary = true;
1174            }
1175        }
1176    }
1177    Ok(repartition_status_flags
1178        .into_iter()
1179        .map(|(_, status)| status)
1180        .collect())
1181}
1182
1183/// This function checks whether we need to add additional data exchange
1184/// operators to satisfy distribution requirements. Since this function
1185/// takes care of such requirements, we should avoid manually adding data
1186/// exchange operators in other places.
1187///
1188/// This function is intended to be used in a bottom up traversal, as it
1189/// can first repartition (or newly partition) at the datasources -- these
1190/// source partitions may be later repartitioned with additional data exchange operators.
1191pub fn ensure_distribution(
1192    dist_context: DistributionContext,
1193    config: &ConfigOptions,
1194) -> Result<Transformed<DistributionContext>> {
1195    let dist_context = update_children(dist_context)?;
1196
1197    if dist_context.plan.children().is_empty() {
1198        return Ok(Transformed::no(dist_context));
1199    }
1200
1201    let target_partitions = config.execution.target_partitions;
1202    // When `false`, round robin repartition will not be added to increase parallelism
1203    let enable_round_robin = config.optimizer.enable_round_robin_repartition;
1204    let repartition_file_scans = config.optimizer.repartition_file_scans;
1205    let batch_size = config.execution.batch_size;
1206    let should_use_estimates = config
1207        .execution
1208        .use_row_number_estimates_to_optimize_partitioning;
1209    let subset_satisfaction_threshold = config.optimizer.subset_repartition_threshold;
1210    let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
1211        && matches!(
1212            dist_context.plan.pipeline_behavior(),
1213            EmissionType::Incremental | EmissionType::Both
1214        );
1215    // Use order preserving variants either of the conditions true
1216    // - it is desired according to config
1217    // - when plan is unbounded
1218    // - when it is pipeline friendly (can incrementally produce results)
1219    let order_preserving_variants_desirable =
1220        unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
1221
1222    // Remove unnecessary repartition from the physical plan if any
1223    let DistributionContext {
1224        mut plan,
1225        data,
1226        children,
1227    } = remove_dist_changing_operators(dist_context)?;
1228
1229    if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
1230        if let Some(updated_window) = get_best_fitting_window(
1231            exec.window_expr(),
1232            exec.input(),
1233            &exec.partition_keys(),
1234        )? {
1235            plan = updated_window;
1236        }
1237    } else if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>()
1238        && let Some(updated_window) = get_best_fitting_window(
1239            exec.window_expr(),
1240            exec.input(),
1241            &exec.partition_keys(),
1242        )?
1243    {
1244        plan = updated_window;
1245    };
1246
1247    // For joins in partitioned mode, we need exact hash matching between
1248    // both sides, so subset partitioning logic must be disabled.
1249    //
1250    // Why: Different hash expressions produce different hash values, causing
1251    // rows with the same join key to land in different partitions. Since
1252    // partitioned joins match partition N left with partition N right, rows
1253    // that should match may be in different partitions and miss each other.
1254    //
1255    // Example JOIN ON left.a = right.a:
1256    //
1257    // Left: Hash([a])
1258    //  Partition 1: a=1
1259    //  Partition 2: a=2
1260    //
1261    // Right: Hash([a, b])
1262    //  Partition 1: (a=1, b=1) -> Same a=1
1263    //  Partition 2: (a=2, b=2)
1264    //  Partition 3: (a=1, b=2) -> Same a=1
1265    //
1266    // Partitioned join execution:
1267    //  P1 left (a=1) joins P1 right (a=1, b=1) -> Match
1268    //  P2 left (a=2) joins P2 right (a=2, b=2) -> Match
1269    //  P3 left (empty) joins P3 right (a=1, b=2) -> Missing, errors
1270    //
1271    // The row (a=1, b=2) should match left.a=1 but they're in different
1272    // partitions, causing panics.
1273    //
1274    // CollectLeft/CollectRight modes are safe because one side is collected
1275    // to a single partition which eliminates partition-to-partition mapping.
1276    let is_partitioned_join = plan
1277        .downcast_ref::<HashJoinExec>()
1278        .is_some_and(|join| join.mode == PartitionMode::Partitioned)
1279        || plan.is::<SortMergeJoinExec>();
1280
1281    let repartition_status_flags =
1282        get_repartition_requirement_status(&plan, batch_size, should_use_estimates)?;
1283    // This loop iterates over all the children to:
1284    // - Increase parallelism for every child if it is beneficial.
1285    // - Satisfy the distribution requirements of every child, if it is not
1286    //   already satisfied.
1287    // We store the updated children in `new_children`.
1288    let children = izip!(
1289        children.into_iter(),
1290        plan.required_input_ordering(),
1291        plan.maintains_input_order(),
1292        repartition_status_flags.into_iter()
1293    )
1294    .map(
1295        |(
1296            mut child,
1297            required_input_ordering,
1298            maintains,
1299            RepartitionRequirementStatus {
1300                requirement,
1301                roundrobin_beneficial,
1302                roundrobin_beneficial_stats,
1303                hash_necessary,
1304            },
1305        )| {
1306            let increases_partition_count =
1307                child.plan.output_partitioning().partition_count() < target_partitions;
1308
1309            let add_roundrobin = enable_round_robin
1310                // Operator benefits from partitioning (e.g. filter):
1311                && roundrobin_beneficial
1312                && roundrobin_beneficial_stats
1313                // Unless partitioning increases the partition count, it is not beneficial:
1314                && increases_partition_count;
1315
1316            // Allow subset satisfaction when:
1317            // 1. Current partition count >= threshold
1318            // 2. Not a partitioned join since must use exact hash matching for joins
1319            // 3. Not a grouping set aggregate (requires exact hash including __grouping_id)
1320            let current_partitions = child.plan.output_partitioning().partition_count();
1321
1322            // Check if the hash partitioning requirement includes __grouping_id column.
1323            // Grouping set aggregates (ROLLUP, CUBE, GROUPING SETS) require exact hash
1324            // partitioning on all group columns including __grouping_id to ensure partial
1325            // aggregates from different partitions are correctly combined.
1326            let requires_grouping_id = matches!(&requirement, Distribution::HashPartitioned(exprs)
1327                if exprs.iter().any(|expr| {
1328                    (expr.as_ref() as &dyn Any)
1329                        .downcast_ref::<Column>()
1330                        .is_some_and(|col| col.name() == Aggregate::INTERNAL_GROUPING_ID)
1331                })
1332            );
1333
1334            let allow_subset_satisfy_partitioning = (current_partitions
1335                >= subset_satisfaction_threshold
1336                // `preserve_file_partitions` exposes existing file-group
1337                // partitioning to the optimizer. Respect it when the only
1338                // reason to repartition would be to increase partition count
1339                // beyond the preserved file-group count.
1340                || (config.optimizer.preserve_file_partitions > 0
1341                    && current_partitions < target_partitions))
1342                && !is_partitioned_join
1343                && !requires_grouping_id;
1344
1345            // When `repartition_file_scans` is set, attempt to increase
1346            // parallelism at the source.
1347            //
1348            // If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`)
1349            // then no repartitioning will have occurred. As the default implementation returns None, it is only
1350            // specific physical plan nodes, such as certain datasources, which are repartitioned.
1351            if repartition_file_scans
1352                && roundrobin_beneficial_stats
1353                && let Some(new_child) =
1354                    child.plan.repartitioned(target_partitions, config)?
1355            {
1356                child.plan = new_child;
1357            }
1358
1359            // Satisfy the distribution requirement if it is unmet.
1360            match &requirement {
1361                Distribution::SinglePartition => {
1362                    child = add_merge_on_top(child);
1363                }
1364                Distribution::HashPartitioned(exprs) => {
1365                    // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
1366                    // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
1367                    if hash_necessary {
1368                        child = add_hash_on_top(
1369                            child,
1370                            exprs.to_vec(),
1371                            target_partitions,
1372                            allow_subset_satisfy_partitioning,
1373                        )?;
1374                    }
1375                }
1376                Distribution::UnspecifiedDistribution => {
1377                    if add_roundrobin {
1378                        // Add round-robin repartitioning on top of the operator
1379                        // to increase parallelism.
1380                        child = add_roundrobin_on_top(child, target_partitions)?;
1381                    }
1382                }
1383            };
1384
1385            let streaming_benefit = if child.data {
1386                preserving_order_enables_streaming(&plan, &child.plan)?
1387            } else {
1388                false
1389            };
1390
1391            // There is an ordering requirement of the operator:
1392            if let Some(required_input_ordering) = required_input_ordering {
1393                // Either:
1394                // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
1395                // - using order preserving variant is not desirable.
1396                let sort_req = required_input_ordering.into_single();
1397                let ordering_satisfied = child
1398                    .plan
1399                    .equivalence_properties()
1400                    .ordering_satisfy_requirement(sort_req.clone())?;
1401
1402                if (!ordering_satisfied || !order_preserving_variants_desirable)
1403                    && !streaming_benefit
1404                    && child.data
1405                {
1406                    child = replace_order_preserving_variants(child)?;
1407                    // If ordering requirements were satisfied before repartitioning,
1408                    // make sure ordering requirements are still satisfied after.
1409                    if ordering_satisfied {
1410                        // Make sure to satisfy ordering requirement:
1411                        child = add_sort_above_with_check(
1412                            child,
1413                            sort_req,
1414                            plan.downcast_ref::<OutputRequirementExec>()
1415                                .map(|output| output.fetch())
1416                                .unwrap_or(None),
1417                        )?;
1418                    }
1419                }
1420                // Stop tracking distribution changing operators
1421                child.data = false;
1422            } else {
1423                let streaming_benefit = if child.data {
1424                    preserving_order_enables_streaming(&plan, &child.plan)?
1425                } else {
1426                    false
1427                };
1428                // no ordering requirement
1429                match requirement {
1430                    // Operator requires specific distribution.
1431                    Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
1432                        // If the parent doesn't maintain input order, preserving
1433                        // ordering is pointless. However, if it does maintain
1434                        // input order, we keep order-preserving variants so
1435                        // ordering can flow through to ancestors that need it.
1436                        if !maintains && !streaming_benefit {
1437                            child = replace_order_preserving_variants(child)?;
1438                        }
1439                    }
1440                    Distribution::UnspecifiedDistribution => {
1441                        // Since ordering is lost, trying to preserve ordering is pointless
1442                        if !maintains || plan.is::<OutputRequirementExec>() {
1443                            child = replace_order_preserving_variants(child)?;
1444                        }
1445                    }
1446                }
1447            }
1448            Ok(child)
1449        },
1450    )
1451    .collect::<Result<Vec<_>>>()?;
1452
1453    let children_plans = children
1454        .iter()
1455        .map(|c| Arc::clone(&c.plan))
1456        .collect::<Vec<_>>();
1457
1458    plan = if plan.is::<UnionExec>()
1459        && !config.optimizer.prefer_existing_union
1460        && can_interleave(children_plans.iter())
1461    {
1462        // Add a special case for [`UnionExec`] since we want to "bubble up"
1463        // hash-partitioned data. So instead of
1464        //
1465        // Agg:
1466        //   Repartition (hash):
1467        //     Union:
1468        //       - Agg:
1469        //           Repartition (hash):
1470        //             Data
1471        //       - Agg:
1472        //           Repartition (hash):
1473        //             Data
1474        //
1475        // we can use:
1476        //
1477        // Agg:
1478        //   Interleave:
1479        //     - Agg:
1480        //         Repartition (hash):
1481        //           Data
1482        //     - Agg:
1483        //         Repartition (hash):
1484        //           Data
1485        Arc::new(InterleaveExec::try_new(children_plans)?)
1486    } else {
1487        plan.with_new_children(children_plans)?
1488    };
1489
1490    Ok(Transformed::yes(DistributionContext::new(
1491        plan, data, children,
1492    )))
1493}
1494
1495/// Keeps track of distribution changing operators (like `RepartitionExec`,
1496/// `SortPreservingMergeExec`, `CoalescePartitionsExec`) and their ancestors.
1497/// Using this information, we can optimize distribution of the plan if/when
1498/// necessary.
1499pub type DistributionContext = PlanContext<bool>;
1500
1501fn update_children(mut dist_context: DistributionContext) -> Result<DistributionContext> {
1502    for child_context in dist_context.children.iter_mut() {
1503        child_context.data = if let Some(repartition) =
1504            child_context.plan.downcast_ref::<RepartitionExec>()
1505        {
1506            !matches!(
1507                repartition.partitioning(),
1508                Partitioning::UnknownPartitioning(_)
1509            )
1510        } else {
1511            child_context.plan.is::<SortPreservingMergeExec>()
1512                || child_context.plan.is::<CoalescePartitionsExec>()
1513                || child_context.plan.children().is_empty()
1514                || child_context.children[0].data
1515                || child_context
1516                    .plan
1517                    .required_input_distribution()
1518                    .iter()
1519                    .zip(child_context.children.iter())
1520                    .any(|(required_dist, child_context)| {
1521                        child_context.data
1522                            && matches!(
1523                                required_dist,
1524                                Distribution::UnspecifiedDistribution
1525                            )
1526                    })
1527        }
1528    }
1529
1530    dist_context.data = false;
1531    Ok(dist_context)
1532}
1533
1534// See tests in datafusion/core/tests/physical_optimizer