Skip to main content

datafusion_physical_optimizer/
enforce_distribution.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceDistribution optimizer rule inspects the physical plan with respect
19//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them
20//! when necessary. If increasing parallelism is beneficial (and also desirable
21//! according to the configuration), this rule increases partition counts in
22//! the physical plan.
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use crate::optimizer::PhysicalOptimizerRule;
28use crate::output_requirements::OutputRequirementExec;
29use crate::utils::{
30    add_sort_above_with_check, is_coalesce_partitions, is_repartition,
31    is_sort_preserving_merge,
32};
33
34use arrow::compute::SortOptions;
35use datafusion_common::config::ConfigOptions;
36use datafusion_common::error::Result;
37use datafusion_common::stats::Precision;
38use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
39use datafusion_expr::logical_plan::{Aggregate, JoinType};
40use datafusion_physical_expr::expressions::{Column, NoOp};
41use datafusion_physical_expr::utils::map_columns_before_projection;
42use datafusion_physical_expr::{
43    EquivalenceProperties, PhysicalExpr, PhysicalExprRef, physical_exprs_equal,
44};
45use datafusion_physical_plan::ExecutionPlanProperties;
46use datafusion_physical_plan::aggregates::{
47    AggregateExec, AggregateMode, PhysicalGroupBy,
48};
49use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
50use datafusion_physical_plan::execution_plan::EmissionType;
51use datafusion_physical_plan::joins::{
52    CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
53};
54use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
55use datafusion_physical_plan::repartition::RepartitionExec;
56use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
57use datafusion_physical_plan::tree_node::PlanContext;
58use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave};
59use datafusion_physical_plan::windows::WindowAggExec;
60use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window};
61use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
62
63use itertools::izip;
64
65/// The `EnforceDistribution` rule ensures that distribution requirements are
66/// met. In doing so, this rule will increase the parallelism in the plan by
67/// introducing repartitioning operators to the physical plan.
68///
69/// For example, given an input such as:
70///
71///
72/// ```text
73/// ┌─────────────────────────────────┐
74/// │                                 │
75/// │          ExecutionPlan          │
76/// │                                 │
77/// └─────────────────────────────────┘
78///             ▲         ▲
79///             │         │
80///       ┌─────┘         └─────┐
81///       │                     │
82///       │                     │
83///       │                     │
84/// ┌───────────┐         ┌───────────┐
85/// │           │         │           │
86/// │ batch A1  │         │ batch B1  │
87/// │           │         │           │
88/// ├───────────┤         ├───────────┤
89/// │           │         │           │
90/// │ batch A2  │         │ batch B2  │
91/// │           │         │           │
92/// ├───────────┤         ├───────────┤
93/// │           │         │           │
94/// │ batch A3  │         │ batch B3  │
95/// │           │         │           │
96/// └───────────┘         └───────────┘
97///
98///      Input                 Input
99///        A                     B
100/// ```
101///
102/// This rule will attempt to add a `RepartitionExec` to increase parallelism
103/// (to 3, in this case) and create the following arrangement:
104///
105/// ```text
106///     ┌─────────────────────────────────┐
107///     │                                 │
108///     │          ExecutionPlan          │
109///     │                                 │
110///     └─────────────────────────────────┘
111///               ▲      ▲       ▲            Input now has 3
112///               │      │       │             partitions
113///       ┌───────┘      │       └───────┐
114///       │              │               │
115///       │              │               │
116/// ┌───────────┐  ┌───────────┐   ┌───────────┐
117/// │           │  │           │   │           │
118/// │ batch A1  │  │ batch A3  │   │ batch B3  │
119/// │           │  │           │   │           │
120/// ├───────────┤  ├───────────┤   ├───────────┤
121/// │           │  │           │   │           │
122/// │ batch B2  │  │ batch B1  │   │ batch A2  │
123/// │           │  │           │   │           │
124/// └───────────┘  └───────────┘   └───────────┘
125///       ▲              ▲               ▲
126///       │              │               │
127///       └─────────┐    │    ┌──────────┘
128///                 │    │    │
129///                 │    │    │
130///     ┌─────────────────────────────────┐   batches are
131///     │       RepartitionExec(3)        │   repartitioned
132///     │           RoundRobin            │
133///     │                                 │
134///     └─────────────────────────────────┘
135///                 ▲         ▲
136///                 │         │
137///           ┌─────┘         └─────┐
138///           │                     │
139///           │                     │
140///           │                     │
141///     ┌───────────┐         ┌───────────┐
142///     │           │         │           │
143///     │ batch A1  │         │ batch B1  │
144///     │           │         │           │
145///     ├───────────┤         ├───────────┤
146///     │           │         │           │
147///     │ batch A2  │         │ batch B2  │
148///     │           │         │           │
149///     ├───────────┤         ├───────────┤
150///     │           │         │           │
151///     │ batch A3  │         │ batch B3  │
152///     │           │         │           │
153///     └───────────┘         └───────────┘
154///
155///
156///      Input                 Input
157///        A                     B
158/// ```
159///
160/// The `EnforceDistribution` rule
161/// - is idempotent; i.e. it can be applied multiple times, each time producing
162///   the same result.
163/// - always produces a valid plan in terms of distribution requirements. Its
164///   input plan can be valid or invalid with respect to distribution requirements,
165///   but the output plan will always be valid.
166/// - produces a valid plan in terms of ordering requirements, *if* its input is
167///   a valid plan in terms of ordering requirements. If the input plan is invalid,
168///   this rule does not attempt to fix it as doing so is the responsibility of the
169///   `EnforceSorting` rule.
170///
171/// Note that distribution requirements are met in the strictest way. This may
172/// result in more than strictly necessary [`RepartitionExec`]s in the plan, but
173/// meeting the requirements in the strictest way may help avoid possible data
174/// skew in joins.
175///
176/// For example for a hash join with keys (a, b, c), the required Distribution(a, b, c)
177/// can be satisfied by several alternative partitioning ways: (a, b, c), (a, b),
178/// (a, c), (b, c), (a), (b), (c) and ( ).
179///
180/// This rule only chooses the exact match and satisfies the Distribution(a, b, c)
181/// by a HashPartition(a, b, c).
#[derive(Default, Debug)]
pub struct EnforceDistribution {}

impl EnforceDistribution {
    /// Creates a new [`EnforceDistribution`] optimizer rule.
    ///
    /// The rule has no fields, so this is equivalent to
    /// [`EnforceDistribution::default`].
    pub fn new() -> Self {
        Self {}
    }
}
191
192impl PhysicalOptimizerRule for EnforceDistribution {
193    fn optimize(
194        &self,
195        plan: Arc<dyn ExecutionPlan>,
196        config: &ConfigOptions,
197    ) -> Result<Arc<dyn ExecutionPlan>> {
198        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
199
200        let adjusted = if top_down_join_key_reordering {
201            // Run a top-down process to adjust input key ordering recursively
202            let plan_requirements = PlanWithKeyRequirements::new_default(plan);
203            let adjusted = plan_requirements
204                .transform_down(adjust_input_keys_ordering)
205                .data()?;
206            adjusted.plan
207        } else {
208            // Run a bottom-up process
209            plan.transform_up(|plan| {
210                Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
211            })
212            .data()?
213        };
214
215        let distribution_context = DistributionContext::new_default(adjusted);
216        // Distribution enforcement needs to be applied bottom-up.
217        let distribution_context = distribution_context
218            .transform_up(|distribution_context| {
219                ensure_distribution(distribution_context, config)
220            })
221            .data()?;
222        Ok(distribution_context.plan)
223    }
224
225    fn name(&self) -> &str {
226        "EnforceDistribution"
227    }
228
229    fn schema_check(&self) -> bool {
230        true
231    }
232}
233
234#[derive(Debug, Clone)]
235struct JoinKeyPairs {
236    left_keys: Vec<Arc<dyn PhysicalExpr>>,
237    right_keys: Vec<Arc<dyn PhysicalExpr>>,
238}
239
240/// Keeps track of parent required key orderings.
241pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
242
243/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
244/// That might not match with the output partitioning of the join node's children
245/// A Top-Down process will use this method to adjust children's output partitioning based on the parent key reordering requirements:
246///
247/// Example:
248///     TopJoin on (a, b, c)
249///         bottom left join on(b, a, c)
250///         bottom right join on(c, b, a)
251///
252///  Will be adjusted to:
253///     TopJoin on (a, b, c)
254///         bottom left join on(a, b, c)
255///         bottom right join on(a, b, c)
256///
257/// Example:
258///     TopJoin on (a, b, c)
259///         Agg1 group by (b, a, c)
260///         Agg2 group by (c, b, a)
261///
262/// Will be adjusted to:
263///     TopJoin on (a, b, c)
264///          Projection(b, a, c)
265///             Agg1 group by (a, b, c)
266///          Projection(c, b, a)
267///             Agg2 group by (a, b, c)
268///
269/// Following is the explanation of the reordering process:
270///
271/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
272///    Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
273///    Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
274///    Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
275///
276/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
277///    Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
278///    Requirements is already satisfied, clear all the requirements, return the unchanged plan.
279///    Requirements can be satisfied by adjusting keys ordering, clear all the requirements, return the changed plan.
280///
281/// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements, return the unchanged plan
282/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
283/// 5) For other types of operators, by default, pushdown the parent requirements to children.
284pub fn adjust_input_keys_ordering(
285    mut requirements: PlanWithKeyRequirements,
286) -> Result<Transformed<PlanWithKeyRequirements>> {
287    let plan = Arc::clone(&requirements.plan);
288
289    if let Some(
290        exec @ HashJoinExec {
291            left,
292            on,
293            join_type,
294            mode,
295            ..
296        },
297    ) = plan.as_any().downcast_ref::<HashJoinExec>()
298    {
299        match mode {
300            PartitionMode::Partitioned => {
301                let join_constructor = |new_conditions: (
302                    Vec<(PhysicalExprRef, PhysicalExprRef)>,
303                    Vec<SortOptions>,
304                )| {
305                    exec.builder()
306                        .with_partition_mode(PartitionMode::Partitioned)
307                        .with_on(new_conditions.0)
308                        .build_exec()
309                };
310                return reorder_partitioned_join_keys(
311                    requirements,
312                    on,
313                    &[],
314                    &join_constructor,
315                )
316                .map(Transformed::yes);
317            }
318            PartitionMode::CollectLeft => {
319                // Push down requirements to the right side
320                requirements.children[1].data = match join_type {
321                    JoinType::Inner | JoinType::Right => shift_right_required(
322                        &requirements.data,
323                        left.schema().fields().len(),
324                    )
325                    .unwrap_or_default(),
326                    JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
327                        requirements.data.clone()
328                    }
329                    JoinType::Left
330                    | JoinType::LeftSemi
331                    | JoinType::LeftAnti
332                    | JoinType::Full
333                    | JoinType::LeftMark => vec![],
334                };
335            }
336            PartitionMode::Auto => {
337                // Can not satisfy, clear the current requirements and generate new empty requirements
338                requirements.data.clear();
339            }
340        }
341    } else if let Some(CrossJoinExec { left, .. }) =
342        plan.as_any().downcast_ref::<CrossJoinExec>()
343    {
344        let left_columns_len = left.schema().fields().len();
345        // Push down requirements to the right side
346        requirements.children[1].data =
347            shift_right_required(&requirements.data, left_columns_len)
348                .unwrap_or_default();
349    } else if let Some(SortMergeJoinExec {
350        left,
351        right,
352        on,
353        filter,
354        join_type,
355        sort_options,
356        null_equality,
357        ..
358    }) = plan.as_any().downcast_ref::<SortMergeJoinExec>()
359    {
360        let join_constructor = |new_conditions: (
361            Vec<(PhysicalExprRef, PhysicalExprRef)>,
362            Vec<SortOptions>,
363        )| {
364            SortMergeJoinExec::try_new(
365                Arc::clone(left),
366                Arc::clone(right),
367                new_conditions.0,
368                filter.clone(),
369                *join_type,
370                new_conditions.1,
371                *null_equality,
372            )
373            .map(|e| Arc::new(e) as _)
374        };
375        return reorder_partitioned_join_keys(
376            requirements,
377            on,
378            sort_options,
379            &join_constructor,
380        )
381        .map(Transformed::yes);
382    } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
383        if !requirements.data.is_empty() {
384            if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
385                return reorder_aggregate_keys(requirements, aggregate_exec)
386                    .map(Transformed::yes);
387            } else {
388                requirements.data.clear();
389            }
390        } else {
391            // Keep everything unchanged
392            return Ok(Transformed::no(requirements));
393        }
394    } else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() {
395        let expr = proj.expr();
396        // For Projection, we need to transform the requirements to the columns before the Projection
397        // And then to push down the requirements
398        // Construct a mapping from new name to the original Column
399        let proj_exprs: Vec<_> = expr
400            .iter()
401            .map(|p| (Arc::clone(&p.expr), p.alias.clone()))
402            .collect();
403        let new_required = map_columns_before_projection(&requirements.data, &proj_exprs);
404        if new_required.len() == requirements.data.len() {
405            requirements.children[0].data = new_required;
406        } else {
407            // Can not satisfy, clear the current requirements and generate new empty requirements
408            requirements.data.clear();
409        }
410    } else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
411        || plan
412            .as_any()
413            .downcast_ref::<CoalescePartitionsExec>()
414            .is_some()
415        || plan.as_any().downcast_ref::<WindowAggExec>().is_some()
416    {
417        requirements.data.clear();
418    } else {
419        // By default, push down the parent requirements to children
420        for child in requirements.children.iter_mut() {
421            child.data.clone_from(&requirements.data);
422        }
423    }
424    Ok(Transformed::yes(requirements))
425}
426
427pub fn reorder_partitioned_join_keys<F>(
428    mut join_plan: PlanWithKeyRequirements,
429    on: &[(PhysicalExprRef, PhysicalExprRef)],
430    sort_options: &[SortOptions],
431    join_constructor: &F,
432) -> Result<PlanWithKeyRequirements>
433where
434    F: Fn(
435        (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
436    ) -> Result<Arc<dyn ExecutionPlan>>,
437{
438    let parent_required = &join_plan.data;
439    let join_key_pairs = extract_join_keys(on);
440    let eq_properties = join_plan.plan.equivalence_properties();
441
442    let (
443        JoinKeyPairs {
444            left_keys,
445            right_keys,
446        },
447        positions,
448    ) = try_reorder(join_key_pairs, parent_required, eq_properties);
449
450    if let Some(positions) = positions
451        && !positions.is_empty()
452    {
453        let new_join_on = new_join_conditions(&left_keys, &right_keys);
454        let new_sort_options = (0..sort_options.len())
455            .map(|idx| sort_options[positions[idx]])
456            .collect();
457        join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
458    }
459
460    join_plan.children[0].data = left_keys;
461    join_plan.children[1].data = right_keys;
462    Ok(join_plan)
463}
464
465pub fn reorder_aggregate_keys(
466    mut agg_node: PlanWithKeyRequirements,
467    agg_exec: &AggregateExec,
468) -> Result<PlanWithKeyRequirements> {
469    let parent_required = &agg_node.data;
470    let output_columns = agg_exec
471        .group_expr()
472        .expr()
473        .iter()
474        .enumerate()
475        .map(|(index, (_, name))| Column::new(name, index))
476        .collect::<Vec<_>>();
477
478    let output_exprs = output_columns
479        .iter()
480        .map(|c| Arc::new(c.clone()) as _)
481        .collect::<Vec<_>>();
482
483    if parent_required.len() == output_exprs.len()
484        && agg_exec.group_expr().null_expr().is_empty()
485        && !physical_exprs_equal(&output_exprs, parent_required)
486        && let Some(positions) = expected_expr_positions(&output_exprs, parent_required)
487        && let Some(agg_exec) = agg_exec.input().as_any().downcast_ref::<AggregateExec>()
488        && *agg_exec.mode() == AggregateMode::Partial
489    {
490        let group_exprs = agg_exec.group_expr().expr();
491        let new_group_exprs = positions
492            .into_iter()
493            .map(|idx| group_exprs[idx].clone())
494            .collect();
495        let partial_agg = Arc::new(AggregateExec::try_new(
496            AggregateMode::Partial,
497            PhysicalGroupBy::new_single(new_group_exprs),
498            agg_exec.aggr_expr().to_vec(),
499            agg_exec.filter_expr().to_vec(),
500            Arc::clone(agg_exec.input()),
501            Arc::clone(&agg_exec.input_schema),
502        )?);
503        // Build new group expressions that correspond to the output
504        // of the "reordered" aggregator:
505        let group_exprs = partial_agg.group_expr().expr();
506        let new_group_by = PhysicalGroupBy::new_single(
507            partial_agg
508                .output_group_expr()
509                .into_iter()
510                .enumerate()
511                .map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
512                .collect(),
513        );
514        let new_final_agg = Arc::new(AggregateExec::try_new(
515            AggregateMode::FinalPartitioned,
516            new_group_by,
517            agg_exec.aggr_expr().to_vec(),
518            agg_exec.filter_expr().to_vec(),
519            Arc::clone(&partial_agg) as _,
520            agg_exec.input_schema(),
521        )?);
522
523        agg_node.plan = Arc::clone(&new_final_agg) as _;
524        agg_node.data.clear();
525        agg_node.children = vec![PlanWithKeyRequirements::new(
526            partial_agg as _,
527            vec![],
528            agg_node.children.swap_remove(0).children,
529        )];
530
531        // Need to create a new projection to change the expr ordering back
532        let agg_schema = new_final_agg.schema();
533        let mut proj_exprs = output_columns
534            .iter()
535            .map(|col| {
536                let name = col.name();
537                let index = agg_schema.index_of(name)?;
538                Ok(ProjectionExpr {
539                    expr: Arc::new(Column::new(name, index)) as _,
540                    alias: name.to_owned(),
541                })
542            })
543            .collect::<Result<Vec<_>>>()?;
544        let agg_fields = agg_schema.fields();
545        for (idx, field) in agg_fields.iter().enumerate().skip(output_columns.len()) {
546            let name = field.name();
547            let plan = Arc::new(Column::new(name, idx)) as _;
548            proj_exprs.push(ProjectionExpr {
549                expr: plan,
550                alias: name.clone(),
551            })
552        }
553        return ProjectionExec::try_new(proj_exprs, new_final_agg)
554            .map(|p| PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node]));
555    }
556    Ok(agg_node)
557}
558
559fn shift_right_required(
560    parent_required: &[Arc<dyn PhysicalExpr>],
561    left_columns_len: usize,
562) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
563    let new_right_required = parent_required
564        .iter()
565        .filter_map(|r| {
566            r.as_any().downcast_ref::<Column>().and_then(|col| {
567                col.index()
568                    .checked_sub(left_columns_len)
569                    .map(|index| Arc::new(Column::new(col.name(), index)) as _)
570            })
571        })
572        .collect::<Vec<_>>();
573
574    // if the parent required are all coming from the right side, the requirements can be pushdown
575    (new_right_required.len() == parent_required.len()).then_some(new_right_required)
576}
577
578/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
579/// That might not match with the output partitioning of the join node's children
580/// This method will try to change the ordering of the join keys to match with the
581/// partitioning of the join nodes' children. If it can not match with both sides, it will try to
582/// match with one, either the left side or the right side.
583///
584/// Example:
585///     TopJoin on (a, b, c)
586///         bottom left join on(b, a, c)
587///         bottom right join on(c, b, a)
588///
589///  Will be adjusted to:
590///     TopJoin on (b, a, c)
591///         bottom left join on(b, a, c)
592///         bottom right join on(c, b, a)
593///
594/// Compared to the Top-Down reordering process, this Bottom-Up approach is much simpler, but might not reach a best result.
595/// The Bottom-Up approach will be useful in future if we plan to support storage partition-wised Joins.
596/// In that case, the datasources/tables might be pre-partitioned and we can't adjust the key ordering of the datasources
597/// and then can't apply the Top-Down reordering process.
598pub fn reorder_join_keys_to_inputs(
599    plan: Arc<dyn ExecutionPlan>,
600) -> Result<Arc<dyn ExecutionPlan>> {
601    let plan_any = plan.as_any();
602    if let Some(
603        exec @ HashJoinExec {
604            left,
605            right,
606            on,
607            mode,
608            ..
609        },
610    ) = plan_any.downcast_ref::<HashJoinExec>()
611    {
612        if *mode == PartitionMode::Partitioned {
613            let (join_keys, positions) = reorder_current_join_keys(
614                extract_join_keys(on),
615                Some(left.output_partitioning()),
616                Some(right.output_partitioning()),
617                left.equivalence_properties(),
618                right.equivalence_properties(),
619            );
620            if positions.is_some_and(|idxs| !idxs.is_empty()) {
621                let JoinKeyPairs {
622                    left_keys,
623                    right_keys,
624                } = join_keys;
625                let new_join_on = new_join_conditions(&left_keys, &right_keys);
626                return exec
627                    .builder()
628                    .with_partition_mode(PartitionMode::Partitioned)
629                    .with_on(new_join_on)
630                    .build_exec();
631            }
632        }
633    } else if let Some(SortMergeJoinExec {
634        left,
635        right,
636        on,
637        filter,
638        join_type,
639        sort_options,
640        null_equality,
641        ..
642    }) = plan_any.downcast_ref::<SortMergeJoinExec>()
643    {
644        let (join_keys, positions) = reorder_current_join_keys(
645            extract_join_keys(on),
646            Some(left.output_partitioning()),
647            Some(right.output_partitioning()),
648            left.equivalence_properties(),
649            right.equivalence_properties(),
650        );
651        if let Some(positions) = positions
652            && !positions.is_empty()
653        {
654            let JoinKeyPairs {
655                left_keys,
656                right_keys,
657            } = join_keys;
658            let new_join_on = new_join_conditions(&left_keys, &right_keys);
659            let new_sort_options = (0..sort_options.len())
660                .map(|idx| sort_options[positions[idx]])
661                .collect();
662            return SortMergeJoinExec::try_new(
663                Arc::clone(left),
664                Arc::clone(right),
665                new_join_on,
666                filter.clone(),
667                *join_type,
668                new_sort_options,
669                *null_equality,
670            )
671            .map(|smj| Arc::new(smj) as _);
672        }
673    }
674    Ok(plan)
675}
676
677/// Reorder the current join keys ordering based on either left partition or right partition
678fn reorder_current_join_keys(
679    join_keys: JoinKeyPairs,
680    left_partition: Option<&Partitioning>,
681    right_partition: Option<&Partitioning>,
682    left_equivalence_properties: &EquivalenceProperties,
683    right_equivalence_properties: &EquivalenceProperties,
684) -> (JoinKeyPairs, Option<Vec<usize>>) {
685    match (left_partition, right_partition) {
686        (Some(Partitioning::Hash(left_exprs, _)), _) => {
687            match try_reorder(join_keys, left_exprs, left_equivalence_properties) {
688                (join_keys, None) => reorder_current_join_keys(
689                    join_keys,
690                    None,
691                    right_partition,
692                    left_equivalence_properties,
693                    right_equivalence_properties,
694                ),
695                result => result,
696            }
697        }
698        (_, Some(Partitioning::Hash(right_exprs, _))) => {
699            try_reorder(join_keys, right_exprs, right_equivalence_properties)
700        }
701        _ => (join_keys, None),
702    }
703}
704
705fn try_reorder(
706    join_keys: JoinKeyPairs,
707    expected: &[Arc<dyn PhysicalExpr>],
708    equivalence_properties: &EquivalenceProperties,
709) -> (JoinKeyPairs, Option<Vec<usize>>) {
710    let eq_groups = equivalence_properties.eq_group();
711    let mut normalized_expected = vec![];
712    let mut normalized_left_keys = vec![];
713    let mut normalized_right_keys = vec![];
714    if join_keys.left_keys.len() != expected.len() {
715        return (join_keys, None);
716    }
717    if physical_exprs_equal(expected, &join_keys.left_keys)
718        || physical_exprs_equal(expected, &join_keys.right_keys)
719    {
720        return (join_keys, Some(vec![]));
721    } else if !equivalence_properties.eq_group().is_empty() {
722        normalized_expected = expected
723            .iter()
724            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
725            .collect();
726
727        normalized_left_keys = join_keys
728            .left_keys
729            .iter()
730            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
731            .collect();
732
733        normalized_right_keys = join_keys
734            .right_keys
735            .iter()
736            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
737            .collect();
738
739        if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
740            || physical_exprs_equal(&normalized_expected, &normalized_right_keys)
741        {
742            return (join_keys, Some(vec![]));
743        }
744    }
745
746    let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected)
747        .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
748        .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
749        .or_else(|| {
750            expected_expr_positions(&normalized_right_keys, &normalized_expected)
751        })
752    else {
753        return (join_keys, None);
754    };
755
756    let mut new_left_keys = vec![];
757    let mut new_right_keys = vec![];
758    for pos in positions.iter() {
759        new_left_keys.push(Arc::clone(&join_keys.left_keys[*pos]));
760        new_right_keys.push(Arc::clone(&join_keys.right_keys[*pos]));
761    }
762    let pairs = JoinKeyPairs {
763        left_keys: new_left_keys,
764        right_keys: new_right_keys,
765    };
766
767    (pairs, Some(positions))
768}
769
770/// Return the expected expressions positions.
771/// For example, the current expressions are ['c', 'a', 'a', b'], the expected expressions are ['b', 'c', 'a', 'a'],
772///
773/// This method will return a Vec [3, 0, 1, 2]
774fn expected_expr_positions(
775    current: &[Arc<dyn PhysicalExpr>],
776    expected: &[Arc<dyn PhysicalExpr>],
777) -> Option<Vec<usize>> {
778    if current.is_empty() || expected.is_empty() {
779        return None;
780    }
781    let mut indexes: Vec<usize> = vec![];
782    let mut current = current.to_vec();
783    for expr in expected.iter() {
784        // Find the position of the expected expr in the current expressions
785        if let Some(expected_position) = current.iter().position(|e| e.eq(expr)) {
786            current[expected_position] = Arc::new(NoOp::new());
787            indexes.push(expected_position);
788        } else {
789            return None;
790        }
791    }
792    Some(indexes)
793}
794
795fn extract_join_keys(on: &[(PhysicalExprRef, PhysicalExprRef)]) -> JoinKeyPairs {
796    let (left_keys, right_keys) = on
797        .iter()
798        .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
799        .unzip();
800    JoinKeyPairs {
801        left_keys,
802        right_keys,
803    }
804}
805
806fn new_join_conditions(
807    new_left_keys: &[Arc<dyn PhysicalExpr>],
808    new_right_keys: &[Arc<dyn PhysicalExpr>],
809) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
810    new_left_keys
811        .iter()
812        .zip(new_right_keys.iter())
813        .map(|(l_key, r_key)| (Arc::clone(l_key), Arc::clone(r_key)))
814        .collect()
815}
816
817/// Adds RoundRobin repartition operator to the plan increase parallelism.
818///
819/// # Arguments
820///
821/// * `input`: Current node.
822/// * `n_target`: desired target partition number, if partition number of the
823///   current executor is less than this value. Partition number will be increased.
824///
825/// # Returns
826///
827/// A [`Result`] object that contains new execution plan where the desired
828/// partition number is achieved by adding a RoundRobin repartition.
829fn add_roundrobin_on_top(
830    input: DistributionContext,
831    n_target: usize,
832) -> Result<DistributionContext> {
833    // Adding repartition is helpful:
834    if input.plan.output_partitioning().partition_count() < n_target {
835        // When there is an existing ordering, we preserve ordering
836        // during repartition. This will be un-done in the future
837        // If any of the following conditions is true
838        // - Preserving ordering is not helpful in terms of satisfying ordering requirements
839        // - Usage of order preserving variants is not desirable
840        // (determined by flag `config.optimizer.prefer_existing_sort`)
841        let partitioning = Partitioning::RoundRobinBatch(n_target);
842        let repartition =
843            RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
844                .with_preserve_order();
845
846        let new_plan = Arc::new(repartition) as _;
847
848        Ok(DistributionContext::new(new_plan, true, vec![input]))
849    } else {
850        // Partition is not helpful, we already have desired number of partitions.
851        Ok(input)
852    }
853}
854
855/// Adds a hash repartition operator:
856/// - to increase parallelism, and/or
857/// - to satisfy requirements of the subsequent operators.
858///
859/// Repartition(Hash) is added on top of operator `input`.
860///
861/// # Arguments
862///
863/// * `input`: Current node.
864/// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
865/// * `n_target`: desired target partition number, if partition number of the
866///   current executor is less than this value. Partition number will be increased.
867/// * `allow_subset_satisfy_partitioning`: Whether to allow subset partitioning logic in satisfaction checks.
868///   Set to `false` for partitioned hash joins to ensure exact hash matching.
869///
870/// # Returns
871///
872/// A [`Result`] object that contains new execution plan where the desired
873/// distribution is satisfied by adding a Hash repartition.
874fn add_hash_on_top(
875    input: DistributionContext,
876    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
877    n_target: usize,
878    allow_subset_satisfy_partitioning: bool,
879) -> Result<DistributionContext> {
880    // Early return if hash repartition is unnecessary
881    // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
882    if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
883        return Ok(input);
884    }
885
886    let dist = Distribution::HashPartitioned(hash_exprs);
887    let satisfaction = input.plan.output_partitioning().satisfaction(
888        &dist,
889        input.plan.equivalence_properties(),
890        allow_subset_satisfy_partitioning,
891    );
892
893    // Add hash repartitioning when:
894    // - When subset satisfaction is enabled (current >= threshold): only repartition if not satisfied
895    // - When below threshold (current < threshold): repartition if expressions don't match OR to increase parallelism
896    let needs_repartition = if allow_subset_satisfy_partitioning {
897        !satisfaction.is_satisfied()
898    } else {
899        !satisfaction.is_satisfied()
900            || n_target > input.plan.output_partitioning().partition_count()
901    };
902
903    if needs_repartition {
904        // When there is an existing ordering, we preserve ordering during
905        // repartition. This will be rolled back in the future if any of the
906        // following conditions is true:
907        // - Preserving ordering is not helpful in terms of satisfying ordering
908        //   requirements.
909        // - Usage of order preserving variants is not desirable (per the flag
910        //   `config.optimizer.prefer_existing_sort`).
911        let partitioning = dist.create_partitioning(n_target);
912        let repartition =
913            RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
914                .with_preserve_order();
915        let plan = Arc::new(repartition) as _;
916
917        return Ok(DistributionContext::new(plan, true, vec![input]));
918    }
919
920    Ok(input)
921}
922
923/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
924/// on top of the given plan node to satisfy a single partition requirement
925/// while preserving ordering constraints.
926///
927/// # Parameters
928///
929/// * `input`: Current node.
930///
931/// # Returns
932///
933/// Updated node with an execution plan, where the desired single distribution
934/// requirement is satisfied.
935fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
936    // Apply only when the partition count is larger than one.
937    if input.plan.output_partitioning().partition_count() > 1 {
938        // When there is an existing ordering, we preserve ordering
939        // when decreasing partitions. This will be un-done in the future
940        // if any of the following conditions is true
941        // - Preserving ordering is not helpful in terms of satisfying ordering requirements
942        // - Usage of order preserving variants is not desirable
943        // (determined by flag `config.optimizer.prefer_existing_sort`)
944        let new_plan = if let Some(req) = input.plan.output_ordering() {
945            Arc::new(SortPreservingMergeExec::new(
946                req.clone(),
947                Arc::clone(&input.plan),
948            )) as _
949        } else {
950            // If there is no input order, we can simply coalesce partitions:
951            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
952        };
953
954        DistributionContext::new(new_plan, true, vec![input])
955    } else {
956        input
957    }
958}
959
960/// Updates the physical plan inside [`DistributionContext`] so that distribution
961/// changing operators are removed from the top. If they are necessary, they will
962/// be added in subsequent stages.
963///
964/// Assume that following plan is given:
965/// ```text
966/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
967/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
968/// "    DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
969/// ```
970///
971/// Since `RepartitionExec`s change the distribution, this function removes
972/// them and returns following plan:
973///
974/// ```text
975/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
976/// ```
977fn remove_dist_changing_operators(
978    mut distribution_context: DistributionContext,
979) -> Result<DistributionContext> {
980    while is_repartition(&distribution_context.plan)
981        || is_coalesce_partitions(&distribution_context.plan)
982        || is_sort_preserving_merge(&distribution_context.plan)
983    {
984        // All of above operators have a single child. First child is only child.
985        // Remove any distribution changing operators at the beginning:
986        distribution_context = distribution_context.children.swap_remove(0);
987        // Note that they will be re-inserted later on if necessary or helpful.
988    }
989
990    Ok(distribution_context)
991}
992
993/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
994///
995/// Assume that following plan is given:
996/// ```text
997/// "SortPreservingMergeExec: \[a@0 ASC]"
998/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true",
999/// "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true",
1000/// "      DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1001/// ```
1002///
1003/// This function converts plan above to the following:
1004///
1005/// ```text
1006/// "CoalescePartitionsExec"
1007/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
1008/// "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
1009/// "      DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1010/// ```
1011pub fn replace_order_preserving_variants(
1012    mut context: DistributionContext,
1013) -> Result<DistributionContext> {
1014    context.children = context
1015        .children
1016        .into_iter()
1017        .map(|child| {
1018            if child.data {
1019                replace_order_preserving_variants(child)
1020            } else {
1021                Ok(child)
1022            }
1023        })
1024        .collect::<Result<Vec<_>>>()?;
1025
1026    if is_sort_preserving_merge(&context.plan) {
1027        let child_plan = Arc::clone(&context.children[0].plan);
1028        context.plan = Arc::new(
1029            CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
1030        );
1031        return Ok(context);
1032    } else if let Some(repartition) =
1033        context.plan.as_any().downcast_ref::<RepartitionExec>()
1034        && repartition.preserve_order()
1035    {
1036        context.plan = Arc::new(RepartitionExec::try_new(
1037            Arc::clone(&context.children[0].plan),
1038            repartition.partitioning().clone(),
1039        )?);
1040        return Ok(context);
1041    }
1042
1043    context.update_plan_from_children()
1044}
1045
1046/// A struct to keep track of repartition requirements for each child node.
1047struct RepartitionRequirementStatus {
1048    /// The distribution requirement for the node.
1049    requirement: Distribution,
1050    /// Designates whether round robin partitioning is theoretically beneficial;
1051    /// i.e. the operator can actually utilize parallelism.
1052    roundrobin_beneficial: bool,
1053    /// Designates whether round robin partitioning is beneficial according to
1054    /// the statistical information we have on the number of rows.
1055    roundrobin_beneficial_stats: bool,
1056    /// Designates whether hash partitioning is necessary.
1057    hash_necessary: bool,
1058}
1059
1060/// Calculates the `RepartitionRequirementStatus` for each children to generate
1061/// consistent and sensible (in terms of performance) distribution requirements.
1062/// As an example, a hash join's left (build) child might produce
1063///
1064/// ```text
1065/// RepartitionRequirementStatus {
1066///     ..,
1067///     hash_necessary: true
1068/// }
1069/// ```
1070///
1071/// while its right (probe) child might have very few rows and produce:
1072///
1073/// ```text
1074/// RepartitionRequirementStatus {
1075///     ..,
1076///     hash_necessary: false
1077/// }
1078/// ```
1079///
1080/// These statuses are not consistent as all children should agree on hash
1081/// partitioning. This function aligns the statuses to generate consistent
1082/// hash partitions for each children. After alignment, the right child's
1083/// status would turn into:
1084///
1085/// ```text
1086/// RepartitionRequirementStatus {
1087///     ..,
1088///     hash_necessary: true
1089/// }
1090/// ```
1091fn get_repartition_requirement_status(
1092    plan: &Arc<dyn ExecutionPlan>,
1093    batch_size: usize,
1094    should_use_estimates: bool,
1095) -> Result<Vec<RepartitionRequirementStatus>> {
1096    let mut needs_alignment = false;
1097    let children = plan.children();
1098    let rr_beneficial = plan.benefits_from_input_partitioning();
1099    let requirements = plan.required_input_distribution();
1100    let mut repartition_status_flags = vec![];
1101    for (child, requirement, roundrobin_beneficial) in
1102        izip!(children.into_iter(), requirements, rr_beneficial)
1103    {
1104        // Decide whether adding a round robin is beneficial depending on
1105        // the statistical information we have on the number of rows:
1106        let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows
1107        {
1108            Precision::Exact(n_rows) => n_rows > batch_size,
1109            Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
1110            Precision::Absent => true,
1111        };
1112        let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
1113        // Hash re-partitioning is necessary when the input has more than one
1114        // partitions:
1115        let multi_partitions = child.output_partitioning().partition_count() > 1;
1116        let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
1117        needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
1118        repartition_status_flags.push((
1119            is_hash,
1120            RepartitionRequirementStatus {
1121                requirement,
1122                roundrobin_beneficial,
1123                roundrobin_beneficial_stats,
1124                hash_necessary: is_hash && multi_partitions,
1125            },
1126        ));
1127    }
1128    // Align hash necessary flags for hash partitions to generate consistent
1129    // hash partitions at each children:
1130    if needs_alignment {
1131        // When there is at least one hash requirement that is necessary or
1132        // beneficial according to statistics, make all children require hash
1133        // repartitioning:
1134        for (is_hash, status) in &mut repartition_status_flags {
1135            if *is_hash {
1136                status.hash_necessary = true;
1137            }
1138        }
1139    }
1140    Ok(repartition_status_flags
1141        .into_iter()
1142        .map(|(_, status)| status)
1143        .collect())
1144}
1145
/// This function checks whether we need to add additional data exchange
/// operators to satisfy distribution requirements. Since this function
/// takes care of such requirements, we should avoid manually adding data
/// exchange operators in other places.
///
/// This function is intended to be used in a bottom up traversal, as it
/// can first repartition (or newly partition) at the datasources -- these
/// source partitions may be later repartitioned with additional data exchange operators.
///
/// In outline, the function:
/// 1. Refreshes the tracking flags of the children via `update_children` and
///    returns `Transformed::no` right away for nodes without children.
/// 2. Strips distribution changing operators from the top of the node via
///    `remove_dist_changing_operators`.
/// 3. For window aggregates, swaps in the plan suggested by
///    `get_best_fitting_window`, if any.
/// 4. For each child, optionally repartitions at the source, adds a merge,
///    hash or round robin operator according to the required distribution,
///    and then drops order preserving variants where they are not useful.
/// 5. Rebuilds the node from the updated children (using `InterleaveExec`
///    instead of `UnionExec` when possible) and returns `Transformed::yes`.
///
/// # Arguments
///
/// * `dist_context`: Current node together with its tracking flags.
/// * `config`: Configuration options; the `execution` and `optimizer`
///   sections are consulted.
pub fn ensure_distribution(
    dist_context: DistributionContext,
    config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
    let dist_context = update_children(dist_context)?;

    // Leaf nodes have no inputs whose distribution could be adjusted.
    if dist_context.plan.children().is_empty() {
        return Ok(Transformed::no(dist_context));
    }

    let target_partitions = config.execution.target_partitions;
    // When `false`, round robin repartition will not be added to increase parallelism
    let enable_round_robin = config.optimizer.enable_round_robin_repartition;
    let repartition_file_scans = config.optimizer.repartition_file_scans;
    let batch_size = config.execution.batch_size;
    let should_use_estimates = config
        .execution
        .use_row_number_estimates_to_optimize_partitioning;
    let subset_satisfaction_threshold = config.optimizer.subset_repartition_threshold;
    let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
        && matches!(
            dist_context.plan.pipeline_behavior(),
            EmissionType::Incremental | EmissionType::Both
        );
    // Use order preserving variants if either of the following is true:
    // - it is desired according to config (`prefer_existing_sort`)
    // - the plan is unbounded and also pipeline friendly (can incrementally
    //   produce results)
    let order_preserving_variants_desirable =
        unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;

    // Remove unnecessary repartition from the physical plan if any
    let DistributionContext {
        mut plan,
        data,
        children,
    } = remove_dist_changing_operators(dist_context)?;

    // For window aggregates (bounded or not), use the better fitting window
    // plan when `get_best_fitting_window` provides one.
    if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys(),
        )? {
            plan = updated_window;
        }
    } else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>()
        && let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys(),
        )?
    {
        plan = updated_window;
    };

    // For joins in partitioned mode, we need exact hash matching between
    // both sides, so subset partitioning logic must be disabled.
    //
    // Why: Different hash expressions produce different hash values, causing
    // rows with the same join key to land in different partitions. Since
    // partitioned joins match partition N left with partition N right, rows
    // that should match may be in different partitions and miss each other.
    //
    // Example JOIN ON left.a = right.a:
    //
    // Left: Hash([a])
    //  Partition 1: a=1
    //  Partition 2: a=2
    //
    // Right: Hash([a, b])
    //  Partition 1: (a=1, b=1) -> Same a=1
    //  Partition 2: (a=2, b=2)
    //  Partition 3: (a=1, b=2) -> Same a=1
    //
    // Partitioned join execution:
    //  P1 left (a=1) joins P1 right (a=1, b=1) -> Match
    //  P2 left (a=2) joins P2 right (a=2, b=2) -> Match
    //  P3 left (empty) joins P3 right (a=1, b=2) -> Missing, errors
    //
    // The row (a=1, b=2) should match left.a=1 but they're in different
    // partitions, causing panics.
    //
    // CollectLeft/CollectRight modes are safe because one side is collected
    // to a single partition which eliminates partition-to-partition mapping.
    //
    // Sort merge joins are treated the same way as partitioned hash joins.
    let is_partitioned_join = plan
        .as_any()
        .downcast_ref::<HashJoinExec>()
        .is_some_and(|join| join.mode == PartitionMode::Partitioned)
        || plan.as_any().is::<SortMergeJoinExec>();

    let repartition_status_flags =
        get_repartition_requirement_status(&plan, batch_size, should_use_estimates)?;
    // This loop iterates over all the children to:
    // - Increase parallelism for every child if it is beneficial.
    // - Satisfy the distribution requirements of every child, if it is not
    //   already satisfied.
    // We store the updated children in `new_children`.
    let children = izip!(
        children.into_iter(),
        plan.required_input_ordering(),
        plan.maintains_input_order(),
        repartition_status_flags.into_iter()
    )
    .map(
        |(
            mut child,
            required_input_ordering,
            maintains,
            RepartitionRequirementStatus {
                requirement,
                roundrobin_beneficial,
                roundrobin_beneficial_stats,
                hash_necessary,
            },
        )| {
            let increases_partition_count =
                child.plan.output_partitioning().partition_count() < target_partitions;

            let add_roundrobin = enable_round_robin
                // Operator benefits from partitioning (e.g. filter):
                && roundrobin_beneficial
                && roundrobin_beneficial_stats
                // Unless partitioning increases the partition count, it is not beneficial:
                && increases_partition_count;

            // Allow subset satisfaction when:
            // 1. Current partition count >= threshold
            // 2. Not a partitioned join since must use exact hash matching for joins
            // 3. Not a grouping set aggregate (requires exact hash including __grouping_id)
            //
            // Note that the partition count is read here, before the optional
            // source repartitioning further below.
            let current_partitions = child.plan.output_partitioning().partition_count();

            // Check if the hash partitioning requirement includes __grouping_id column.
            // Grouping set aggregates (ROLLUP, CUBE, GROUPING SETS) require exact hash
            // partitioning on all group columns including __grouping_id to ensure partial
            // aggregates from different partitions are correctly combined.
            let requires_grouping_id = matches!(&requirement, Distribution::HashPartitioned(exprs)
                if exprs.iter().any(|expr| {
                    expr.as_any()
                        .downcast_ref::<Column>()
                        .is_some_and(|col| col.name() == Aggregate::INTERNAL_GROUPING_ID)
                })
            );

            let allow_subset_satisfy_partitioning = current_partitions
                >= subset_satisfaction_threshold
                && !is_partitioned_join
                && !requires_grouping_id;

            // When `repartition_file_scans` is set, attempt to increase
            // parallelism at the source.
            //
            // If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`)
            // then no repartitioning will have occurred. As the default implementation returns None, it is only
            // specific physical plan nodes, such as certain datasources, which are repartitioned.
            if repartition_file_scans
                && roundrobin_beneficial_stats
                && let Some(new_child) =
                    child.plan.repartitioned(target_partitions, config)?
            {
                child.plan = new_child;
            }

            // Satisfy the distribution requirement if it is unmet.
            match &requirement {
                Distribution::SinglePartition => {
                    child = add_merge_on_top(child);
                }
                Distribution::HashPartitioned(exprs) => {
                    // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
                    // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
                    if hash_necessary {
                        child = add_hash_on_top(
                            child,
                            exprs.to_vec(),
                            target_partitions,
                            allow_subset_satisfy_partitioning,
                        )?;
                    }
                }
                Distribution::UnspecifiedDistribution => {
                    if add_roundrobin {
                        // Add round-robin repartitioning on top of the operator
                        // to increase parallelism.
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                }
            };

            // There is an ordering requirement of the operator:
            if let Some(required_input_ordering) = required_input_ordering {
                // Either:
                // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
                // - using order preserving variant is not desirable.
                let sort_req = required_input_ordering.into_single();
                let ordering_satisfied = child
                    .plan
                    .equivalence_properties()
                    .ordering_satisfy_requirement(sort_req.clone())?;

                // Only flagged children (`child.data`) are rewritten.
                if (!ordering_satisfied || !order_preserving_variants_desirable)
                    && child.data
                {
                    child = replace_order_preserving_variants(child)?;
                    // If ordering requirements were satisfied before repartitioning,
                    // make sure ordering requirements are still satisfied after.
                    if ordering_satisfied {
                        // Make sure to satisfy ordering requirement; the fetch
                        // of an `OutputRequirementExec` parent (if that is what
                        // `plan` is) is passed along, otherwise `None`:
                        child = add_sort_above_with_check(
                            child,
                            sort_req,
                            plan.as_any()
                                .downcast_ref::<OutputRequirementExec>()
                                .map(|output| output.fetch())
                                .unwrap_or(None),
                        )?;
                    }
                }
                // Stop tracking distribution changing operators
                child.data = false;
            } else {
                // no ordering requirement
                match requirement {
                    // Operator requires specific distribution.
                    Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
                        // Since there is no ordering requirement, preserving ordering is pointless
                        child = replace_order_preserving_variants(child)?;
                    }
                    Distribution::UnspecifiedDistribution => {
                        // Since ordering is lost, trying to preserve ordering is pointless
                        if !maintains || plan.as_any().is::<OutputRequirementExec>() {
                            child = replace_order_preserving_variants(child)?;
                        }
                    }
                }
            }
            Ok(child)
        },
    )
    .collect::<Result<Vec<_>>>()?;

    let children_plans = children
        .iter()
        .map(|c| Arc::clone(&c.plan))
        .collect::<Vec<_>>();

    plan = if plan.as_any().is::<UnionExec>()
        && !config.optimizer.prefer_existing_union
        && can_interleave(children_plans.iter())
    {
        // Add a special case for [`UnionExec`] since we want to "bubble up"
        // hash-partitioned data. So instead of
        //
        // Agg:
        //   Repartition (hash):
        //     Union:
        //       - Agg:
        //           Repartition (hash):
        //             Data
        //       - Agg:
        //           Repartition (hash):
        //             Data
        //
        // we can use:
        //
        // Agg:
        //   Interleave:
        //     - Agg:
        //         Repartition (hash):
        //           Data
        //     - Agg:
        //         Repartition (hash):
        //           Data
        Arc::new(InterleaveExec::try_new(children_plans)?)
    } else {
        plan.with_new_children(children_plans)?
    };

    Ok(Transformed::yes(DistributionContext::new(
        plan, data, children,
    )))
}
1436
/// Keeps track of distribution changing operators (like `RepartitionExec`,
/// `SortPreservingMergeExec`, `CoalescePartitionsExec`) and their ancestors.
/// Using this information, we can optimize distribution of the plan if/when
/// necessary.
///
/// The `bool` payload (`data`) is the tracking flag of a node. Nodes created
/// by inserting a repartition or merge operator in this file carry `true`;
/// `ensure_distribution` resets the flag to `false` once it stops tracking a
/// child.
pub type DistributionContext = PlanContext<bool>;
1442
1443fn update_children(mut dist_context: DistributionContext) -> Result<DistributionContext> {
1444    for child_context in dist_context.children.iter_mut() {
1445        let child_plan_any = child_context.plan.as_any();
1446        child_context.data =
1447            if let Some(repartition) = child_plan_any.downcast_ref::<RepartitionExec>() {
1448                !matches!(
1449                    repartition.partitioning(),
1450                    Partitioning::UnknownPartitioning(_)
1451                )
1452            } else {
1453                child_plan_any.is::<SortPreservingMergeExec>()
1454                    || child_plan_any.is::<CoalescePartitionsExec>()
1455                    || child_context.plan.children().is_empty()
1456                    || child_context.children[0].data
1457                    || child_context
1458                        .plan
1459                        .required_input_distribution()
1460                        .iter()
1461                        .zip(child_context.children.iter())
1462                        .any(|(required_dist, child_context)| {
1463                            child_context.data
1464                                && matches!(
1465                                    required_dist,
1466                                    Distribution::UnspecifiedDistribution
1467                                )
1468                        })
1469            }
1470    }
1471
1472    dist_context.data = false;
1473    Ok(dist_context)
1474}
1475
1476// See tests in datafusion/core/tests/physical_optimizer