datafusion_physical_optimizer/enforce_distribution.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceDistribution optimizer rule inspects the physical plan with respect
19//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them
20//! when necessary. If increasing parallelism is beneficial (and also desirable
21//! according to the configuration), this rule increases partition counts in
22//! the physical plan.
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use crate::optimizer::PhysicalOptimizerRule;
28use crate::output_requirements::OutputRequirementExec;
29use crate::utils::{
30 add_sort_above_with_check, is_coalesce_partitions, is_repartition,
31 is_sort_preserving_merge,
32};
33
34use arrow::compute::SortOptions;
35use datafusion_common::config::ConfigOptions;
36use datafusion_common::error::Result;
37use datafusion_common::stats::Precision;
38use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
39use datafusion_expr::logical_plan::JoinType;
40use datafusion_physical_expr::expressions::{Column, NoOp};
41use datafusion_physical_expr::utils::map_columns_before_projection;
42use datafusion_physical_expr::{
43 physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
44};
45use datafusion_physical_plan::aggregates::{
46 AggregateExec, AggregateMode, PhysicalGroupBy,
47};
48use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
49use datafusion_physical_plan::execution_plan::EmissionType;
50use datafusion_physical_plan::joins::{
51 CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
52};
53use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
54use datafusion_physical_plan::repartition::RepartitionExec;
55use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
56use datafusion_physical_plan::tree_node::PlanContext;
57use datafusion_physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
58use datafusion_physical_plan::windows::WindowAggExec;
59use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
60use datafusion_physical_plan::ExecutionPlanProperties;
61use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
62
63use itertools::izip;
64
65/// The `EnforceDistribution` rule ensures that distribution requirements are
66/// met. In doing so, this rule will increase the parallelism in the plan by
67/// introducing repartitioning operators to the physical plan.
68///
69/// For example, given an input such as:
70///
71///
72/// ```text
73/// ┌─────────────────────────────────┐
74/// │ │
75/// │ ExecutionPlan │
76/// │ │
77/// └─────────────────────────────────┘
78/// ▲ ▲
79/// │ │
80/// ┌─────┘ └─────┐
81/// │ │
82/// │ │
83/// │ │
84/// ┌───────────┐ ┌───────────┐
85/// │ │ │ │
86/// │ batch A1 │ │ batch B1 │
87/// │ │ │ │
88/// ├───────────┤ ├───────────┤
89/// │ │ │ │
90/// │ batch A2 │ │ batch B2 │
91/// │ │ │ │
92/// ├───────────┤ ├───────────┤
93/// │ │ │ │
94/// │ batch A3 │ │ batch B3 │
95/// │ │ │ │
96/// └───────────┘ └───────────┘
97///
98/// Input Input
99/// A B
100/// ```
101///
102/// This rule will attempt to add a `RepartitionExec` to increase parallelism
103/// (to 3, in this case) and create the following arrangement:
104///
105/// ```text
106/// ┌─────────────────────────────────┐
107/// │ │
108/// │ ExecutionPlan │
109/// │ │
110/// └─────────────────────────────────┘
111/// ▲ ▲ ▲ Input now has 3
112/// │ │ │ partitions
113/// ┌───────┘ │ └───────┐
114/// │ │ │
115/// │ │ │
116/// ┌───────────┐ ┌───────────┐ ┌───────────┐
117/// │ │ │ │ │ │
118/// │ batch A1 │ │ batch A3 │ │ batch B3 │
119/// │ │ │ │ │ │
120/// ├───────────┤ ├───────────┤ ├───────────┤
121/// │ │ │ │ │ │
122/// │ batch B2 │ │ batch B1 │ │ batch A2 │
123/// │ │ │ │ │ │
124/// └───────────┘ └───────────┘ └───────────┘
125/// ▲ ▲ ▲
126/// │ │ │
127/// └─────────┐ │ ┌──────────┘
128/// │ │ │
129/// │ │ │
130/// ┌─────────────────────────────────┐ batches are
131/// │ RepartitionExec(3) │ repartitioned
132/// │ RoundRobin │
133/// │ │
134/// └─────────────────────────────────┘
135/// ▲ ▲
136/// │ │
137/// ┌─────┘ └─────┐
138/// │ │
139/// │ │
140/// │ │
141/// ┌───────────┐ ┌───────────┐
142/// │ │ │ │
143/// │ batch A1 │ │ batch B1 │
144/// │ │ │ │
145/// ├───────────┤ ├───────────┤
146/// │ │ │ │
147/// │ batch A2 │ │ batch B2 │
148/// │ │ │ │
149/// ├───────────┤ ├───────────┤
150/// │ │ │ │
151/// │ batch A3 │ │ batch B3 │
152/// │ │ │ │
153/// └───────────┘ └───────────┘
154///
155///
156/// Input Input
157/// A B
158/// ```
159///
160/// The `EnforceDistribution` rule
161/// - is idempotent; i.e. it can be applied multiple times, each time producing
162/// the same result.
163/// - always produces a valid plan in terms of distribution requirements. Its
164/// input plan can be valid or invalid with respect to distribution requirements,
165/// but the output plan will always be valid.
166/// - produces a valid plan in terms of ordering requirements, *if* its input is
167/// a valid plan in terms of ordering requirements. If the input plan is invalid,
168/// this rule does not attempt to fix it as doing so is the responsibility of the
169/// `EnforceSorting` rule.
170///
171/// Note that distribution requirements are met in the strictest way. This may
172/// result in more than strictly necessary [`RepartitionExec`]s in the plan, but
173/// meeting the requirements in the strictest way may help avoid possible data
174/// skew in joins.
175///
176/// For example for a hash join with keys (a, b, c), the required Distribution(a, b, c)
177/// can be satisfied by several alternative partitioning ways: (a, b, c), (a, b),
178/// (a, c), (b, c), (a), (b), (c) and ( ).
179///
180/// This rule only chooses the exact match and satisfies the Distribution(a, b, c)
181/// by a HashPartition(a, b, c).
#[derive(Default, Debug)]
pub struct EnforceDistribution {}

impl EnforceDistribution {
    /// Creates a new [`EnforceDistribution`] optimizer rule.
    pub fn new() -> Self {
        Self {}
    }
}
191
192impl PhysicalOptimizerRule for EnforceDistribution {
193 fn optimize(
194 &self,
195 plan: Arc<dyn ExecutionPlan>,
196 config: &ConfigOptions,
197 ) -> Result<Arc<dyn ExecutionPlan>> {
198 let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
199
200 let adjusted = if top_down_join_key_reordering {
201 // Run a top-down process to adjust input key ordering recursively
202 let plan_requirements = PlanWithKeyRequirements::new_default(plan);
203 let adjusted = plan_requirements
204 .transform_down(adjust_input_keys_ordering)
205 .data()?;
206 adjusted.plan
207 } else {
208 // Run a bottom-up process
209 plan.transform_up(|plan| {
210 Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
211 })
212 .data()?
213 };
214
215 let distribution_context = DistributionContext::new_default(adjusted);
216 // Distribution enforcement needs to be applied bottom-up.
217 let distribution_context = distribution_context
218 .transform_up(|distribution_context| {
219 ensure_distribution(distribution_context, config)
220 })
221 .data()?;
222 Ok(distribution_context.plan)
223 }
224
225 fn name(&self) -> &str {
226 "EnforceDistribution"
227 }
228
229 fn schema_check(&self) -> bool {
230 true
231 }
232}
233
/// Pairs of left/right join key expressions, kept index-aligned so that
/// `left_keys[i]` is joined against `right_keys[i]`.
#[derive(Debug, Clone)]
struct JoinKeyPairs {
    left_keys: Vec<Arc<dyn PhysicalExpr>>,
    right_keys: Vec<Arc<dyn PhysicalExpr>>,
}
239
/// Keeps track of parent required key orderings: each node's payload is the
/// list of key expressions its parent requires, pushed down during the
/// top-down [`adjust_input_keys_ordering`] pass.
pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
242
243/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
244/// That might not match with the output partitioning of the join node's children
245/// A Top-Down process will use this method to adjust children's output partitioning based on the parent key reordering requirements:
246///
247/// Example:
248/// TopJoin on (a, b, c)
249/// bottom left join on(b, a, c)
250/// bottom right join on(c, b, a)
251///
252/// Will be adjusted to:
253/// TopJoin on (a, b, c)
254/// bottom left join on(a, b, c)
255/// bottom right join on(a, b, c)
256///
257/// Example:
258/// TopJoin on (a, b, c)
259/// Agg1 group by (b, a, c)
260/// Agg2 group by (c, b, a)
261///
262/// Will be adjusted to:
263/// TopJoin on (a, b, c)
264/// Projection(b, a, c)
265/// Agg1 group by (a, b, c)
266/// Projection(c, b, a)
267/// Agg2 group by (a, b, c)
268///
269/// Following is the explanation of the reordering process:
270///
271/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
272/// Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
273/// Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
274/// Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
275///
276/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
277/// Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
278/// Requirements is already satisfied, clear all the requirements, return the unchanged plan.
279/// Requirements can be satisfied by adjusting keys ordering, clear all the requirements, return the changed plan.
280///
281/// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements, return the unchanged plan
282/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
283/// 5) For other types of operators, by default, pushdown the parent requirements to children.
284///
pub fn adjust_input_keys_ordering(
    mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
    let plan = Arc::clone(&requirements.plan);

    if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        projection,
        mode,
        null_equality,
        ..
    }) = plan.as_any().downcast_ref::<HashJoinExec>()
    {
        match mode {
            PartitionMode::Partitioned => {
                // Case 1 (partitioned hash join): try to permute the join keys
                // into the parent's required order. The constructor rebuilds
                // the join only when an actual permutation is found.
                let join_constructor = |new_conditions: (
                    Vec<(PhysicalExprRef, PhysicalExprRef)>,
                    Vec<SortOptions>,
                )| {
                    HashJoinExec::try_new(
                        Arc::clone(left),
                        Arc::clone(right),
                        new_conditions.0,
                        filter.clone(),
                        join_type,
                        // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
                        projection.clone(),
                        PartitionMode::Partitioned,
                        *null_equality,
                    )
                    .map(|e| Arc::new(e) as _)
                };
                return reorder_partitioned_join_keys(
                    requirements,
                    on,
                    &[],
                    &join_constructor,
                )
                .map(Transformed::yes);
            }
            PartitionMode::CollectLeft => {
                // Push down requirements to the right side
                requirements.children[1].data = match join_type {
                    // For inner/right joins, right-side columns are shifted by
                    // the left schema width in the join output:
                    JoinType::Inner | JoinType::Right => shift_right_required(
                        &requirements.data,
                        left.schema().fields().len(),
                    )
                    .unwrap_or_default(),
                    // Right semi/anti/mark joins output only right-side
                    // columns, so requirements pass through unchanged:
                    JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                        requirements.data.clone()
                    }
                    // Remaining join types cannot propagate requirements:
                    JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::Full
                    | JoinType::LeftMark => vec![],
                };
            }
            PartitionMode::Auto => {
                // Can not satisfy, clear the current requirements and generate new empty requirements
                requirements.data.clear();
            }
        }
    } else if let Some(CrossJoinExec { left, .. }) =
        plan.as_any().downcast_ref::<CrossJoinExec>()
    {
        let left_columns_len = left.schema().fields().len();
        // Push down requirements to the right side
        requirements.children[1].data =
            shift_right_required(&requirements.data, left_columns_len)
                .unwrap_or_default();
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        sort_options,
        null_equality,
        ..
    }) = plan.as_any().downcast_ref::<SortMergeJoinExec>()
    {
        // Case 1 (sort-merge join): same key reordering as the partitioned
        // hash join; the sort options are permuted alongside the keys.
        let join_constructor = |new_conditions: (
            Vec<(PhysicalExprRef, PhysicalExprRef)>,
            Vec<SortOptions>,
        )| {
            SortMergeJoinExec::try_new(
                Arc::clone(left),
                Arc::clone(right),
                new_conditions.0,
                filter.clone(),
                *join_type,
                new_conditions.1,
                *null_equality,
            )
            .map(|e| Arc::new(e) as _)
        };
        return reorder_partitioned_join_keys(
            requirements,
            on,
            sort_options,
            &join_constructor,
        )
        .map(Transformed::yes);
    } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
        if !requirements.data.is_empty() {
            if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
                // Case 2: try to permute the group-by keys of a final
                // partitioned aggregate into the parent's required order.
                return reorder_aggregate_keys(requirements, aggregate_exec)
                    .map(Transformed::yes);
            } else {
                requirements.data.clear();
            }
        } else {
            // Keep everything unchanged
            return Ok(Transformed::no(requirements));
        }
    } else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() {
        let expr = proj.expr();
        // For Projection, we need to transform the requirements to the columns before the Projection
        // And then to push down the requirements
        // Construct a mapping from new name to the original Column
        let proj_exprs: Vec<_> = expr
            .iter()
            .map(|p| (Arc::clone(&p.expr), p.alias.clone()))
            .collect();
        let new_required = map_columns_before_projection(&requirements.data, &proj_exprs);
        if new_required.len() == requirements.data.len() {
            requirements.children[0].data = new_required;
        } else {
            // Can not satisfy, clear the current requirements and generate new empty requirements
            requirements.data.clear();
        }
    } else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
        || plan
            .as_any()
            .downcast_ref::<CoalescePartitionsExec>()
            .is_some()
        || plan.as_any().downcast_ref::<WindowAggExec>().is_some()
    {
        // Case 3: these operators change (or erase) the partitioning, so the
        // parent's key requirements cannot flow through them.
        requirements.data.clear();
    } else {
        // By default, push down the parent requirements to children
        for child in requirements.children.iter_mut() {
            child.data.clone_from(&requirements.data);
        }
    }
    Ok(Transformed::yes(requirements))
}
437
438pub fn reorder_partitioned_join_keys<F>(
439 mut join_plan: PlanWithKeyRequirements,
440 on: &[(PhysicalExprRef, PhysicalExprRef)],
441 sort_options: &[SortOptions],
442 join_constructor: &F,
443) -> Result<PlanWithKeyRequirements>
444where
445 F: Fn(
446 (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
447 ) -> Result<Arc<dyn ExecutionPlan>>,
448{
449 let parent_required = &join_plan.data;
450 let join_key_pairs = extract_join_keys(on);
451 let eq_properties = join_plan.plan.equivalence_properties();
452
453 let (
454 JoinKeyPairs {
455 left_keys,
456 right_keys,
457 },
458 positions,
459 ) = try_reorder(join_key_pairs, parent_required, eq_properties);
460
461 if let Some(positions) = positions {
462 if !positions.is_empty() {
463 let new_join_on = new_join_conditions(&left_keys, &right_keys);
464 let new_sort_options = (0..sort_options.len())
465 .map(|idx| sort_options[positions[idx]])
466 .collect();
467 join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
468 }
469 }
470
471 join_plan.children[0].data = left_keys;
472 join_plan.children[1].data = right_keys;
473 Ok(join_plan)
474}
475
/// Tries to permute the group-by keys of a `FinalPartitioned` aggregate (and
/// its `Partial` input) into the parent's required order, then restores the
/// original column order with a trailing projection. Returns the node
/// unchanged when no such reordering applies.
pub fn reorder_aggregate_keys(
    mut agg_node: PlanWithKeyRequirements,
    agg_exec: &AggregateExec,
) -> Result<PlanWithKeyRequirements> {
    let parent_required = &agg_node.data;
    // Columns produced by the group-by expressions, in current output order.
    let output_columns = agg_exec
        .group_expr()
        .expr()
        .iter()
        .enumerate()
        .map(|(index, (_, name))| Column::new(name, index))
        .collect::<Vec<_>>();

    let output_exprs = output_columns
        .iter()
        .map(|c| Arc::new(c.clone()) as _)
        .collect::<Vec<_>>();

    // Only attempt a reorder when the parent requires exactly as many keys as
    // there are group-by outputs, no null expressions (grouping sets) are
    // involved, and the current order differs from the required one.
    if parent_required.len() == output_exprs.len()
        && agg_exec.group_expr().null_expr().is_empty()
        && !physical_exprs_equal(&output_exprs, parent_required)
    {
        if let Some(positions) = expected_expr_positions(&output_exprs, parent_required) {
            if let Some(agg_exec) =
                agg_exec.input().as_any().downcast_ref::<AggregateExec>()
            {
                if matches!(agg_exec.mode(), &AggregateMode::Partial) {
                    // Rebuild the partial aggregate with its group keys
                    // permuted into the parent's required order.
                    let group_exprs = agg_exec.group_expr().expr();
                    let new_group_exprs = positions
                        .into_iter()
                        .map(|idx| group_exprs[idx].clone())
                        .collect();
                    let partial_agg = Arc::new(AggregateExec::try_new(
                        AggregateMode::Partial,
                        PhysicalGroupBy::new_single(new_group_exprs),
                        agg_exec.aggr_expr().to_vec(),
                        agg_exec.filter_expr().to_vec(),
                        Arc::clone(agg_exec.input()),
                        Arc::clone(&agg_exec.input_schema),
                    )?);
                    // Build new group expressions that correspond to the output
                    // of the "reordered" aggregator:
                    let group_exprs = partial_agg.group_expr().expr();
                    let new_group_by = PhysicalGroupBy::new_single(
                        partial_agg
                            .output_group_expr()
                            .into_iter()
                            .enumerate()
                            .map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
                            .collect(),
                    );
                    let new_final_agg = Arc::new(AggregateExec::try_new(
                        AggregateMode::FinalPartitioned,
                        new_group_by,
                        agg_exec.aggr_expr().to_vec(),
                        agg_exec.filter_expr().to_vec(),
                        Arc::clone(&partial_agg) as _,
                        agg_exec.input_schema(),
                    )?);

                    // Swap the rebuilt aggregate pair into the node; the
                    // requirement is now satisfied, so nothing is pushed down.
                    agg_node.plan = Arc::clone(&new_final_agg) as _;
                    agg_node.data.clear();
                    agg_node.children = vec![PlanWithKeyRequirements::new(
                        partial_agg as _,
                        vec![],
                        agg_node.children.swap_remove(0).children,
                    )];

                    // Need to create a new projection to change the expr ordering back
                    let agg_schema = new_final_agg.schema();
                    let mut proj_exprs = output_columns
                        .iter()
                        .map(|col| {
                            let name = col.name();
                            let index = agg_schema.index_of(name)?;
                            Ok(ProjectionExpr {
                                expr: Arc::new(Column::new(name, index)) as _,
                                alias: name.to_owned(),
                            })
                        })
                        .collect::<Result<Vec<_>>>()?;
                    // Pass the remaining (aggregate result) columns through
                    // unchanged:
                    let agg_fields = agg_schema.fields();
                    for (idx, field) in
                        agg_fields.iter().enumerate().skip(output_columns.len())
                    {
                        let name = field.name();
                        let plan = Arc::new(Column::new(name, idx)) as _;
                        proj_exprs.push(ProjectionExpr {
                            expr: plan,
                            alias: name.clone(),
                        })
                    }
                    return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| {
                        PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node])
                    });
                }
            }
        }
    }
    Ok(agg_node)
}
577
578fn shift_right_required(
579 parent_required: &[Arc<dyn PhysicalExpr>],
580 left_columns_len: usize,
581) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
582 let new_right_required = parent_required
583 .iter()
584 .filter_map(|r| {
585 r.as_any().downcast_ref::<Column>().and_then(|col| {
586 col.index()
587 .checked_sub(left_columns_len)
588 .map(|index| Arc::new(Column::new(col.name(), index)) as _)
589 })
590 })
591 .collect::<Vec<_>>();
592
593 // if the parent required are all coming from the right side, the requirements can be pushdown
594 (new_right_required.len() == parent_required.len()).then_some(new_right_required)
595}
596
597/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
598/// That might not match with the output partitioning of the join node's children
599/// This method will try to change the ordering of the join keys to match with the
600/// partitioning of the join nodes' children. If it can not match with both sides, it will try to
601/// match with one, either the left side or the right side.
602///
603/// Example:
604/// TopJoin on (a, b, c)
605/// bottom left join on(b, a, c)
606/// bottom right join on(c, b, a)
607///
608/// Will be adjusted to:
609/// TopJoin on (b, a, c)
610/// bottom left join on(b, a, c)
611/// bottom right join on(c, b, a)
612///
613/// Compared to the Top-Down reordering process, this Bottom-Up approach is much simpler, but might not reach a best result.
614/// The Bottom-Up approach will be useful in future if we plan to support storage partition-wised Joins.
615/// In that case, the datasources/tables might be pre-partitioned and we can't adjust the key ordering of the datasources
616/// and then can't apply the Top-Down reordering process.
pub fn reorder_join_keys_to_inputs(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let plan_any = plan.as_any();
    if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        projection,
        mode,
        null_equality,
        ..
    }) = plan_any.downcast_ref::<HashJoinExec>()
    {
        // Only partitioned hash joins can benefit: collect-left joins do not
        // hash-partition their inputs.
        if matches!(mode, PartitionMode::Partitioned) {
            let (join_keys, positions) = reorder_current_join_keys(
                extract_join_keys(on),
                Some(left.output_partitioning()),
                Some(right.output_partitioning()),
                left.equivalence_properties(),
                right.equivalence_properties(),
            );
            // A non-empty position list means the keys were actually permuted;
            // rebuild the join with the new key order.
            if positions.is_some_and(|idxs| !idxs.is_empty()) {
                let JoinKeyPairs {
                    left_keys,
                    right_keys,
                } = join_keys;
                let new_join_on = new_join_conditions(&left_keys, &right_keys);
                return Ok(Arc::new(HashJoinExec::try_new(
                    Arc::clone(left),
                    Arc::clone(right),
                    new_join_on,
                    filter.clone(),
                    join_type,
                    projection.clone(),
                    PartitionMode::Partitioned,
                    *null_equality,
                )?));
            }
        }
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        sort_options,
        null_equality,
        ..
    }) = plan_any.downcast_ref::<SortMergeJoinExec>()
    {
        let (join_keys, positions) = reorder_current_join_keys(
            extract_join_keys(on),
            Some(left.output_partitioning()),
            Some(right.output_partitioning()),
            left.equivalence_properties(),
            right.equivalence_properties(),
        );
        if let Some(positions) = positions {
            if !positions.is_empty() {
                let JoinKeyPairs {
                    left_keys,
                    right_keys,
                } = join_keys;
                let new_join_on = new_join_conditions(&left_keys, &right_keys);
                // The sort options must follow the same permutation as the
                // join keys.
                let new_sort_options = (0..sort_options.len())
                    .map(|idx| sort_options[positions[idx]])
                    .collect();
                return SortMergeJoinExec::try_new(
                    Arc::clone(left),
                    Arc::clone(right),
                    new_join_on,
                    filter.clone(),
                    *join_type,
                    new_sort_options,
                    *null_equality,
                )
                .map(|smj| Arc::new(smj) as _);
            }
        }
    }
    // Any other operator (or no beneficial reordering) is returned unchanged.
    Ok(plan)
}
702
703/// Reorder the current join keys ordering based on either left partition or right partition
704fn reorder_current_join_keys(
705 join_keys: JoinKeyPairs,
706 left_partition: Option<&Partitioning>,
707 right_partition: Option<&Partitioning>,
708 left_equivalence_properties: &EquivalenceProperties,
709 right_equivalence_properties: &EquivalenceProperties,
710) -> (JoinKeyPairs, Option<Vec<usize>>) {
711 match (left_partition, right_partition) {
712 (Some(Partitioning::Hash(left_exprs, _)), _) => {
713 match try_reorder(join_keys, left_exprs, left_equivalence_properties) {
714 (join_keys, None) => reorder_current_join_keys(
715 join_keys,
716 None,
717 right_partition,
718 left_equivalence_properties,
719 right_equivalence_properties,
720 ),
721 result => result,
722 }
723 }
724 (_, Some(Partitioning::Hash(right_exprs, _))) => {
725 try_reorder(join_keys, right_exprs, right_equivalence_properties)
726 }
727 _ => (join_keys, None),
728 }
729}
730
731fn try_reorder(
732 join_keys: JoinKeyPairs,
733 expected: &[Arc<dyn PhysicalExpr>],
734 equivalence_properties: &EquivalenceProperties,
735) -> (JoinKeyPairs, Option<Vec<usize>>) {
736 let eq_groups = equivalence_properties.eq_group();
737 let mut normalized_expected = vec![];
738 let mut normalized_left_keys = vec![];
739 let mut normalized_right_keys = vec![];
740 if join_keys.left_keys.len() != expected.len() {
741 return (join_keys, None);
742 }
743 if physical_exprs_equal(expected, &join_keys.left_keys)
744 || physical_exprs_equal(expected, &join_keys.right_keys)
745 {
746 return (join_keys, Some(vec![]));
747 } else if !equivalence_properties.eq_group().is_empty() {
748 normalized_expected = expected
749 .iter()
750 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
751 .collect();
752
753 normalized_left_keys = join_keys
754 .left_keys
755 .iter()
756 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
757 .collect();
758
759 normalized_right_keys = join_keys
760 .right_keys
761 .iter()
762 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
763 .collect();
764
765 if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
766 || physical_exprs_equal(&normalized_expected, &normalized_right_keys)
767 {
768 return (join_keys, Some(vec![]));
769 }
770 }
771
772 let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected)
773 .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
774 .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
775 .or_else(|| {
776 expected_expr_positions(&normalized_right_keys, &normalized_expected)
777 })
778 else {
779 return (join_keys, None);
780 };
781
782 let mut new_left_keys = vec![];
783 let mut new_right_keys = vec![];
784 for pos in positions.iter() {
785 new_left_keys.push(Arc::clone(&join_keys.left_keys[*pos]));
786 new_right_keys.push(Arc::clone(&join_keys.right_keys[*pos]));
787 }
788 let pairs = JoinKeyPairs {
789 left_keys: new_left_keys,
790 right_keys: new_right_keys,
791 };
792
793 (pairs, Some(positions))
794}
795
796/// Return the expected expressions positions.
797/// For example, the current expressions are ['c', 'a', 'a', b'], the expected expressions are ['b', 'c', 'a', 'a'],
798///
799/// This method will return a Vec [3, 0, 1, 2]
800fn expected_expr_positions(
801 current: &[Arc<dyn PhysicalExpr>],
802 expected: &[Arc<dyn PhysicalExpr>],
803) -> Option<Vec<usize>> {
804 if current.is_empty() || expected.is_empty() {
805 return None;
806 }
807 let mut indexes: Vec<usize> = vec![];
808 let mut current = current.to_vec();
809 for expr in expected.iter() {
810 // Find the position of the expected expr in the current expressions
811 if let Some(expected_position) = current.iter().position(|e| e.eq(expr)) {
812 current[expected_position] = Arc::new(NoOp::new());
813 indexes.push(expected_position);
814 } else {
815 return None;
816 }
817 }
818 Some(indexes)
819}
820
821fn extract_join_keys(on: &[(PhysicalExprRef, PhysicalExprRef)]) -> JoinKeyPairs {
822 let (left_keys, right_keys) = on
823 .iter()
824 .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
825 .unzip();
826 JoinKeyPairs {
827 left_keys,
828 right_keys,
829 }
830}
831
832fn new_join_conditions(
833 new_left_keys: &[Arc<dyn PhysicalExpr>],
834 new_right_keys: &[Arc<dyn PhysicalExpr>],
835) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
836 new_left_keys
837 .iter()
838 .zip(new_right_keys.iter())
839 .map(|(l_key, r_key)| (Arc::clone(l_key), Arc::clone(r_key)))
840 .collect()
841}
842
843/// Adds RoundRobin repartition operator to the plan increase parallelism.
844///
845/// # Arguments
846///
847/// * `input`: Current node.
848/// * `n_target`: desired target partition number, if partition number of the
849/// current executor is less than this value. Partition number will be increased.
850///
851/// # Returns
852///
853/// A [`Result`] object that contains new execution plan where the desired
854/// partition number is achieved by adding a RoundRobin repartition.
855fn add_roundrobin_on_top(
856 input: DistributionContext,
857 n_target: usize,
858) -> Result<DistributionContext> {
859 // Adding repartition is helpful:
860 if input.plan.output_partitioning().partition_count() < n_target {
861 // When there is an existing ordering, we preserve ordering
862 // during repartition. This will be un-done in the future
863 // If any of the following conditions is true
864 // - Preserving ordering is not helpful in terms of satisfying ordering requirements
865 // - Usage of order preserving variants is not desirable
866 // (determined by flag `config.optimizer.prefer_existing_sort`)
867 let partitioning = Partitioning::RoundRobinBatch(n_target);
868 let repartition =
869 RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
870 .with_preserve_order();
871
872 let new_plan = Arc::new(repartition) as _;
873
874 Ok(DistributionContext::new(new_plan, true, vec![input]))
875 } else {
876 // Partition is not helpful, we already have desired number of partitions.
877 Ok(input)
878 }
879}
880
881/// Adds a hash repartition operator:
882/// - to increase parallelism, and/or
883/// - to satisfy requirements of the subsequent operators.
884///
885/// Repartition(Hash) is added on top of operator `input`.
886///
887/// # Arguments
888///
889/// * `input`: Current node.
890/// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
891/// * `n_target`: desired target partition number, if partition number of the
892/// current executor is less than this value. Partition number will be increased.
893///
894/// # Returns
895///
896/// A [`Result`] object that contains new execution plan where the desired
897/// distribution is satisfied by adding a Hash repartition.
898fn add_hash_on_top(
899 input: DistributionContext,
900 hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
901 n_target: usize,
902) -> Result<DistributionContext> {
903 // Early return if hash repartition is unnecessary
904 // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
905 if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
906 return Ok(input);
907 }
908
909 let dist = Distribution::HashPartitioned(hash_exprs);
910 let satisfied = input
911 .plan
912 .output_partitioning()
913 .satisfy(&dist, input.plan.equivalence_properties());
914
915 // Add hash repartitioning when:
916 // - The hash distribution requirement is not satisfied, or
917 // - We can increase parallelism by adding hash partitioning.
918 if !satisfied || n_target > input.plan.output_partitioning().partition_count() {
919 // When there is an existing ordering, we preserve ordering during
920 // repartition. This will be rolled back in the future if any of the
921 // following conditions is true:
922 // - Preserving ordering is not helpful in terms of satisfying ordering
923 // requirements.
924 // - Usage of order preserving variants is not desirable (per the flag
925 // `config.optimizer.prefer_existing_sort`).
926 let partitioning = dist.create_partitioning(n_target);
927 let repartition =
928 RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
929 .with_preserve_order();
930 let plan = Arc::new(repartition) as _;
931
932 return Ok(DistributionContext::new(plan, true, vec![input]));
933 }
934
935 Ok(input)
936}
937
938/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
939/// on top of the given plan node to satisfy a single partition requirement
940/// while preserving ordering constraints.
941///
942/// # Parameters
943///
944/// * `input`: Current node.
945///
946/// # Returns
947///
948/// Updated node with an execution plan, where the desired single distribution
949/// requirement is satisfied.
950fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
951 // Apply only when the partition count is larger than one.
952 if input.plan.output_partitioning().partition_count() > 1 {
953 // When there is an existing ordering, we preserve ordering
954 // when decreasing partitions. This will be un-done in the future
955 // if any of the following conditions is true
956 // - Preserving ordering is not helpful in terms of satisfying ordering requirements
957 // - Usage of order preserving variants is not desirable
958 // (determined by flag `config.optimizer.prefer_existing_sort`)
959 let new_plan = if let Some(req) = input.plan.output_ordering() {
960 Arc::new(SortPreservingMergeExec::new(
961 req.clone(),
962 Arc::clone(&input.plan),
963 )) as _
964 } else {
965 // If there is no input order, we can simply coalesce partitions:
966 Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
967 };
968
969 DistributionContext::new(new_plan, true, vec![input])
970 } else {
971 input
972 }
973}
974
975/// Updates the physical plan inside [`DistributionContext`] so that distribution
976/// changing operators are removed from the top. If they are necessary, they will
977/// be added in subsequent stages.
978///
979/// Assume that following plan is given:
980/// ```text
981/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
982/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
983/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
984/// ```
985///
986/// Since `RepartitionExec`s change the distribution, this function removes
987/// them and returns following plan:
988///
989/// ```text
990/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
991/// ```
992fn remove_dist_changing_operators(
993 mut distribution_context: DistributionContext,
994) -> Result<DistributionContext> {
995 while is_repartition(&distribution_context.plan)
996 || is_coalesce_partitions(&distribution_context.plan)
997 || is_sort_preserving_merge(&distribution_context.plan)
998 {
999 // All of above operators have a single child. First child is only child.
1000 // Remove any distribution changing operators at the beginning:
1001 distribution_context = distribution_context.children.swap_remove(0);
1002 // Note that they will be re-inserted later on if necessary or helpful.
1003 }
1004
1005 Ok(distribution_context)
1006}
1007
1008/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
1009///
1010/// Assume that following plan is given:
1011/// ```text
1012/// "SortPreservingMergeExec: \[a@0 ASC]"
1013/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true",
1014/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true",
1015/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1016/// ```
1017///
1018/// This function converts plan above to the following:
1019///
1020/// ```text
1021/// "CoalescePartitionsExec"
1022/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
1023/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
1024/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1025/// ```
1026pub fn replace_order_preserving_variants(
1027 mut context: DistributionContext,
1028) -> Result<DistributionContext> {
1029 context.children = context
1030 .children
1031 .into_iter()
1032 .map(|child| {
1033 if child.data {
1034 replace_order_preserving_variants(child)
1035 } else {
1036 Ok(child)
1037 }
1038 })
1039 .collect::<Result<Vec<_>>>()?;
1040
1041 if is_sort_preserving_merge(&context.plan) {
1042 let child_plan = Arc::clone(&context.children[0].plan);
1043 context.plan = Arc::new(
1044 CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
1045 );
1046 return Ok(context);
1047 } else if let Some(repartition) =
1048 context.plan.as_any().downcast_ref::<RepartitionExec>()
1049 {
1050 if repartition.preserve_order() {
1051 context.plan = Arc::new(RepartitionExec::try_new(
1052 Arc::clone(&context.children[0].plan),
1053 repartition.partitioning().clone(),
1054 )?);
1055 return Ok(context);
1056 }
1057 }
1058
1059 context.update_plan_from_children()
1060}
1061
/// A struct to keep track of repartition requirements for each child node.
/// Statuses across siblings may be aligned afterwards so that all children
/// agree on hash partitioning (see `get_repartition_requirement_status`).
struct RepartitionRequirementStatus {
    /// The distribution requirement for the node.
    requirement: Distribution,
    /// Designates whether round robin partitioning is theoretically beneficial;
    /// i.e. the operator can actually utilize parallelism.
    roundrobin_beneficial: bool,
    /// Designates whether round robin partitioning is beneficial according to
    /// the statistical information we have on the number of rows.
    roundrobin_beneficial_stats: bool,
    /// Designates whether hash partitioning is necessary.
    hash_necessary: bool,
}
1075
1076/// Calculates the `RepartitionRequirementStatus` for each children to generate
1077/// consistent and sensible (in terms of performance) distribution requirements.
1078/// As an example, a hash join's left (build) child might produce
1079///
1080/// ```text
1081/// RepartitionRequirementStatus {
1082/// ..,
1083/// hash_necessary: true
1084/// }
1085/// ```
1086///
1087/// while its right (probe) child might have very few rows and produce:
1088///
1089/// ```text
1090/// RepartitionRequirementStatus {
1091/// ..,
1092/// hash_necessary: false
1093/// }
1094/// ```
1095///
1096/// These statuses are not consistent as all children should agree on hash
1097/// partitioning. This function aligns the statuses to generate consistent
1098/// hash partitions for each children. After alignment, the right child's
1099/// status would turn into:
1100///
1101/// ```text
1102/// RepartitionRequirementStatus {
1103/// ..,
1104/// hash_necessary: true
1105/// }
1106/// ```
1107fn get_repartition_requirement_status(
1108 plan: &Arc<dyn ExecutionPlan>,
1109 batch_size: usize,
1110 should_use_estimates: bool,
1111) -> Result<Vec<RepartitionRequirementStatus>> {
1112 let mut needs_alignment = false;
1113 let children = plan.children();
1114 let rr_beneficial = plan.benefits_from_input_partitioning();
1115 let requirements = plan.required_input_distribution();
1116 let mut repartition_status_flags = vec![];
1117 for (child, requirement, roundrobin_beneficial) in
1118 izip!(children.into_iter(), requirements, rr_beneficial)
1119 {
1120 // Decide whether adding a round robin is beneficial depending on
1121 // the statistical information we have on the number of rows:
1122 let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows
1123 {
1124 Precision::Exact(n_rows) => n_rows > batch_size,
1125 Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
1126 Precision::Absent => true,
1127 };
1128 let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
1129 // Hash re-partitioning is necessary when the input has more than one
1130 // partitions:
1131 let multi_partitions = child.output_partitioning().partition_count() > 1;
1132 let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
1133 needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
1134 repartition_status_flags.push((
1135 is_hash,
1136 RepartitionRequirementStatus {
1137 requirement,
1138 roundrobin_beneficial,
1139 roundrobin_beneficial_stats,
1140 hash_necessary: is_hash && multi_partitions,
1141 },
1142 ));
1143 }
1144 // Align hash necessary flags for hash partitions to generate consistent
1145 // hash partitions at each children:
1146 if needs_alignment {
1147 // When there is at least one hash requirement that is necessary or
1148 // beneficial according to statistics, make all children require hash
1149 // repartitioning:
1150 for (is_hash, status) in &mut repartition_status_flags {
1151 if *is_hash {
1152 status.hash_necessary = true;
1153 }
1154 }
1155 }
1156 Ok(repartition_status_flags
1157 .into_iter()
1158 .map(|(_, status)| status)
1159 .collect())
1160}
1161
1162/// This function checks whether we need to add additional data exchange
1163/// operators to satisfy distribution requirements. Since this function
1164/// takes care of such requirements, we should avoid manually adding data
1165/// exchange operators in other places.
1166///
1167/// This function is intended to be used in a bottom up traversal, as it
1168/// can first repartition (or newly partition) at the datasources -- these
1169/// source partitions may be later repartitioned with additional data exchange operators.
pub fn ensure_distribution(
    dist_context: DistributionContext,
    config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
    // Refresh the per-child "distribution changing" flags before inspection:
    let dist_context = update_children(dist_context)?;

    // Leaf nodes have no children whose distribution could be adjusted:
    if dist_context.plan.children().is_empty() {
        return Ok(Transformed::no(dist_context));
    }

    let target_partitions = config.execution.target_partitions;
    // When `false`, round robin repartition will not be added to increase parallelism
    let enable_round_robin = config.optimizer.enable_round_robin_repartition;
    let repartition_file_scans = config.optimizer.repartition_file_scans;
    let batch_size = config.execution.batch_size;
    let should_use_estimates = config
        .execution
        .use_row_number_estimates_to_optimize_partitioning;
    let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
        && matches!(
            dist_context.plan.pipeline_behavior(),
            EmissionType::Incremental | EmissionType::Both
        );
    // Use order preserving variants either of the conditions true
    // - it is desired according to config
    // - when plan is unbounded
    // - when it is pipeline friendly (can incrementally produce results)
    let order_preserving_variants_desirable =
        unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;

    // Remove unnecessary repartition from the physical plan if any
    let DistributionContext {
        mut plan,
        data,
        children,
    } = remove_dist_changing_operators(dist_context)?;

    // For window operators, substitute a better-fitting window variant when
    // one is available; otherwise keep the current plan node as-is:
    if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys(),
        )? {
            plan = updated_window;
        }
    } else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys(),
        )? {
            plan = updated_window;
        }
    };

    let repartition_status_flags =
        get_repartition_requirement_status(&plan, batch_size, should_use_estimates)?;
    // This loop iterates over all the children to:
    // - Increase parallelism for every child if it is beneficial.
    // - Satisfy the distribution requirements of every child, if it is not
    //   already satisfied.
    // We store the updated children in `new_children`.
    let children = izip!(
        children.into_iter(),
        plan.required_input_ordering(),
        plan.maintains_input_order(),
        repartition_status_flags.into_iter()
    )
    .map(
        |(
            mut child,
            required_input_ordering,
            maintains,
            RepartitionRequirementStatus {
                requirement,
                roundrobin_beneficial,
                roundrobin_beneficial_stats,
                hash_necessary,
            },
        )| {
            let add_roundrobin = enable_round_robin
                // Operator benefits from partitioning (e.g. filter):
                && roundrobin_beneficial
                && roundrobin_beneficial_stats
                // Unless partitioning increases the partition count, it is not beneficial:
                && child.plan.output_partitioning().partition_count() < target_partitions;

            // When `repartition_file_scans` is set, attempt to increase
            // parallelism at the source.
            //
            // If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`)
            // then no repartitioning will have occurred. As the default implementation returns None, it is only
            // specific physical plan nodes, such as certain datasources, which are repartitioned.
            if repartition_file_scans && roundrobin_beneficial_stats {
                if let Some(new_child) =
                    child.plan.repartitioned(target_partitions, config)?
                {
                    child.plan = new_child;
                }
            }

            // Satisfy the distribution requirement if it is unmet.
            match &requirement {
                Distribution::SinglePartition => {
                    child = add_merge_on_top(child);
                }
                Distribution::HashPartitioned(exprs) => {
                    if add_roundrobin {
                        // Add round-robin repartitioning on top of the operator
                        // to increase parallelism.
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                    // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
                    if hash_necessary {
                        child =
                            add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
                    }
                }
                Distribution::UnspecifiedDistribution => {
                    if add_roundrobin {
                        // Add round-robin repartitioning on top of the operator
                        // to increase parallelism.
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                }
            };

            // There is an ordering requirement of the operator:
            if let Some(required_input_ordering) = required_input_ordering {
                // Either:
                // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
                // - using order preserving variant is not desirable.
                let sort_req = required_input_ordering.into_single();
                let ordering_satisfied = child
                    .plan
                    .equivalence_properties()
                    .ordering_satisfy_requirement(sort_req.clone())?;

                if (!ordering_satisfied || !order_preserving_variants_desirable)
                    && child.data
                {
                    child = replace_order_preserving_variants(child)?;
                    // If ordering requirements were satisfied before repartitioning,
                    // make sure ordering requirements are still satisfied after.
                    if ordering_satisfied {
                        // Make sure to satisfy ordering requirement:
                        child = add_sort_above_with_check(
                            child,
                            sort_req,
                            // Pass the fetch through only when the parent is an
                            // `OutputRequirementExec`:
                            plan.as_any()
                                .downcast_ref::<OutputRequirementExec>()
                                .map(|output| output.fetch())
                                .unwrap_or(None),
                        )?;
                    }
                }
                // Stop tracking distribution changing operators
                child.data = false;
            } else {
                // no ordering requirement
                match requirement {
                    // Operator requires specific distribution.
                    Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
                        // Since there is no ordering requirement, preserving ordering is pointless
                        child = replace_order_preserving_variants(child)?;
                    }
                    Distribution::UnspecifiedDistribution => {
                        // Since ordering is lost, trying to preserve ordering is pointless
                        if !maintains || plan.as_any().is::<OutputRequirementExec>() {
                            child = replace_order_preserving_variants(child)?;
                        }
                    }
                }
            }
            Ok(child)
        },
    )
    .collect::<Result<Vec<_>>>()?;

    let children_plans = children
        .iter()
        .map(|c| Arc::clone(&c.plan))
        .collect::<Vec<_>>();

    plan = if plan.as_any().is::<UnionExec>()
        && !config.optimizer.prefer_existing_union
        && can_interleave(children_plans.iter())
    {
        // Add a special case for [`UnionExec`] since we want to "bubble up"
        // hash-partitioned data. So instead of
        //
        // Agg:
        //   Repartition (hash):
        //     Union:
        //       - Agg:
        //           Repartition (hash):
        //             Data
        //       - Agg:
        //           Repartition (hash):
        //             Data
        //
        // we can use:
        //
        // Agg:
        //   Interleave:
        //     - Agg:
        //         Repartition (hash):
        //           Data
        //     - Agg:
        //         Repartition (hash):
        //           Data
        Arc::new(InterleaveExec::try_new(children_plans)?)
    } else {
        plan.with_new_children(children_plans)?
    };

    Ok(Transformed::yes(DistributionContext::new(
        plan, data, children,
    )))
}
1390
/// Keeps track of distribution changing operators (like `RepartitionExec`,
/// `SortPreservingMergeExec`, `CoalescePartitionsExec`) and their ancestors.
/// Using this information, we can optimize distribution of the plan if/when
/// necessary.
///
/// The `bool` payload roughly marks whether the node's subtree contains a
/// distribution-changing operator; see [`update_children`] for the exact rule
/// (leaves, for instance, are also flagged).
pub type DistributionContext = PlanContext<bool>;
1396
1397fn update_children(mut dist_context: DistributionContext) -> Result<DistributionContext> {
1398 for child_context in dist_context.children.iter_mut() {
1399 let child_plan_any = child_context.plan.as_any();
1400 child_context.data =
1401 if let Some(repartition) = child_plan_any.downcast_ref::<RepartitionExec>() {
1402 !matches!(
1403 repartition.partitioning(),
1404 Partitioning::UnknownPartitioning(_)
1405 )
1406 } else {
1407 child_plan_any.is::<SortPreservingMergeExec>()
1408 || child_plan_any.is::<CoalescePartitionsExec>()
1409 || child_context.plan.children().is_empty()
1410 || child_context.children[0].data
1411 || child_context
1412 .plan
1413 .required_input_distribution()
1414 .iter()
1415 .zip(child_context.children.iter())
1416 .any(|(required_dist, child_context)| {
1417 child_context.data
1418 && matches!(
1419 required_dist,
1420 Distribution::UnspecifiedDistribution
1421 )
1422 })
1423 }
1424 }
1425
1426 dist_context.data = false;
1427 Ok(dist_context)
1428}
1429
1430// See tests in datafusion/core/tests/physical_optimizer