datafusion_physical_optimizer/enforce_distribution.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceDistribution optimizer rule inspects the physical plan with respect
19//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them
20//! when necessary. If increasing parallelism is beneficial (and also desirable
21//! according to the configuration), this rule increases partition counts in
22//! the physical plan.
23
24use std::any::Any;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28use crate::optimizer::PhysicalOptimizerRule;
29use crate::output_requirements::OutputRequirementExec;
30use crate::utils::{
31 add_sort_above_with_check, is_coalesce_partitions, is_repartition,
32 is_sort_preserving_merge,
33};
34
35use arrow::compute::SortOptions;
36use datafusion_common::config::ConfigOptions;
37use datafusion_common::error::Result;
38use datafusion_common::stats::Precision;
39use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
40use datafusion_expr::logical_plan::{Aggregate, JoinType};
41use datafusion_physical_expr::expressions::{Column, NoOp};
42use datafusion_physical_expr::utils::map_columns_before_projection;
43use datafusion_physical_expr::{
44 EquivalenceProperties, PhysicalExpr, PhysicalExprRef, physical_exprs_equal,
45};
46use datafusion_physical_plan::ExecutionPlanProperties;
47use datafusion_physical_plan::aggregates::{
48 AggregateExec, AggregateMode, PhysicalGroupBy,
49};
50use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
51use datafusion_physical_plan::execution_plan::EmissionType;
52use datafusion_physical_plan::joins::{
53 CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
54};
55use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
56use datafusion_physical_plan::repartition::RepartitionExec;
57use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
58use datafusion_physical_plan::tree_node::PlanContext;
59use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave};
60use datafusion_physical_plan::windows::WindowAggExec;
61use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window};
62use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
63
64use itertools::izip;
65
66/// The `EnforceDistribution` rule ensures that distribution requirements are
67/// met. In doing so, this rule will increase the parallelism in the plan by
68/// introducing repartitioning operators to the physical plan.
69///
70/// For example, given an input such as:
71///
72///
73/// ```text
74/// ┌─────────────────────────────────┐
75/// │ │
76/// │ ExecutionPlan │
77/// │ │
78/// └─────────────────────────────────┘
79/// ▲ ▲
80/// │ │
81/// ┌─────┘ └─────┐
82/// │ │
83/// │ │
84/// │ │
85/// ┌───────────┐ ┌───────────┐
86/// │ │ │ │
87/// │ batch A1 │ │ batch B1 │
88/// │ │ │ │
89/// ├───────────┤ ├───────────┤
90/// │ │ │ │
91/// │ batch A2 │ │ batch B2 │
92/// │ │ │ │
93/// ├───────────┤ ├───────────┤
94/// │ │ │ │
95/// │ batch A3 │ │ batch B3 │
96/// │ │ │ │
97/// └───────────┘ └───────────┘
98///
99/// Input Input
100/// A B
101/// ```
102///
103/// This rule will attempt to add a `RepartitionExec` to increase parallelism
104/// (to 3, in this case) and create the following arrangement:
105///
106/// ```text
107/// ┌─────────────────────────────────┐
108/// │ │
109/// │ ExecutionPlan │
110/// │ │
111/// └─────────────────────────────────┘
112/// ▲ ▲ ▲ Input now has 3
113/// │ │ │ partitions
114/// ┌───────┘ │ └───────┐
115/// │ │ │
116/// │ │ │
117/// ┌───────────┐ ┌───────────┐ ┌───────────┐
118/// │ │ │ │ │ │
119/// │ batch A1 │ │ batch A3 │ │ batch B3 │
120/// │ │ │ │ │ │
121/// ├───────────┤ ├───────────┤ ├───────────┤
122/// │ │ │ │ │ │
123/// │ batch B2 │ │ batch B1 │ │ batch A2 │
124/// │ │ │ │ │ │
125/// └───────────┘ └───────────┘ └───────────┘
126/// ▲ ▲ ▲
127/// │ │ │
128/// └─────────┐ │ ┌──────────┘
129/// │ │ │
130/// │ │ │
131/// ┌─────────────────────────────────┐ batches are
132/// │ RepartitionExec(3) │ repartitioned
133/// │ RoundRobin │
134/// │ │
135/// └─────────────────────────────────┘
136/// ▲ ▲
137/// │ │
138/// ┌─────┘ └─────┐
139/// │ │
140/// │ │
141/// │ │
142/// ┌───────────┐ ┌───────────┐
143/// │ │ │ │
144/// │ batch A1 │ │ batch B1 │
145/// │ │ │ │
146/// ├───────────┤ ├───────────┤
147/// │ │ │ │
148/// │ batch A2 │ │ batch B2 │
149/// │ │ │ │
150/// ├───────────┤ ├───────────┤
151/// │ │ │ │
152/// │ batch A3 │ │ batch B3 │
153/// │ │ │ │
154/// └───────────┘ └───────────┘
155///
156///
157/// Input Input
158/// A B
159/// ```
160///
161/// The `EnforceDistribution` rule
162/// - is idempotent; i.e. it can be applied multiple times, each time producing
163/// the same result.
164/// - always produces a valid plan in terms of distribution requirements. Its
165/// input plan can be valid or invalid with respect to distribution requirements,
166/// but the output plan will always be valid.
167/// - produces a valid plan in terms of ordering requirements, *if* its input is
168/// a valid plan in terms of ordering requirements. If the input plan is invalid,
169/// this rule does not attempt to fix it as doing so is the responsibility of the
170/// `EnforceSorting` rule.
171///
172/// Note that distribution requirements are met in the strictest way. This may
173/// result in more than strictly necessary [`RepartitionExec`]s in the plan, but
174/// meeting the requirements in the strictest way may help avoid possible data
175/// skew in joins.
176///
/// For example, for a hash join with keys (a, b, c), the required Distribution(a, b, c)
/// can be satisfied by several alternative partitionings: (a, b, c), (a, b),
/// (a, c), (b, c), (a), (b), (c) and ( ).
180///
181/// This rule only chooses the exact match and satisfies the Distribution(a, b, c)
182/// by a HashPartition(a, b, c).
#[derive(Default, Debug)]
pub struct EnforceDistribution {}

impl EnforceDistribution {
    /// Creates a new [`EnforceDistribution`] rule.
    ///
    /// The rule holds no state of its own, so this is the same as
    /// [`EnforceDistribution::default`]; its behavior is driven by the
    /// [`ConfigOptions`] passed to [`PhysicalOptimizerRule::optimize`].
    pub fn new() -> Self {
        Self::default()
    }
}
192
193impl PhysicalOptimizerRule for EnforceDistribution {
194 fn optimize(
195 &self,
196 plan: Arc<dyn ExecutionPlan>,
197 config: &ConfigOptions,
198 ) -> Result<Arc<dyn ExecutionPlan>> {
199 let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
200
201 let adjusted = if top_down_join_key_reordering {
202 // Run a top-down process to adjust input key ordering recursively
203 let plan_requirements = PlanWithKeyRequirements::new_default(plan);
204 let adjusted = plan_requirements
205 .transform_down(adjust_input_keys_ordering)
206 .data()?;
207 adjusted.plan
208 } else {
209 // Run a bottom-up process
210 plan.transform_up(|plan| {
211 Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
212 })
213 .data()?
214 };
215
216 let distribution_context = DistributionContext::new_default(adjusted);
217 // Distribution enforcement needs to be applied bottom-up.
218 let distribution_context = distribution_context
219 .transform_up(|distribution_context| {
220 ensure_distribution(distribution_context, config)
221 })
222 .data()?;
223 Ok(distribution_context.plan)
224 }
225
226 fn name(&self) -> &str {
227 "EnforceDistribution"
228 }
229
230 fn schema_check(&self) -> bool {
231 true
232 }
233}
234
235#[derive(Debug, Clone)]
236struct JoinKeyPairs {
237 left_keys: Vec<Arc<dyn PhysicalExpr>>,
238 right_keys: Vec<Arc<dyn PhysicalExpr>>,
239}
240
241/// Keeps track of parent required key orderings.
242pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
243
/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
/// That might not match the output partitioning of the join node's children.
/// A top-down process uses this method to adjust the children's output partitioning based on the
/// parent's key-ordering requirements.
///
/// Example:
///
/// ```text
/// TopJoin on (a, b, c)
///     bottom left join on(b, a, c)
///     bottom right join on(c, b, a)
/// ```
///
/// Will be adjusted to:
///
/// ```text
/// TopJoin on (a, b, c)
///     bottom left join on(a, b, c)
///     bottom right join on(a, b, c)
/// ```
///
/// Example:
///
/// ```text
/// TopJoin on (a, b, c)
///     Agg1 group by (b, a, c)
///     Agg2 group by (c, b, a)
/// ```
///
/// Will be adjusted to:
///
/// ```text
/// TopJoin on (a, b, c)
///     Projection(b, a, c)
///         Agg1 group by (a, b, c)
///     Projection(c, b, a)
///         Agg2 group by (a, b, c)
/// ```
///
/// The reordering process works as follows:
///
/// 1) If the current plan is a Partitioned HashJoin or a SortMergeJoin, check whether the
///    requirements can be satisfied by adjusting the join key ordering:
///    - Requirements cannot be satisfied: generate new requirements (to push down) based on the
///      current join keys and return the unchanged plan.
///    - Requirements are already satisfied: generate new requirements (to push down) based on the
///      current join keys and return the unchanged plan.
///    - Requirements can be satisfied by adjusting the key ordering: generate new requirements
///      (to push down) based on the adjusted join keys and return the changed plan.
///
///    For a CollectLeft HashJoin, the requirements are handed to the right child only:
///    index-shifted for Inner/Right joins, passed through as-is for RightSemi/RightAnti/RightMark
///    joins, and dropped for the other join types. For an Auto-mode HashJoin, the current
///    requirements are cleared. For a CrossJoin, the requirements are index-shifted onto the
///    right child, or dropped if any of them refers to the left side.
///
/// 2) If the current plan is an Aggregation with non-empty requirements: a `FinalPartitioned`
///    aggregate is handed to `reorder_aggregate_keys`, which adjusts the group-by key ordering
///    where possible. For other aggregate modes the requirements are cleared. An aggregate
///    without requirements is returned unchanged.
///
/// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all
///    the requirements and return the unchanged plan.
///
/// 4) If the current plan is a Projection, map the requirements to the columns before the
///    Projection and push them down. If they cannot all be mapped, clear them instead.
///
/// 5) For other types of operators, push the parent requirements down to every child. If there
///    are no requirements, the node is returned as `Transformed::no`.
pub fn adjust_input_keys_ordering(
    mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
    let plan = Arc::clone(&requirements.plan);

    if let Some(
        exec @ HashJoinExec {
            left,
            on,
            join_type,
            mode,
            ..
        },
    ) = plan.downcast_ref::<HashJoinExec>()
    {
        match mode {
            PartitionMode::Partitioned => {
                // Rebuilds this hash join with reordered `on` pairs; the sort
                // options (`new_conditions.1`) are not used for hash joins.
                let join_constructor = |new_conditions: (
                    Vec<(PhysicalExprRef, PhysicalExprRef)>,
                    Vec<SortOptions>,
                )| {
                    exec.builder()
                        .with_partition_mode(PartitionMode::Partitioned)
                        .with_on(new_conditions.0)
                        .build_exec()
                };
                return reorder_partitioned_join_keys(
                    requirements,
                    on,
                    &[],
                    &join_constructor,
                )
                .map(Transformed::yes);
            }
            PartitionMode::CollectLeft => {
                // Push down requirements to the right side
                requirements.children[1].data = match join_type {
                    // Shift column indices past the left schema so they index
                    // into the right child; drop everything if any requirement
                    // is not a right-side column.
                    JoinType::Inner | JoinType::Right => shift_right_required(
                        &requirements.data,
                        left.schema().fields().len(),
                    )
                    .unwrap_or_default(),
                    // NOTE(review): passed through unshifted, presumably because
                    // these join types emit right-side columns first; confirm.
                    JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                        requirements.data.clone()
                    }
                    JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::Full
                    | JoinType::LeftMark => vec![],
                };
            }
            PartitionMode::Auto => {
                // Can not satisfy, clear the current requirements and generate new empty requirements
                requirements.data.clear();
            }
        }
    } else if let Some(CrossJoinExec { left, .. }) = plan.downcast_ref::<CrossJoinExec>()
    {
        let left_columns_len = left.schema().fields().len();
        // Push down requirements to the right side
        requirements.children[1].data =
            shift_right_required(&requirements.data, left_columns_len)
                .unwrap_or_default();
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        sort_options,
        null_equality,
        ..
    }) = plan.downcast_ref::<SortMergeJoinExec>()
    {
        // Rebuilds this sort-merge join with reordered `on` pairs and the
        // correspondingly permuted sort options.
        let join_constructor = |new_conditions: (
            Vec<(PhysicalExprRef, PhysicalExprRef)>,
            Vec<SortOptions>,
        )| {
            SortMergeJoinExec::try_new(
                Arc::clone(left),
                Arc::clone(right),
                new_conditions.0,
                filter.clone(),
                *join_type,
                new_conditions.1,
                *null_equality,
            )
            .map(|e| Arc::new(e) as _)
        };
        return reorder_partitioned_join_keys(
            requirements,
            on,
            sort_options,
            &join_constructor,
        )
        .map(Transformed::yes);
    } else if let Some(aggregate_exec) = plan.downcast_ref::<AggregateExec>() {
        if !requirements.data.is_empty() {
            if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
                return reorder_aggregate_keys(requirements, aggregate_exec)
                    .map(Transformed::yes);
            } else {
                requirements.data.clear();
            }
        } else {
            // Keep everything unchanged
            return Ok(Transformed::no(requirements));
        }
    } else if let Some(proj) = plan.downcast_ref::<ProjectionExec>() {
        let expr = proj.expr();
        // For Projection, we need to transform the requirements to the columns before the Projection
        // And then to push down the requirements
        // Construct a mapping from new name to the original Column
        let proj_exprs: Vec<_> = expr
            .iter()
            .map(|p| (Arc::clone(&p.expr), p.alias.clone()))
            .collect();
        let new_required = map_columns_before_projection(&requirements.data, &proj_exprs);
        // Only push down if every requirement could be mapped through the projection.
        if new_required.len() == requirements.data.len() {
            requirements.children[0].data = new_required;
        } else {
            // Can not satisfy, clear the current requirements and generate new empty requirements
            requirements.data.clear();
        }
    } else if plan.is::<RepartitionExec>()
        || plan.is::<CoalescePartitionsExec>()
        || plan.is::<WindowAggExec>()
    {
        requirements.data.clear();
    } else if requirements.data.is_empty() {
        // No requirements to push down and no plan changes — skip rebuild.
        return Ok(Transformed::no(requirements));
    } else {
        // By default, push down the parent requirements to children
        for child in requirements.children.iter_mut() {
            child.data.clone_from(&requirements.data);
        }
    }
    Ok(Transformed::yes(requirements))
}
426
427pub fn reorder_partitioned_join_keys<F>(
428 mut join_plan: PlanWithKeyRequirements,
429 on: &[(PhysicalExprRef, PhysicalExprRef)],
430 sort_options: &[SortOptions],
431 join_constructor: &F,
432) -> Result<PlanWithKeyRequirements>
433where
434 F: Fn(
435 (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
436 ) -> Result<Arc<dyn ExecutionPlan>>,
437{
438 let parent_required = &join_plan.data;
439 let join_key_pairs = extract_join_keys(on);
440 let eq_properties = join_plan.plan.equivalence_properties();
441
442 let (
443 JoinKeyPairs {
444 left_keys,
445 right_keys,
446 },
447 positions,
448 ) = try_reorder(join_key_pairs, parent_required, eq_properties);
449
450 if let Some(positions) = positions
451 && !positions.is_empty()
452 {
453 let new_join_on = new_join_conditions(&left_keys, &right_keys);
454 let new_sort_options = (0..sort_options.len())
455 .map(|idx| sort_options[positions[idx]])
456 .collect();
457 join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
458 }
459
460 join_plan.children[0].data = left_keys;
461 join_plan.children[1].data = right_keys;
462 Ok(join_plan)
463}
464
/// Tries to reorder the group-by keys of a `FinalPartitioned` aggregate, and
/// of the `Partial` aggregate directly below it, so that the aggregate's
/// output keys follow the ordering required by its parent.
///
/// `agg_node.data` holds the parent's required keys. The plan is rewritten
/// only when all of the following hold:
/// - the number of required keys equals the number of group-by expressions,
/// - `agg_exec.group_expr().null_expr()` is empty,
/// - the required keys differ from the aggregate's output group columns but
///   are a permutation of them, and
/// - the input of `agg_exec` is an `AggregateExec` in `Partial` mode.
///
/// On rewrite, a new `Partial` aggregate with permuted group-by expressions
/// and a new `FinalPartitioned` aggregate on top of it are built. A
/// `ProjectionExec` that restores the original output column order is placed
/// above them and returned. Otherwise `agg_node` is returned unchanged.
///
/// # Errors
///
/// Propagates errors from `AggregateExec::try_new`, schema lookups
/// (`index_of`), and `ProjectionExec::try_new`.
pub fn reorder_aggregate_keys(
    mut agg_node: PlanWithKeyRequirements,
    agg_exec: &AggregateExec,
) -> Result<PlanWithKeyRequirements> {
    let parent_required = &agg_node.data;
    // Columns naming the aggregate's group-by outputs, in their current order.
    let output_columns = agg_exec
        .group_expr()
        .expr()
        .iter()
        .enumerate()
        .map(|(index, (_, name))| Column::new(name, index))
        .collect::<Vec<_>>();

    let output_exprs = output_columns
        .iter()
        .map(|c| Arc::new(c.clone()) as _)
        .collect::<Vec<_>>();

    if parent_required.len() == output_exprs.len()
        && agg_exec.group_expr().null_expr().is_empty()
        && !physical_exprs_equal(&output_exprs, parent_required)
        && let Some(positions) = expected_expr_positions(&output_exprs, parent_required)
        && let Some(agg_exec) = agg_exec.input().downcast_ref::<AggregateExec>()
        && *agg_exec.mode() == AggregateMode::Partial
    {
        // From here on `agg_exec` shadows the outer aggregate and refers to
        // the `Partial` aggregate below it.
        let group_exprs = agg_exec.group_expr().expr();
        let new_group_exprs = positions
            .into_iter()
            .map(|idx| group_exprs[idx].clone())
            .collect();
        let partial_agg = Arc::new(AggregateExec::try_new(
            AggregateMode::Partial,
            PhysicalGroupBy::new_single(new_group_exprs),
            agg_exec.aggr_expr().to_vec(),
            agg_exec.filter_expr().to_vec(),
            Arc::clone(agg_exec.input()),
            Arc::clone(&agg_exec.input_schema),
        )?);
        // Build new group expressions that correspond to the output
        // of the "reordered" aggregator:
        let group_exprs = partial_agg.group_expr().expr();
        let new_group_by = PhysicalGroupBy::new_single(
            partial_agg
                .output_group_expr()
                .into_iter()
                .enumerate()
                .map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
                .collect(),
        );
        let new_final_agg = Arc::new(AggregateExec::try_new(
            AggregateMode::FinalPartitioned,
            new_group_by,
            agg_exec.aggr_expr().to_vec(),
            agg_exec.filter_expr().to_vec(),
            Arc::clone(&partial_agg) as _,
            agg_exec.input_schema(),
        )?);

        agg_node.plan = Arc::clone(&new_final_agg) as _;
        agg_node.data.clear();
        // Replace the old partial-aggregate child with the new one, keeping
        // the old child's own children.
        agg_node.children = vec![PlanWithKeyRequirements::new(
            partial_agg as _,
            vec![],
            agg_node.children.swap_remove(0).children,
        )];

        // Need to create a new projection to change the expr ordering back
        let agg_schema = new_final_agg.schema();
        let mut proj_exprs = output_columns
            .iter()
            .map(|col| {
                let name = col.name();
                let index = agg_schema.index_of(name)?;
                Ok(ProjectionExpr {
                    expr: Arc::new(Column::new(name, index)) as _,
                    alias: name.to_owned(),
                })
            })
            .collect::<Result<Vec<_>>>()?;
        // The remaining (non-group-by) output fields are passed through in place.
        let agg_fields = agg_schema.fields();
        for (idx, field) in agg_fields.iter().enumerate().skip(output_columns.len()) {
            let name = field.name();
            let plan = Arc::new(Column::new(name, idx)) as _;
            proj_exprs.push(ProjectionExpr {
                expr: plan,
                alias: name.clone(),
            })
        }
        return ProjectionExec::try_new(proj_exprs, new_final_agg)
            .map(|p| PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node]));
    }
    Ok(agg_node)
}
558
559fn shift_right_required(
560 parent_required: &[Arc<dyn PhysicalExpr>],
561 left_columns_len: usize,
562) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
563 let new_right_required = parent_required
564 .iter()
565 .filter_map(|r| {
566 (r.as_ref() as &dyn Any)
567 .downcast_ref::<Column>()
568 .and_then(|col| {
569 col.index()
570 .checked_sub(left_columns_len)
571 .map(|index| Arc::new(Column::new(col.name(), index)) as _)
572 })
573 })
574 .collect::<Vec<_>>();
575
576 // if the parent required are all coming from the right side, the requirements can be pushdown
577 (new_right_required.len() == parent_required.len()).then_some(new_right_required)
578}
579
580/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
581/// That might not match with the output partitioning of the join node's children
582/// This method will try to change the ordering of the join keys to match with the
583/// partitioning of the join nodes' children. If it can not match with both sides, it will try to
584/// match with one, either the left side or the right side.
585///
586/// Example:
587/// TopJoin on (a, b, c)
588/// bottom left join on(b, a, c)
589/// bottom right join on(c, b, a)
590///
591/// Will be adjusted to:
592/// TopJoin on (b, a, c)
593/// bottom left join on(b, a, c)
594/// bottom right join on(c, b, a)
595///
596/// Compared to the Top-Down reordering process, this Bottom-Up approach is much simpler, but might not reach a best result.
597/// The Bottom-Up approach will be useful in future if we plan to support storage partition-wised Joins.
598/// In that case, the datasources/tables might be pre-partitioned and we can't adjust the key ordering of the datasources
599/// and then can't apply the Top-Down reordering process.
600pub fn reorder_join_keys_to_inputs(
601 plan: Arc<dyn ExecutionPlan>,
602) -> Result<Arc<dyn ExecutionPlan>> {
603 if let Some(
604 exec @ HashJoinExec {
605 left,
606 right,
607 on,
608 mode,
609 ..
610 },
611 ) = plan.downcast_ref::<HashJoinExec>()
612 {
613 if *mode == PartitionMode::Partitioned {
614 let (join_keys, positions) = reorder_current_join_keys(
615 extract_join_keys(on),
616 Some(left.output_partitioning()),
617 Some(right.output_partitioning()),
618 left.equivalence_properties(),
619 right.equivalence_properties(),
620 );
621 if positions.is_some_and(|idxs| !idxs.is_empty()) {
622 let JoinKeyPairs {
623 left_keys,
624 right_keys,
625 } = join_keys;
626 let new_join_on = new_join_conditions(&left_keys, &right_keys);
627 return exec
628 .builder()
629 .with_partition_mode(PartitionMode::Partitioned)
630 .with_on(new_join_on)
631 .build_exec();
632 }
633 }
634 } else if let Some(SortMergeJoinExec {
635 left,
636 right,
637 on,
638 filter,
639 join_type,
640 sort_options,
641 null_equality,
642 ..
643 }) = plan.downcast_ref::<SortMergeJoinExec>()
644 {
645 let (join_keys, positions) = reorder_current_join_keys(
646 extract_join_keys(on),
647 Some(left.output_partitioning()),
648 Some(right.output_partitioning()),
649 left.equivalence_properties(),
650 right.equivalence_properties(),
651 );
652 if let Some(positions) = positions
653 && !positions.is_empty()
654 {
655 let JoinKeyPairs {
656 left_keys,
657 right_keys,
658 } = join_keys;
659 let new_join_on = new_join_conditions(&left_keys, &right_keys);
660 let new_sort_options = (0..sort_options.len())
661 .map(|idx| sort_options[positions[idx]])
662 .collect();
663 return SortMergeJoinExec::try_new(
664 Arc::clone(left),
665 Arc::clone(right),
666 new_join_on,
667 filter.clone(),
668 *join_type,
669 new_sort_options,
670 *null_equality,
671 )
672 .map(|smj| Arc::new(smj) as _);
673 }
674 }
675 Ok(plan)
676}
677
678/// Reorder the current join keys ordering based on either left partition or right partition
679fn reorder_current_join_keys(
680 join_keys: JoinKeyPairs,
681 left_partition: Option<&Partitioning>,
682 right_partition: Option<&Partitioning>,
683 left_equivalence_properties: &EquivalenceProperties,
684 right_equivalence_properties: &EquivalenceProperties,
685) -> (JoinKeyPairs, Option<Vec<usize>>) {
686 match (left_partition, right_partition) {
687 (Some(Partitioning::Hash(left_exprs, _)), _) => {
688 match try_reorder(join_keys, left_exprs, left_equivalence_properties) {
689 (join_keys, None) => reorder_current_join_keys(
690 join_keys,
691 None,
692 right_partition,
693 left_equivalence_properties,
694 right_equivalence_properties,
695 ),
696 result => result,
697 }
698 }
699 (_, Some(Partitioning::Hash(right_exprs, _))) => {
700 try_reorder(join_keys, right_exprs, right_equivalence_properties)
701 }
702 _ => (join_keys, None),
703 }
704}
705
706fn try_reorder(
707 join_keys: JoinKeyPairs,
708 expected: &[Arc<dyn PhysicalExpr>],
709 equivalence_properties: &EquivalenceProperties,
710) -> (JoinKeyPairs, Option<Vec<usize>>) {
711 let eq_groups = equivalence_properties.eq_group();
712 let mut normalized_expected = vec![];
713 let mut normalized_left_keys = vec![];
714 let mut normalized_right_keys = vec![];
715 if join_keys.left_keys.len() != expected.len() {
716 return (join_keys, None);
717 }
718 if physical_exprs_equal(expected, &join_keys.left_keys)
719 || physical_exprs_equal(expected, &join_keys.right_keys)
720 {
721 return (join_keys, Some(vec![]));
722 } else if !equivalence_properties.eq_group().is_empty() {
723 normalized_expected = expected
724 .iter()
725 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
726 .collect();
727
728 normalized_left_keys = join_keys
729 .left_keys
730 .iter()
731 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
732 .collect();
733
734 normalized_right_keys = join_keys
735 .right_keys
736 .iter()
737 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
738 .collect();
739
740 if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
741 || physical_exprs_equal(&normalized_expected, &normalized_right_keys)
742 {
743 return (join_keys, Some(vec![]));
744 }
745 }
746
747 let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected)
748 .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
749 .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
750 .or_else(|| {
751 expected_expr_positions(&normalized_right_keys, &normalized_expected)
752 })
753 else {
754 return (join_keys, None);
755 };
756
757 let mut new_left_keys = vec![];
758 let mut new_right_keys = vec![];
759 for pos in positions.iter() {
760 new_left_keys.push(Arc::clone(&join_keys.left_keys[*pos]));
761 new_right_keys.push(Arc::clone(&join_keys.right_keys[*pos]));
762 }
763 let pairs = JoinKeyPairs {
764 left_keys: new_left_keys,
765 right_keys: new_right_keys,
766 };
767
768 (pairs, Some(positions))
769}
770
771/// Return the expected expressions positions.
772/// For example, the current expressions are ['c', 'a', 'a', b'], the expected expressions are ['b', 'c', 'a', 'a'],
773///
774/// This method will return a Vec [3, 0, 1, 2]
775fn expected_expr_positions(
776 current: &[Arc<dyn PhysicalExpr>],
777 expected: &[Arc<dyn PhysicalExpr>],
778) -> Option<Vec<usize>> {
779 if current.is_empty() || expected.is_empty() {
780 return None;
781 }
782 let mut indexes: Vec<usize> = vec![];
783 let mut current = current.to_vec();
784 for expr in expected.iter() {
785 // Find the position of the expected expr in the current expressions
786 if let Some(expected_position) = current.iter().position(|e| e.eq(expr)) {
787 current[expected_position] = Arc::new(NoOp::new());
788 indexes.push(expected_position);
789 } else {
790 return None;
791 }
792 }
793 Some(indexes)
794}
795
796fn extract_join_keys(on: &[(PhysicalExprRef, PhysicalExprRef)]) -> JoinKeyPairs {
797 let (left_keys, right_keys) = on
798 .iter()
799 .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
800 .unzip();
801 JoinKeyPairs {
802 left_keys,
803 right_keys,
804 }
805}
806
807fn new_join_conditions(
808 new_left_keys: &[Arc<dyn PhysicalExpr>],
809 new_right_keys: &[Arc<dyn PhysicalExpr>],
810) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
811 new_left_keys
812 .iter()
813 .zip(new_right_keys.iter())
814 .map(|(l_key, r_key)| (Arc::clone(l_key), Arc::clone(r_key)))
815 .collect()
816}
817
818/// Adds RoundRobin repartition operator to the plan increase parallelism.
819///
820/// # Arguments
821///
822/// * `input`: Current node.
823/// * `n_target`: desired target partition number, if partition number of the
824/// current executor is less than this value. Partition number will be increased.
825///
826/// # Returns
827///
828/// A [`Result`] object that contains new execution plan where the desired
829/// partition number is achieved by adding a RoundRobin repartition.
830fn add_roundrobin_on_top(
831 input: DistributionContext,
832 n_target: usize,
833) -> Result<DistributionContext> {
834 // Adding repartition is helpful:
835 if input.plan.output_partitioning().partition_count() < n_target {
836 // When there is an existing ordering, we preserve ordering
837 // during repartition. This will be un-done in the future
838 // If any of the following conditions is true
839 // - Preserving ordering is not helpful in terms of satisfying ordering requirements
840 // - Usage of order preserving variants is not desirable
841 // (determined by flag `config.optimizer.prefer_existing_sort`)
842 let partitioning = Partitioning::RoundRobinBatch(n_target);
843 let repartition =
844 RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
845 .with_preserve_order();
846
847 let new_plan = Arc::new(repartition) as _;
848
849 Ok(DistributionContext::new(new_plan, true, vec![input]))
850 } else {
851 // Partition is not helpful, we already have desired number of partitions.
852 Ok(input)
853 }
854}
855
856/// Adds a hash repartition operator:
857/// - to increase parallelism, and/or
858/// - to satisfy requirements of the subsequent operators.
859///
860/// Repartition(Hash) is added on top of operator `input`.
861///
862/// # Arguments
863///
864/// * `input`: Current node.
865/// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
866/// * `n_target`: desired target partition number, if partition number of the
867/// current executor is less than this value. Partition number will be increased.
868/// * `allow_subset_satisfy_partitioning`: Whether to allow subset partitioning logic in satisfaction checks.
869/// Set to `false` for partitioned hash joins to ensure exact hash matching.
870///
871/// # Returns
872///
873/// A [`Result`] object that contains new execution plan where the desired
874/// distribution is satisfied by adding a Hash repartition.
875fn add_hash_on_top(
876 input: DistributionContext,
877 hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
878 n_target: usize,
879 allow_subset_satisfy_partitioning: bool,
880) -> Result<DistributionContext> {
881 // Early return if hash repartition is unnecessary
882 // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
883 if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
884 return Ok(input);
885 }
886
887 let dist = Distribution::HashPartitioned(hash_exprs);
888 let satisfaction = input.plan.output_partitioning().satisfaction(
889 &dist,
890 input.plan.equivalence_properties(),
891 allow_subset_satisfy_partitioning,
892 );
893
894 // Add hash repartitioning when:
895 // - When subset satisfaction is enabled (current >= threshold): only repartition if not satisfied
896 // - When below threshold (current < threshold): repartition if expressions don't match OR to increase parallelism
897 let needs_repartition = if allow_subset_satisfy_partitioning {
898 !satisfaction.is_satisfied()
899 } else {
900 !satisfaction.is_satisfied()
901 || n_target > input.plan.output_partitioning().partition_count()
902 };
903
904 if needs_repartition {
905 // When there is an existing ordering, we preserve ordering during
906 // repartition. This will be rolled back in the future if any of the
907 // following conditions is true:
908 // - Preserving ordering is not helpful in terms of satisfying ordering
909 // requirements.
910 // - Usage of order preserving variants is not desirable (per the flag
911 // `config.optimizer.prefer_existing_sort`).
912 let partitioning = dist.create_partitioning(n_target);
913 let repartition =
914 RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
915 .with_preserve_order();
916 let plan = Arc::new(repartition) as _;
917
918 return Ok(DistributionContext::new(plan, true, vec![input]));
919 }
920
921 Ok(input)
922}
923
924/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
925/// on top of the given plan node to satisfy a single partition requirement
926/// while preserving ordering constraints.
927///
928/// # Parameters
929///
930/// * `input`: Current node.
931///
932/// Checks whether preserving the child's ordering enables the parent to
933/// run in streaming mode. Compares the parent's pipeline behavior with
934/// the ordered child vs. an unordered (coalesced) child. If removing the
935/// ordering would cause the parent to switch from streaming to blocking,
936/// keeping the order-preserving variant is beneficial.
937///
938/// Only applicable to single-child operators; returns `Ok(false)` for
939/// multi-child operators (e.g. joins) where child substitution semantics are
940/// ambiguous.
941fn preserving_order_enables_streaming(
942 parent: &Arc<dyn ExecutionPlan>,
943 ordered_child: &Arc<dyn ExecutionPlan>,
944) -> Result<bool> {
945 // Only applicable to single-child operators that maintain input order
946 // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
947 // maintain input order (e.g. SortExec) handle ordering themselves —
948 // preserving SPM for them is unnecessary.
949 if parent.children().len() != 1 {
950 return Ok(false);
951 }
952 if !parent.maintains_input_order()[0] {
953 return Ok(false);
954 }
955 // Build parent with the ordered child
956 let with_ordered =
957 Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
958 if with_ordered.pipeline_behavior() == EmissionType::Final {
959 // Parent is blocking even with ordering — no benefit
960 return Ok(false);
961 }
962 // Build parent with an unordered child via CoalescePartitionsExec.
963 let unordered_child: Arc<dyn ExecutionPlan> =
964 Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
965 let without_ordered = Arc::clone(parent).with_new_children(vec![unordered_child])?;
966 Ok(without_ordered.pipeline_behavior() == EmissionType::Final)
967}
968
969/// # Returns
970///
971/// Updated node with an execution plan, where the desired single distribution
972/// requirement is satisfied.
973fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
974 // Apply only when the partition count is larger than one.
975 if input.plan.output_partitioning().partition_count() > 1 {
976 // When there is an existing ordering, we preserve ordering
977 // when decreasing partitions. This will be un-done in the future
978 // if any of the following conditions is true
979 // - Preserving ordering is not helpful in terms of satisfying ordering requirements
980 // - Usage of order preserving variants is not desirable
981 // (determined by flag `config.optimizer.prefer_existing_sort`)
982 let new_plan = if let Some(req) = input.plan.output_ordering() {
983 Arc::new(SortPreservingMergeExec::new(
984 req.clone(),
985 Arc::clone(&input.plan),
986 )) as _
987 } else {
988 // If there is no input order, we can simply coalesce partitions:
989 Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
990 };
991
992 DistributionContext::new(new_plan, true, vec![input])
993 } else {
994 input
995 }
996}
997
998/// Updates the physical plan inside [`DistributionContext`] so that distribution
999/// changing operators are removed from the top. If they are necessary, they will
1000/// be added in subsequent stages.
1001///
1002/// Assume that following plan is given:
1003/// ```text
1004/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
1005/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
1006/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1007/// ```
1008///
1009/// Since `RepartitionExec`s change the distribution, this function removes
1010/// them and returns following plan:
1011///
1012/// ```text
1013/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1014/// ```
1015fn remove_dist_changing_operators(
1016 mut distribution_context: DistributionContext,
1017) -> Result<DistributionContext> {
1018 while is_repartition(&distribution_context.plan)
1019 || is_coalesce_partitions(&distribution_context.plan)
1020 || is_sort_preserving_merge(&distribution_context.plan)
1021 {
1022 // All of above operators have a single child. First child is only child.
1023 // Remove any distribution changing operators at the beginning:
1024 distribution_context = distribution_context.children.swap_remove(0);
1025 // Note that they will be re-inserted later on if necessary or helpful.
1026 }
1027
1028 Ok(distribution_context)
1029}
1030
1031/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
1032///
1033/// Assume that following plan is given:
1034/// ```text
1035/// "SortPreservingMergeExec: \[a@0 ASC]"
1036/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true",
1037/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true",
1038/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1039/// ```
1040///
1041/// This function converts plan above to the following:
1042///
1043/// ```text
1044/// "CoalescePartitionsExec"
1045/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
1046/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
1047/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1048/// ```
1049pub fn replace_order_preserving_variants(
1050 mut context: DistributionContext,
1051) -> Result<DistributionContext> {
1052 context.children = context
1053 .children
1054 .into_iter()
1055 .map(|child| {
1056 if child.data {
1057 replace_order_preserving_variants(child)
1058 } else {
1059 Ok(child)
1060 }
1061 })
1062 .collect::<Result<Vec<_>>>()?;
1063
1064 if is_sort_preserving_merge(&context.plan) {
1065 let child_plan = Arc::clone(&context.children[0].plan);
1066 context.plan = Arc::new(
1067 CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
1068 );
1069 return Ok(context);
1070 } else if let Some(repartition) = context.plan.downcast_ref::<RepartitionExec>()
1071 && repartition.preserve_order()
1072 {
1073 context.plan = Arc::new(RepartitionExec::try_new(
1074 Arc::clone(&context.children[0].plan),
1075 repartition.partitioning().clone(),
1076 )?);
1077 return Ok(context);
1078 }
1079
1080 context.update_plan_from_children()
1081}
1082
/// A struct to keep track of repartition requirements for each child node.
///
/// One instance is produced per child by `get_repartition_requirement_status`
/// and consumed (destructured) in `ensure_distribution`.
struct RepartitionRequirementStatus {
    /// The distribution requirement the parent imposes on this child (taken
    /// from the parent's `required_input_distribution`).
    requirement: Distribution,
    /// Designates whether round robin partitioning is theoretically beneficial;
    /// i.e. the operator can actually utilize parallelism (taken from the
    /// parent's `benefits_from_input_partitioning`).
    roundrobin_beneficial: bool,
    /// Designates whether round robin partitioning is beneficial according to
    /// the statistical information we have on the number of rows.
    roundrobin_beneficial_stats: bool,
    /// Designates whether hash partitioning is necessary. Only ever `true`
    /// when `requirement` is `Distribution::HashPartitioned`.
    hash_necessary: bool,
}
1096
1097/// Calculates the `RepartitionRequirementStatus` for each children to generate
1098/// consistent and sensible (in terms of performance) distribution requirements.
1099/// As an example, a hash join's left (build) child might produce
1100///
1101/// ```text
1102/// RepartitionRequirementStatus {
1103/// ..,
1104/// hash_necessary: true
1105/// }
1106/// ```
1107///
1108/// while its right (probe) child might have very few rows and produce:
1109///
1110/// ```text
1111/// RepartitionRequirementStatus {
1112/// ..,
1113/// hash_necessary: false
1114/// }
1115/// ```
1116///
1117/// These statuses are not consistent as all children should agree on hash
1118/// partitioning. This function aligns the statuses to generate consistent
1119/// hash partitions for each children. After alignment, the right child's
1120/// status would turn into:
1121///
1122/// ```text
1123/// RepartitionRequirementStatus {
1124/// ..,
1125/// hash_necessary: true
1126/// }
1127/// ```
1128fn get_repartition_requirement_status(
1129 plan: &Arc<dyn ExecutionPlan>,
1130 batch_size: usize,
1131 should_use_estimates: bool,
1132) -> Result<Vec<RepartitionRequirementStatus>> {
1133 let mut needs_alignment = false;
1134 let children = plan.children();
1135 let rr_beneficial = plan.benefits_from_input_partitioning();
1136 let requirements = plan.required_input_distribution();
1137 let mut repartition_status_flags = vec![];
1138 for (child, requirement, roundrobin_beneficial) in
1139 izip!(children.into_iter(), requirements, rr_beneficial)
1140 {
1141 // Decide whether adding a round robin is beneficial depending on
1142 // the statistical information we have on the number of rows:
1143 let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows
1144 {
1145 Precision::Exact(n_rows) => n_rows > batch_size,
1146 Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
1147 Precision::Absent => true,
1148 };
1149 let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
1150 // Hash re-partitioning is necessary when the input has more than one
1151 // partitions:
1152 let multi_partitions = child.output_partitioning().partition_count() > 1;
1153 let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
1154 needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
1155 repartition_status_flags.push((
1156 is_hash,
1157 RepartitionRequirementStatus {
1158 requirement,
1159 roundrobin_beneficial,
1160 roundrobin_beneficial_stats,
1161 hash_necessary: is_hash && multi_partitions,
1162 },
1163 ));
1164 }
1165 // Align hash necessary flags for hash partitions to generate consistent
1166 // hash partitions at each children:
1167 if needs_alignment {
1168 // When there is at least one hash requirement that is necessary or
1169 // beneficial according to statistics, make all children require hash
1170 // repartitioning:
1171 for (is_hash, status) in &mut repartition_status_flags {
1172 if *is_hash {
1173 status.hash_necessary = true;
1174 }
1175 }
1176 }
1177 Ok(repartition_status_flags
1178 .into_iter()
1179 .map(|(_, status)| status)
1180 .collect())
1181}
1182
1183/// This function checks whether we need to add additional data exchange
1184/// operators to satisfy distribution requirements. Since this function
1185/// takes care of such requirements, we should avoid manually adding data
1186/// exchange operators in other places.
1187///
1188/// This function is intended to be used in a bottom up traversal, as it
1189/// can first repartition (or newly partition) at the datasources -- these
1190/// source partitions may be later repartitioned with additional data exchange operators.
1191pub fn ensure_distribution(
1192 dist_context: DistributionContext,
1193 config: &ConfigOptions,
1194) -> Result<Transformed<DistributionContext>> {
1195 let dist_context = update_children(dist_context)?;
1196
1197 if dist_context.plan.children().is_empty() {
1198 return Ok(Transformed::no(dist_context));
1199 }
1200
1201 let target_partitions = config.execution.target_partitions;
1202 // When `false`, round robin repartition will not be added to increase parallelism
1203 let enable_round_robin = config.optimizer.enable_round_robin_repartition;
1204 let repartition_file_scans = config.optimizer.repartition_file_scans;
1205 let batch_size = config.execution.batch_size;
1206 let should_use_estimates = config
1207 .execution
1208 .use_row_number_estimates_to_optimize_partitioning;
1209 let subset_satisfaction_threshold = config.optimizer.subset_repartition_threshold;
1210 let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
1211 && matches!(
1212 dist_context.plan.pipeline_behavior(),
1213 EmissionType::Incremental | EmissionType::Both
1214 );
1215 // Use order preserving variants either of the conditions true
1216 // - it is desired according to config
1217 // - when plan is unbounded
1218 // - when it is pipeline friendly (can incrementally produce results)
1219 let order_preserving_variants_desirable =
1220 unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
1221
1222 // Remove unnecessary repartition from the physical plan if any
1223 let DistributionContext {
1224 mut plan,
1225 data,
1226 children,
1227 } = remove_dist_changing_operators(dist_context)?;
1228
1229 if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
1230 if let Some(updated_window) = get_best_fitting_window(
1231 exec.window_expr(),
1232 exec.input(),
1233 &exec.partition_keys(),
1234 )? {
1235 plan = updated_window;
1236 }
1237 } else if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>()
1238 && let Some(updated_window) = get_best_fitting_window(
1239 exec.window_expr(),
1240 exec.input(),
1241 &exec.partition_keys(),
1242 )?
1243 {
1244 plan = updated_window;
1245 };
1246
1247 // For joins in partitioned mode, we need exact hash matching between
1248 // both sides, so subset partitioning logic must be disabled.
1249 //
1250 // Why: Different hash expressions produce different hash values, causing
1251 // rows with the same join key to land in different partitions. Since
1252 // partitioned joins match partition N left with partition N right, rows
1253 // that should match may be in different partitions and miss each other.
1254 //
1255 // Example JOIN ON left.a = right.a:
1256 //
1257 // Left: Hash([a])
1258 // Partition 1: a=1
1259 // Partition 2: a=2
1260 //
1261 // Right: Hash([a, b])
1262 // Partition 1: (a=1, b=1) -> Same a=1
1263 // Partition 2: (a=2, b=2)
1264 // Partition 3: (a=1, b=2) -> Same a=1
1265 //
1266 // Partitioned join execution:
1267 // P1 left (a=1) joins P1 right (a=1, b=1) -> Match
1268 // P2 left (a=2) joins P2 right (a=2, b=2) -> Match
1269 // P3 left (empty) joins P3 right (a=1, b=2) -> Missing, errors
1270 //
1271 // The row (a=1, b=2) should match left.a=1 but they're in different
1272 // partitions, causing panics.
1273 //
1274 // CollectLeft/CollectRight modes are safe because one side is collected
1275 // to a single partition which eliminates partition-to-partition mapping.
1276 let is_partitioned_join = plan
1277 .downcast_ref::<HashJoinExec>()
1278 .is_some_and(|join| join.mode == PartitionMode::Partitioned)
1279 || plan.is::<SortMergeJoinExec>();
1280
1281 let repartition_status_flags =
1282 get_repartition_requirement_status(&plan, batch_size, should_use_estimates)?;
1283 // This loop iterates over all the children to:
1284 // - Increase parallelism for every child if it is beneficial.
1285 // - Satisfy the distribution requirements of every child, if it is not
1286 // already satisfied.
1287 // We store the updated children in `new_children`.
1288 let children = izip!(
1289 children.into_iter(),
1290 plan.required_input_ordering(),
1291 plan.maintains_input_order(),
1292 repartition_status_flags.into_iter()
1293 )
1294 .map(
1295 |(
1296 mut child,
1297 required_input_ordering,
1298 maintains,
1299 RepartitionRequirementStatus {
1300 requirement,
1301 roundrobin_beneficial,
1302 roundrobin_beneficial_stats,
1303 hash_necessary,
1304 },
1305 )| {
1306 let increases_partition_count =
1307 child.plan.output_partitioning().partition_count() < target_partitions;
1308
1309 let add_roundrobin = enable_round_robin
1310 // Operator benefits from partitioning (e.g. filter):
1311 && roundrobin_beneficial
1312 && roundrobin_beneficial_stats
1313 // Unless partitioning increases the partition count, it is not beneficial:
1314 && increases_partition_count;
1315
1316 // Allow subset satisfaction when:
1317 // 1. Current partition count >= threshold
1318 // 2. Not a partitioned join since must use exact hash matching for joins
1319 // 3. Not a grouping set aggregate (requires exact hash including __grouping_id)
1320 let current_partitions = child.plan.output_partitioning().partition_count();
1321
1322 // Check if the hash partitioning requirement includes __grouping_id column.
1323 // Grouping set aggregates (ROLLUP, CUBE, GROUPING SETS) require exact hash
1324 // partitioning on all group columns including __grouping_id to ensure partial
1325 // aggregates from different partitions are correctly combined.
1326 let requires_grouping_id = matches!(&requirement, Distribution::HashPartitioned(exprs)
1327 if exprs.iter().any(|expr| {
1328 (expr.as_ref() as &dyn Any)
1329 .downcast_ref::<Column>()
1330 .is_some_and(|col| col.name() == Aggregate::INTERNAL_GROUPING_ID)
1331 })
1332 );
1333
1334 let allow_subset_satisfy_partitioning = (current_partitions
1335 >= subset_satisfaction_threshold
1336 // `preserve_file_partitions` exposes existing file-group
1337 // partitioning to the optimizer. Respect it when the only
1338 // reason to repartition would be to increase partition count
1339 // beyond the preserved file-group count.
1340 || (config.optimizer.preserve_file_partitions > 0
1341 && current_partitions < target_partitions))
1342 && !is_partitioned_join
1343 && !requires_grouping_id;
1344
1345 // When `repartition_file_scans` is set, attempt to increase
1346 // parallelism at the source.
1347 //
1348 // If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`)
1349 // then no repartitioning will have occurred. As the default implementation returns None, it is only
1350 // specific physical plan nodes, such as certain datasources, which are repartitioned.
1351 if repartition_file_scans
1352 && roundrobin_beneficial_stats
1353 && let Some(new_child) =
1354 child.plan.repartitioned(target_partitions, config)?
1355 {
1356 child.plan = new_child;
1357 }
1358
1359 // Satisfy the distribution requirement if it is unmet.
1360 match &requirement {
1361 Distribution::SinglePartition => {
1362 child = add_merge_on_top(child);
1363 }
1364 Distribution::HashPartitioned(exprs) => {
1365 // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
1366 // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
1367 if hash_necessary {
1368 child = add_hash_on_top(
1369 child,
1370 exprs.to_vec(),
1371 target_partitions,
1372 allow_subset_satisfy_partitioning,
1373 )?;
1374 }
1375 }
1376 Distribution::UnspecifiedDistribution => {
1377 if add_roundrobin {
1378 // Add round-robin repartitioning on top of the operator
1379 // to increase parallelism.
1380 child = add_roundrobin_on_top(child, target_partitions)?;
1381 }
1382 }
1383 };
1384
1385 let streaming_benefit = if child.data {
1386 preserving_order_enables_streaming(&plan, &child.plan)?
1387 } else {
1388 false
1389 };
1390
1391 // There is an ordering requirement of the operator:
1392 if let Some(required_input_ordering) = required_input_ordering {
1393 // Either:
1394 // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
1395 // - using order preserving variant is not desirable.
1396 let sort_req = required_input_ordering.into_single();
1397 let ordering_satisfied = child
1398 .plan
1399 .equivalence_properties()
1400 .ordering_satisfy_requirement(sort_req.clone())?;
1401
1402 if (!ordering_satisfied || !order_preserving_variants_desirable)
1403 && !streaming_benefit
1404 && child.data
1405 {
1406 child = replace_order_preserving_variants(child)?;
1407 // If ordering requirements were satisfied before repartitioning,
1408 // make sure ordering requirements are still satisfied after.
1409 if ordering_satisfied {
1410 // Make sure to satisfy ordering requirement:
1411 child = add_sort_above_with_check(
1412 child,
1413 sort_req,
1414 plan.downcast_ref::<OutputRequirementExec>()
1415 .map(|output| output.fetch())
1416 .unwrap_or(None),
1417 )?;
1418 }
1419 }
1420 // Stop tracking distribution changing operators
1421 child.data = false;
1422 } else {
1423 let streaming_benefit = if child.data {
1424 preserving_order_enables_streaming(&plan, &child.plan)?
1425 } else {
1426 false
1427 };
1428 // no ordering requirement
1429 match requirement {
1430 // Operator requires specific distribution.
1431 Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
1432 // If the parent doesn't maintain input order, preserving
1433 // ordering is pointless. However, if it does maintain
1434 // input order, we keep order-preserving variants so
1435 // ordering can flow through to ancestors that need it.
1436 if !maintains && !streaming_benefit {
1437 child = replace_order_preserving_variants(child)?;
1438 }
1439 }
1440 Distribution::UnspecifiedDistribution => {
1441 // Since ordering is lost, trying to preserve ordering is pointless
1442 if !maintains || plan.is::<OutputRequirementExec>() {
1443 child = replace_order_preserving_variants(child)?;
1444 }
1445 }
1446 }
1447 }
1448 Ok(child)
1449 },
1450 )
1451 .collect::<Result<Vec<_>>>()?;
1452
1453 let children_plans = children
1454 .iter()
1455 .map(|c| Arc::clone(&c.plan))
1456 .collect::<Vec<_>>();
1457
1458 plan = if plan.is::<UnionExec>()
1459 && !config.optimizer.prefer_existing_union
1460 && can_interleave(children_plans.iter())
1461 {
1462 // Add a special case for [`UnionExec`] since we want to "bubble up"
1463 // hash-partitioned data. So instead of
1464 //
1465 // Agg:
1466 // Repartition (hash):
1467 // Union:
1468 // - Agg:
1469 // Repartition (hash):
1470 // Data
1471 // - Agg:
1472 // Repartition (hash):
1473 // Data
1474 //
1475 // we can use:
1476 //
1477 // Agg:
1478 // Interleave:
1479 // - Agg:
1480 // Repartition (hash):
1481 // Data
1482 // - Agg:
1483 // Repartition (hash):
1484 // Data
1485 Arc::new(InterleaveExec::try_new(children_plans)?)
1486 } else {
1487 plan.with_new_children(children_plans)?
1488 };
1489
1490 Ok(Transformed::yes(DistributionContext::new(
1491 plan, data, children,
1492 )))
1493}
1494
/// Keeps track of distribution changing operators (like `RepartitionExec`,
/// `SortPreservingMergeExec`, `CoalescePartitionsExec`) and their ancestors.
/// Using this information, we can optimize distribution of the plan if/when
/// necessary.
///
/// The `bool` payload (`data`) marks whether a node belongs to such a tracked
/// subtree. `update_children` recomputes it for each child of a node and
/// resets it to `false` on the node itself.
pub type DistributionContext = PlanContext<bool>;
1500
1501fn update_children(mut dist_context: DistributionContext) -> Result<DistributionContext> {
1502 for child_context in dist_context.children.iter_mut() {
1503 child_context.data = if let Some(repartition) =
1504 child_context.plan.downcast_ref::<RepartitionExec>()
1505 {
1506 !matches!(
1507 repartition.partitioning(),
1508 Partitioning::UnknownPartitioning(_)
1509 )
1510 } else {
1511 child_context.plan.is::<SortPreservingMergeExec>()
1512 || child_context.plan.is::<CoalescePartitionsExec>()
1513 || child_context.plan.children().is_empty()
1514 || child_context.children[0].data
1515 || child_context
1516 .plan
1517 .required_input_distribution()
1518 .iter()
1519 .zip(child_context.children.iter())
1520 .any(|(required_dist, child_context)| {
1521 child_context.data
1522 && matches!(
1523 required_dist,
1524 Distribution::UnspecifiedDistribution
1525 )
1526 })
1527 }
1528 }
1529
1530 dist_context.data = false;
1531 Ok(dist_context)
1532}
1533
1534// See tests in datafusion/core/tests/physical_optimizer