datafusion_physical_optimizer/enforce_distribution.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceDistribution optimizer rule inspects the physical plan with respect
19//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them
20//! when necessary. If increasing parallelism is beneficial (and also desirable
21//! according to the configuration), this rule increases partition counts in
22//! the physical plan.
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use crate::optimizer::PhysicalOptimizerRule;
28use crate::output_requirements::OutputRequirementExec;
29use crate::utils::{
30 add_sort_above_with_check, is_coalesce_partitions, is_repartition,
31 is_sort_preserving_merge,
32};
33
34use arrow::compute::SortOptions;
35use datafusion_common::config::ConfigOptions;
36use datafusion_common::error::Result;
37use datafusion_common::stats::Precision;
38use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
39use datafusion_expr::logical_plan::JoinType;
40use datafusion_physical_expr::expressions::{Column, NoOp};
41use datafusion_physical_expr::utils::map_columns_before_projection;
42use datafusion_physical_expr::{
43 physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
44};
45use datafusion_physical_plan::aggregates::{
46 AggregateExec, AggregateMode, PhysicalGroupBy,
47};
48use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
49use datafusion_physical_plan::execution_plan::EmissionType;
50use datafusion_physical_plan::joins::{
51 CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
52};
53use datafusion_physical_plan::projection::ProjectionExec;
54use datafusion_physical_plan::repartition::RepartitionExec;
55use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
56use datafusion_physical_plan::tree_node::PlanContext;
57use datafusion_physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
58use datafusion_physical_plan::windows::WindowAggExec;
59use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
60use datafusion_physical_plan::ExecutionPlanProperties;
61use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
62
63use itertools::izip;
64
65/// The `EnforceDistribution` rule ensures that distribution requirements are
66/// met. In doing so, this rule will increase the parallelism in the plan by
67/// introducing repartitioning operators to the physical plan.
68///
69/// For example, given an input such as:
70///
71///
72/// ```text
73/// ┌─────────────────────────────────┐
74/// │ │
75/// │ ExecutionPlan │
76/// │ │
77/// └─────────────────────────────────┘
78/// ▲ ▲
79/// │ │
80/// ┌─────┘ └─────┐
81/// │ │
82/// │ │
83/// │ │
84/// ┌───────────┐ ┌───────────┐
85/// │ │ │ │
86/// │ batch A1 │ │ batch B1 │
87/// │ │ │ │
88/// ├───────────┤ ├───────────┤
89/// │ │ │ │
90/// │ batch A2 │ │ batch B2 │
91/// │ │ │ │
92/// ├───────────┤ ├───────────┤
93/// │ │ │ │
94/// │ batch A3 │ │ batch B3 │
95/// │ │ │ │
96/// └───────────┘ └───────────┘
97///
98/// Input Input
99/// A B
100/// ```
101///
102/// This rule will attempt to add a `RepartitionExec` to increase parallelism
103/// (to 3, in this case) and create the following arrangement:
104///
105/// ```text
106/// ┌─────────────────────────────────┐
107/// │ │
108/// │ ExecutionPlan │
109/// │ │
110/// └─────────────────────────────────┘
111/// ▲ ▲ ▲ Input now has 3
112/// │ │ │ partitions
113/// ┌───────┘ │ └───────┐
114/// │ │ │
115/// │ │ │
116/// ┌───────────┐ ┌───────────┐ ┌───────────┐
117/// │ │ │ │ │ │
118/// │ batch A1 │ │ batch A3 │ │ batch B3 │
119/// │ │ │ │ │ │
120/// ├───────────┤ ├───────────┤ ├───────────┤
121/// │ │ │ │ │ │
122/// │ batch B2 │ │ batch B1 │ │ batch A2 │
123/// │ │ │ │ │ │
124/// └───────────┘ └───────────┘ └───────────┘
125/// ▲ ▲ ▲
126/// │ │ │
127/// └─────────┐ │ ┌──────────┘
128/// │ │ │
129/// │ │ │
130/// ┌─────────────────────────────────┐ batches are
131/// │ RepartitionExec(3) │ repartitioned
132/// │ RoundRobin │
133/// │ │
134/// └─────────────────────────────────┘
135/// ▲ ▲
136/// │ │
137/// ┌─────┘ └─────┐
138/// │ │
139/// │ │
140/// │ │
141/// ┌───────────┐ ┌───────────┐
142/// │ │ │ │
143/// │ batch A1 │ │ batch B1 │
144/// │ │ │ │
145/// ├───────────┤ ├───────────┤
146/// │ │ │ │
147/// │ batch A2 │ │ batch B2 │
148/// │ │ │ │
149/// ├───────────┤ ├───────────┤
150/// │ │ │ │
151/// │ batch A3 │ │ batch B3 │
152/// │ │ │ │
153/// └───────────┘ └───────────┘
154///
155///
156/// Input Input
157/// A B
158/// ```
159///
160/// The `EnforceDistribution` rule
161/// - is idempotent; i.e. it can be applied multiple times, each time producing
162/// the same result.
163/// - always produces a valid plan in terms of distribution requirements. Its
164/// input plan can be valid or invalid with respect to distribution requirements,
165/// but the output plan will always be valid.
166/// - produces a valid plan in terms of ordering requirements, *if* its input is
167/// a valid plan in terms of ordering requirements. If the input plan is invalid,
168/// this rule does not attempt to fix it as doing so is the responsibility of the
169/// `EnforceSorting` rule.
170///
171/// Note that distribution requirements are met in the strictest way. This may
172/// result in more than strictly necessary [`RepartitionExec`]s in the plan, but
173/// meeting the requirements in the strictest way may help avoid possible data
174/// skew in joins.
175///
176/// For example for a hash join with keys (a, b, c), the required Distribution(a, b, c)
177/// can be satisfied by several alternative partitioning ways: (a, b, c), (a, b),
178/// (a, c), (b, c), (a), (b), (c) and ( ).
179///
180/// This rule only chooses the exact match and satisfies the Distribution(a, b, c)
181/// by a HashPartition(a, b, c).
182#[derive(Default, Debug)]
183pub struct EnforceDistribution {}
184
185impl EnforceDistribution {
186 #[allow(missing_docs)]
187 pub fn new() -> Self {
188 Self {}
189 }
190}
191
192impl PhysicalOptimizerRule for EnforceDistribution {
193 fn optimize(
194 &self,
195 plan: Arc<dyn ExecutionPlan>,
196 config: &ConfigOptions,
197 ) -> Result<Arc<dyn ExecutionPlan>> {
198 let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
199
200 let adjusted = if top_down_join_key_reordering {
201 // Run a top-down process to adjust input key ordering recursively
202 let plan_requirements = PlanWithKeyRequirements::new_default(plan);
203 let adjusted = plan_requirements
204 .transform_down(adjust_input_keys_ordering)
205 .data()?;
206 adjusted.plan
207 } else {
208 // Run a bottom-up process
209 plan.transform_up(|plan| {
210 Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
211 })
212 .data()?
213 };
214
215 let distribution_context = DistributionContext::new_default(adjusted);
216 // Distribution enforcement needs to be applied bottom-up.
217 let distribution_context = distribution_context
218 .transform_up(|distribution_context| {
219 ensure_distribution(distribution_context, config)
220 })
221 .data()?;
222 Ok(distribution_context.plan)
223 }
224
225 fn name(&self) -> &str {
226 "EnforceDistribution"
227 }
228
229 fn schema_check(&self) -> bool {
230 true
231 }
232}
233
234#[derive(Debug, Clone)]
235struct JoinKeyPairs {
236 left_keys: Vec<Arc<dyn PhysicalExpr>>,
237 right_keys: Vec<Arc<dyn PhysicalExpr>>,
238}
239
240/// Keeps track of parent required key orderings.
241pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
242
243/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
244/// That might not match with the output partitioning of the join node's children
245/// A Top-Down process will use this method to adjust children's output partitioning based on the parent key reordering requirements:
246///
247/// Example:
248/// TopJoin on (a, b, c)
249/// bottom left join on(b, a, c)
250/// bottom right join on(c, b, a)
251///
252/// Will be adjusted to:
253/// TopJoin on (a, b, c)
254/// bottom left join on(a, b, c)
255/// bottom right join on(a, b, c)
256///
257/// Example:
258/// TopJoin on (a, b, c)
259/// Agg1 group by (b, a, c)
260/// Agg2 group by (c, b, a)
261///
262/// Will be adjusted to:
263/// TopJoin on (a, b, c)
264/// Projection(b, a, c)
265/// Agg1 group by (a, b, c)
266/// Projection(c, b, a)
267/// Agg2 group by (a, b, c)
268///
269/// Following is the explanation of the reordering process:
270///
271/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
272/// Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
273/// Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
274/// Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
275///
276/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
277/// Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
278/// Requirements is already satisfied, clear all the requirements, return the unchanged plan.
279/// Requirements can be satisfied by adjusting keys ordering, clear all the requirements, return the changed plan.
280///
281/// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements, return the unchanged plan
282/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
283/// 5) For other types of operators, by default, pushdown the parent requirements to children.
284///
285pub fn adjust_input_keys_ordering(
286 mut requirements: PlanWithKeyRequirements,
287) -> Result<Transformed<PlanWithKeyRequirements>> {
288 let plan = Arc::clone(&requirements.plan);
289
290 if let Some(HashJoinExec {
291 left,
292 right,
293 on,
294 filter,
295 join_type,
296 projection,
297 mode,
298 null_equals_null,
299 ..
300 }) = plan.as_any().downcast_ref::<HashJoinExec>()
301 {
302 match mode {
303 PartitionMode::Partitioned => {
304 let join_constructor = |new_conditions: (
305 Vec<(PhysicalExprRef, PhysicalExprRef)>,
306 Vec<SortOptions>,
307 )| {
308 HashJoinExec::try_new(
309 Arc::clone(left),
310 Arc::clone(right),
311 new_conditions.0,
312 filter.clone(),
313 join_type,
314 // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
315 projection.clone(),
316 PartitionMode::Partitioned,
317 *null_equals_null,
318 )
319 .map(|e| Arc::new(e) as _)
320 };
321 return reorder_partitioned_join_keys(
322 requirements,
323 on,
324 &[],
325 &join_constructor,
326 )
327 .map(Transformed::yes);
328 }
329 PartitionMode::CollectLeft => {
330 // Push down requirements to the right side
331 requirements.children[1].data = match join_type {
332 JoinType::Inner | JoinType::Right => shift_right_required(
333 &requirements.data,
334 left.schema().fields().len(),
335 )
336 .unwrap_or_default(),
337 JoinType::RightSemi | JoinType::RightAnti => {
338 requirements.data.clone()
339 }
340 JoinType::Left
341 | JoinType::LeftSemi
342 | JoinType::LeftAnti
343 | JoinType::Full
344 | JoinType::LeftMark => vec![],
345 };
346 }
347 PartitionMode::Auto => {
348 // Can not satisfy, clear the current requirements and generate new empty requirements
349 requirements.data.clear();
350 }
351 }
352 } else if let Some(CrossJoinExec { left, .. }) =
353 plan.as_any().downcast_ref::<CrossJoinExec>()
354 {
355 let left_columns_len = left.schema().fields().len();
356 // Push down requirements to the right side
357 requirements.children[1].data =
358 shift_right_required(&requirements.data, left_columns_len)
359 .unwrap_or_default();
360 } else if let Some(SortMergeJoinExec {
361 left,
362 right,
363 on,
364 filter,
365 join_type,
366 sort_options,
367 null_equals_null,
368 ..
369 }) = plan.as_any().downcast_ref::<SortMergeJoinExec>()
370 {
371 let join_constructor = |new_conditions: (
372 Vec<(PhysicalExprRef, PhysicalExprRef)>,
373 Vec<SortOptions>,
374 )| {
375 SortMergeJoinExec::try_new(
376 Arc::clone(left),
377 Arc::clone(right),
378 new_conditions.0,
379 filter.clone(),
380 *join_type,
381 new_conditions.1,
382 *null_equals_null,
383 )
384 .map(|e| Arc::new(e) as _)
385 };
386 return reorder_partitioned_join_keys(
387 requirements,
388 on,
389 sort_options,
390 &join_constructor,
391 )
392 .map(Transformed::yes);
393 } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
394 if !requirements.data.is_empty() {
395 if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
396 return reorder_aggregate_keys(requirements, aggregate_exec)
397 .map(Transformed::yes);
398 } else {
399 requirements.data.clear();
400 }
401 } else {
402 // Keep everything unchanged
403 return Ok(Transformed::no(requirements));
404 }
405 } else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() {
406 let expr = proj.expr();
407 // For Projection, we need to transform the requirements to the columns before the Projection
408 // And then to push down the requirements
409 // Construct a mapping from new name to the original Column
410 let new_required = map_columns_before_projection(&requirements.data, expr);
411 if new_required.len() == requirements.data.len() {
412 requirements.children[0].data = new_required;
413 } else {
414 // Can not satisfy, clear the current requirements and generate new empty requirements
415 requirements.data.clear();
416 }
417 } else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
418 || plan
419 .as_any()
420 .downcast_ref::<CoalescePartitionsExec>()
421 .is_some()
422 || plan.as_any().downcast_ref::<WindowAggExec>().is_some()
423 {
424 requirements.data.clear();
425 } else {
426 // By default, push down the parent requirements to children
427 for child in requirements.children.iter_mut() {
428 child.data.clone_from(&requirements.data);
429 }
430 }
431 Ok(Transformed::yes(requirements))
432}
433
434pub fn reorder_partitioned_join_keys<F>(
435 mut join_plan: PlanWithKeyRequirements,
436 on: &[(PhysicalExprRef, PhysicalExprRef)],
437 sort_options: &[SortOptions],
438 join_constructor: &F,
439) -> Result<PlanWithKeyRequirements>
440where
441 F: Fn(
442 (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
443 ) -> Result<Arc<dyn ExecutionPlan>>,
444{
445 let parent_required = &join_plan.data;
446 let join_key_pairs = extract_join_keys(on);
447 let eq_properties = join_plan.plan.equivalence_properties();
448
449 let (
450 JoinKeyPairs {
451 left_keys,
452 right_keys,
453 },
454 positions,
455 ) = try_reorder(join_key_pairs, parent_required, eq_properties);
456
457 if let Some(positions) = positions {
458 if !positions.is_empty() {
459 let new_join_on = new_join_conditions(&left_keys, &right_keys);
460 let new_sort_options = (0..sort_options.len())
461 .map(|idx| sort_options[positions[idx]])
462 .collect();
463 join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
464 }
465 }
466
467 join_plan.children[0].data = left_keys;
468 join_plan.children[1].data = right_keys;
469 Ok(join_plan)
470}
471
472pub fn reorder_aggregate_keys(
473 mut agg_node: PlanWithKeyRequirements,
474 agg_exec: &AggregateExec,
475) -> Result<PlanWithKeyRequirements> {
476 let parent_required = &agg_node.data;
477 let output_columns = agg_exec
478 .group_expr()
479 .expr()
480 .iter()
481 .enumerate()
482 .map(|(index, (_, name))| Column::new(name, index))
483 .collect::<Vec<_>>();
484
485 let output_exprs = output_columns
486 .iter()
487 .map(|c| Arc::new(c.clone()) as _)
488 .collect::<Vec<_>>();
489
490 if parent_required.len() == output_exprs.len()
491 && agg_exec.group_expr().null_expr().is_empty()
492 && !physical_exprs_equal(&output_exprs, parent_required)
493 {
494 if let Some(positions) = expected_expr_positions(&output_exprs, parent_required) {
495 if let Some(agg_exec) =
496 agg_exec.input().as_any().downcast_ref::<AggregateExec>()
497 {
498 if matches!(agg_exec.mode(), &AggregateMode::Partial) {
499 let group_exprs = agg_exec.group_expr().expr();
500 let new_group_exprs = positions
501 .into_iter()
502 .map(|idx| group_exprs[idx].clone())
503 .collect();
504 let partial_agg = Arc::new(AggregateExec::try_new(
505 AggregateMode::Partial,
506 PhysicalGroupBy::new_single(new_group_exprs),
507 agg_exec.aggr_expr().to_vec(),
508 agg_exec.filter_expr().to_vec(),
509 Arc::clone(agg_exec.input()),
510 Arc::clone(&agg_exec.input_schema),
511 )?);
512 // Build new group expressions that correspond to the output
513 // of the "reordered" aggregator:
514 let group_exprs = partial_agg.group_expr().expr();
515 let new_group_by = PhysicalGroupBy::new_single(
516 partial_agg
517 .output_group_expr()
518 .into_iter()
519 .enumerate()
520 .map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
521 .collect(),
522 );
523 let new_final_agg = Arc::new(AggregateExec::try_new(
524 AggregateMode::FinalPartitioned,
525 new_group_by,
526 agg_exec.aggr_expr().to_vec(),
527 agg_exec.filter_expr().to_vec(),
528 Arc::clone(&partial_agg) as _,
529 agg_exec.input_schema(),
530 )?);
531
532 agg_node.plan = Arc::clone(&new_final_agg) as _;
533 agg_node.data.clear();
534 agg_node.children = vec![PlanWithKeyRequirements::new(
535 partial_agg as _,
536 vec![],
537 agg_node.children.swap_remove(0).children,
538 )];
539
540 // Need to create a new projection to change the expr ordering back
541 let agg_schema = new_final_agg.schema();
542 let mut proj_exprs = output_columns
543 .iter()
544 .map(|col| {
545 let name = col.name();
546 let index = agg_schema.index_of(name)?;
547 Ok((Arc::new(Column::new(name, index)) as _, name.to_owned()))
548 })
549 .collect::<Result<Vec<_>>>()?;
550 let agg_fields = agg_schema.fields();
551 for (idx, field) in
552 agg_fields.iter().enumerate().skip(output_columns.len())
553 {
554 let name = field.name();
555 let plan = Arc::new(Column::new(name, idx)) as _;
556 proj_exprs.push((plan, name.clone()))
557 }
558 return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| {
559 PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node])
560 });
561 }
562 }
563 }
564 }
565 Ok(agg_node)
566}
567
568fn shift_right_required(
569 parent_required: &[Arc<dyn PhysicalExpr>],
570 left_columns_len: usize,
571) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
572 let new_right_required = parent_required
573 .iter()
574 .filter_map(|r| {
575 r.as_any().downcast_ref::<Column>().and_then(|col| {
576 col.index()
577 .checked_sub(left_columns_len)
578 .map(|index| Arc::new(Column::new(col.name(), index)) as _)
579 })
580 })
581 .collect::<Vec<_>>();
582
583 // if the parent required are all coming from the right side, the requirements can be pushdown
584 (new_right_required.len() == parent_required.len()).then_some(new_right_required)
585}
586
587/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
588/// That might not match with the output partitioning of the join node's children
589/// This method will try to change the ordering of the join keys to match with the
590/// partitioning of the join nodes' children. If it can not match with both sides, it will try to
591/// match with one, either the left side or the right side.
592///
593/// Example:
594/// TopJoin on (a, b, c)
595/// bottom left join on(b, a, c)
596/// bottom right join on(c, b, a)
597///
598/// Will be adjusted to:
599/// TopJoin on (b, a, c)
600/// bottom left join on(b, a, c)
601/// bottom right join on(c, b, a)
602///
603/// Compared to the Top-Down reordering process, this Bottom-Up approach is much simpler, but might not reach a best result.
604/// The Bottom-Up approach will be useful in future if we plan to support storage partition-wised Joins.
605/// In that case, the datasources/tables might be pre-partitioned and we can't adjust the key ordering of the datasources
606/// and then can't apply the Top-Down reordering process.
607pub fn reorder_join_keys_to_inputs(
608 plan: Arc<dyn ExecutionPlan>,
609) -> Result<Arc<dyn ExecutionPlan>> {
610 let plan_any = plan.as_any();
611 if let Some(HashJoinExec {
612 left,
613 right,
614 on,
615 filter,
616 join_type,
617 projection,
618 mode,
619 null_equals_null,
620 ..
621 }) = plan_any.downcast_ref::<HashJoinExec>()
622 {
623 if matches!(mode, PartitionMode::Partitioned) {
624 let (join_keys, positions) = reorder_current_join_keys(
625 extract_join_keys(on),
626 Some(left.output_partitioning()),
627 Some(right.output_partitioning()),
628 left.equivalence_properties(),
629 right.equivalence_properties(),
630 );
631 if positions.is_some_and(|idxs| !idxs.is_empty()) {
632 let JoinKeyPairs {
633 left_keys,
634 right_keys,
635 } = join_keys;
636 let new_join_on = new_join_conditions(&left_keys, &right_keys);
637 return Ok(Arc::new(HashJoinExec::try_new(
638 Arc::clone(left),
639 Arc::clone(right),
640 new_join_on,
641 filter.clone(),
642 join_type,
643 projection.clone(),
644 PartitionMode::Partitioned,
645 *null_equals_null,
646 )?));
647 }
648 }
649 } else if let Some(SortMergeJoinExec {
650 left,
651 right,
652 on,
653 filter,
654 join_type,
655 sort_options,
656 null_equals_null,
657 ..
658 }) = plan_any.downcast_ref::<SortMergeJoinExec>()
659 {
660 let (join_keys, positions) = reorder_current_join_keys(
661 extract_join_keys(on),
662 Some(left.output_partitioning()),
663 Some(right.output_partitioning()),
664 left.equivalence_properties(),
665 right.equivalence_properties(),
666 );
667 if let Some(positions) = positions {
668 if !positions.is_empty() {
669 let JoinKeyPairs {
670 left_keys,
671 right_keys,
672 } = join_keys;
673 let new_join_on = new_join_conditions(&left_keys, &right_keys);
674 let new_sort_options = (0..sort_options.len())
675 .map(|idx| sort_options[positions[idx]])
676 .collect();
677 return SortMergeJoinExec::try_new(
678 Arc::clone(left),
679 Arc::clone(right),
680 new_join_on,
681 filter.clone(),
682 *join_type,
683 new_sort_options,
684 *null_equals_null,
685 )
686 .map(|smj| Arc::new(smj) as _);
687 }
688 }
689 }
690 Ok(plan)
691}
692
693/// Reorder the current join keys ordering based on either left partition or right partition
694fn reorder_current_join_keys(
695 join_keys: JoinKeyPairs,
696 left_partition: Option<&Partitioning>,
697 right_partition: Option<&Partitioning>,
698 left_equivalence_properties: &EquivalenceProperties,
699 right_equivalence_properties: &EquivalenceProperties,
700) -> (JoinKeyPairs, Option<Vec<usize>>) {
701 match (left_partition, right_partition) {
702 (Some(Partitioning::Hash(left_exprs, _)), _) => {
703 match try_reorder(join_keys, left_exprs, left_equivalence_properties) {
704 (join_keys, None) => reorder_current_join_keys(
705 join_keys,
706 None,
707 right_partition,
708 left_equivalence_properties,
709 right_equivalence_properties,
710 ),
711 result => result,
712 }
713 }
714 (_, Some(Partitioning::Hash(right_exprs, _))) => {
715 try_reorder(join_keys, right_exprs, right_equivalence_properties)
716 }
717 _ => (join_keys, None),
718 }
719}
720
721fn try_reorder(
722 join_keys: JoinKeyPairs,
723 expected: &[Arc<dyn PhysicalExpr>],
724 equivalence_properties: &EquivalenceProperties,
725) -> (JoinKeyPairs, Option<Vec<usize>>) {
726 let eq_groups = equivalence_properties.eq_group();
727 let mut normalized_expected = vec![];
728 let mut normalized_left_keys = vec![];
729 let mut normalized_right_keys = vec![];
730 if join_keys.left_keys.len() != expected.len() {
731 return (join_keys, None);
732 }
733 if physical_exprs_equal(expected, &join_keys.left_keys)
734 || physical_exprs_equal(expected, &join_keys.right_keys)
735 {
736 return (join_keys, Some(vec![]));
737 } else if !equivalence_properties.eq_group().is_empty() {
738 normalized_expected = expected
739 .iter()
740 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
741 .collect();
742
743 normalized_left_keys = join_keys
744 .left_keys
745 .iter()
746 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
747 .collect();
748
749 normalized_right_keys = join_keys
750 .right_keys
751 .iter()
752 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
753 .collect();
754
755 if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
756 || physical_exprs_equal(&normalized_expected, &normalized_right_keys)
757 {
758 return (join_keys, Some(vec![]));
759 }
760 }
761
762 let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected)
763 .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
764 .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
765 .or_else(|| {
766 expected_expr_positions(&normalized_right_keys, &normalized_expected)
767 })
768 else {
769 return (join_keys, None);
770 };
771
772 let mut new_left_keys = vec![];
773 let mut new_right_keys = vec![];
774 for pos in positions.iter() {
775 new_left_keys.push(Arc::clone(&join_keys.left_keys[*pos]));
776 new_right_keys.push(Arc::clone(&join_keys.right_keys[*pos]));
777 }
778 let pairs = JoinKeyPairs {
779 left_keys: new_left_keys,
780 right_keys: new_right_keys,
781 };
782
783 (pairs, Some(positions))
784}
785
786/// Return the expected expressions positions.
787/// For example, the current expressions are ['c', 'a', 'a', b'], the expected expressions are ['b', 'c', 'a', 'a'],
788///
789/// This method will return a Vec [3, 0, 1, 2]
790fn expected_expr_positions(
791 current: &[Arc<dyn PhysicalExpr>],
792 expected: &[Arc<dyn PhysicalExpr>],
793) -> Option<Vec<usize>> {
794 if current.is_empty() || expected.is_empty() {
795 return None;
796 }
797 let mut indexes: Vec<usize> = vec![];
798 let mut current = current.to_vec();
799 for expr in expected.iter() {
800 // Find the position of the expected expr in the current expressions
801 if let Some(expected_position) = current.iter().position(|e| e.eq(expr)) {
802 current[expected_position] = Arc::new(NoOp::new());
803 indexes.push(expected_position);
804 } else {
805 return None;
806 }
807 }
808 Some(indexes)
809}
810
811fn extract_join_keys(on: &[(PhysicalExprRef, PhysicalExprRef)]) -> JoinKeyPairs {
812 let (left_keys, right_keys) = on
813 .iter()
814 .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
815 .unzip();
816 JoinKeyPairs {
817 left_keys,
818 right_keys,
819 }
820}
821
822fn new_join_conditions(
823 new_left_keys: &[Arc<dyn PhysicalExpr>],
824 new_right_keys: &[Arc<dyn PhysicalExpr>],
825) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
826 new_left_keys
827 .iter()
828 .zip(new_right_keys.iter())
829 .map(|(l_key, r_key)| (Arc::clone(l_key), Arc::clone(r_key)))
830 .collect()
831}
832
833/// Adds RoundRobin repartition operator to the plan increase parallelism.
834///
835/// # Arguments
836///
837/// * `input`: Current node.
838/// * `n_target`: desired target partition number, if partition number of the
839/// current executor is less than this value. Partition number will be increased.
840///
841/// # Returns
842///
843/// A [`Result`] object that contains new execution plan where the desired
844/// partition number is achieved by adding a RoundRobin repartition.
845fn add_roundrobin_on_top(
846 input: DistributionContext,
847 n_target: usize,
848) -> Result<DistributionContext> {
849 // Adding repartition is helpful:
850 if input.plan.output_partitioning().partition_count() < n_target {
851 // When there is an existing ordering, we preserve ordering
852 // during repartition. This will be un-done in the future
853 // If any of the following conditions is true
854 // - Preserving ordering is not helpful in terms of satisfying ordering requirements
855 // - Usage of order preserving variants is not desirable
856 // (determined by flag `config.optimizer.prefer_existing_sort`)
857 let partitioning = Partitioning::RoundRobinBatch(n_target);
858 let repartition =
859 RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
860 .with_preserve_order();
861
862 let new_plan = Arc::new(repartition) as _;
863
864 Ok(DistributionContext::new(new_plan, true, vec![input]))
865 } else {
866 // Partition is not helpful, we already have desired number of partitions.
867 Ok(input)
868 }
869}
870
871/// Adds a hash repartition operator:
872/// - to increase parallelism, and/or
873/// - to satisfy requirements of the subsequent operators.
874///
875/// Repartition(Hash) is added on top of operator `input`.
876///
877/// # Arguments
878///
879/// * `input`: Current node.
880/// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
881/// * `n_target`: desired target partition number, if partition number of the
882/// current executor is less than this value. Partition number will be increased.
883///
884/// # Returns
885///
886/// A [`Result`] object that contains new execution plan where the desired
887/// distribution is satisfied by adding a Hash repartition.
888fn add_hash_on_top(
889 input: DistributionContext,
890 hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
891 n_target: usize,
892) -> Result<DistributionContext> {
893 // Early return if hash repartition is unnecessary
894 // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
895 if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
896 return Ok(input);
897 }
898
899 let dist = Distribution::HashPartitioned(hash_exprs);
900 let satisfied = input
901 .plan
902 .output_partitioning()
903 .satisfy(&dist, input.plan.equivalence_properties());
904
905 // Add hash repartitioning when:
906 // - The hash distribution requirement is not satisfied, or
907 // - We can increase parallelism by adding hash partitioning.
908 if !satisfied || n_target > input.plan.output_partitioning().partition_count() {
909 // When there is an existing ordering, we preserve ordering during
910 // repartition. This will be rolled back in the future if any of the
911 // following conditions is true:
912 // - Preserving ordering is not helpful in terms of satisfying ordering
913 // requirements.
914 // - Usage of order preserving variants is not desirable (per the flag
915 // `config.optimizer.prefer_existing_sort`).
916 let partitioning = dist.create_partitioning(n_target);
917 let repartition =
918 RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
919 .with_preserve_order();
920 let plan = Arc::new(repartition) as _;
921
922 return Ok(DistributionContext::new(plan, true, vec![input]));
923 }
924
925 Ok(input)
926}
927
928/// Adds a [`SortPreservingMergeExec`] operator on top of input executor
929/// to satisfy single distribution requirement.
930///
931/// # Arguments
932///
933/// * `input`: Current node.
934///
935/// # Returns
936///
937/// Updated node with an execution plan, where desired single
938/// distribution is satisfied by adding [`SortPreservingMergeExec`].
939fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
940 // Add SortPreservingMerge only when partition count is larger than 1.
941 if input.plan.output_partitioning().partition_count() > 1 {
942 // When there is an existing ordering, we preserve ordering
943 // when decreasing partitions. This will be un-done in the future
944 // if any of the following conditions is true
945 // - Preserving ordering is not helpful in terms of satisfying ordering requirements
946 // - Usage of order preserving variants is not desirable
947 // (determined by flag `config.optimizer.bounded_order_preserving_variants`)
948 let should_preserve_ordering = input.plan.output_ordering().is_some();
949
950 let new_plan = if should_preserve_ordering {
951 Arc::new(SortPreservingMergeExec::new(
952 input.plan.output_ordering().cloned().unwrap_or_default(),
953 Arc::clone(&input.plan),
954 )) as _
955 } else {
956 Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
957 };
958
959 DistributionContext::new(new_plan, true, vec![input])
960 } else {
961 input
962 }
963}
964
965/// Updates the physical plan inside [`DistributionContext`] so that distribution
966/// changing operators are removed from the top. If they are necessary, they will
967/// be added in subsequent stages.
968///
969/// Assume that following plan is given:
970/// ```text
971/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
972/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
973/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
974/// ```
975///
976/// Since `RepartitionExec`s change the distribution, this function removes
977/// them and returns following plan:
978///
979/// ```text
980/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
981/// ```
982fn remove_dist_changing_operators(
983 mut distribution_context: DistributionContext,
984) -> Result<DistributionContext> {
985 while is_repartition(&distribution_context.plan)
986 || is_coalesce_partitions(&distribution_context.plan)
987 || is_sort_preserving_merge(&distribution_context.plan)
988 {
989 // All of above operators have a single child. First child is only child.
990 // Remove any distribution changing operators at the beginning:
991 distribution_context = distribution_context.children.swap_remove(0);
992 // Note that they will be re-inserted later on if necessary or helpful.
993 }
994
995 Ok(distribution_context)
996}
997
998/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
999///
1000/// Assume that following plan is given:
1001/// ```text
1002/// "SortPreservingMergeExec: \[a@0 ASC]"
1003/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true",
1004/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true",
1005/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1006/// ```
1007///
1008/// This function converts plan above to the following:
1009///
1010/// ```text
1011/// "CoalescePartitionsExec"
1012/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
1013/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
1014/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
1015/// ```
1016pub fn replace_order_preserving_variants(
1017 mut context: DistributionContext,
1018) -> Result<DistributionContext> {
1019 context.children = context
1020 .children
1021 .into_iter()
1022 .map(|child| {
1023 if child.data {
1024 replace_order_preserving_variants(child)
1025 } else {
1026 Ok(child)
1027 }
1028 })
1029 .collect::<Result<Vec<_>>>()?;
1030
1031 if is_sort_preserving_merge(&context.plan) {
1032 let child_plan = Arc::clone(&context.children[0].plan);
1033 context.plan = Arc::new(
1034 CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
1035 );
1036 return Ok(context);
1037 } else if let Some(repartition) =
1038 context.plan.as_any().downcast_ref::<RepartitionExec>()
1039 {
1040 if repartition.preserve_order() {
1041 context.plan = Arc::new(RepartitionExec::try_new(
1042 Arc::clone(&context.children[0].plan),
1043 repartition.partitioning().clone(),
1044 )?);
1045 return Ok(context);
1046 }
1047 }
1048
1049 context.update_plan_from_children()
1050}
1051
1052/// A struct to keep track of repartition requirements for each child node.
1053struct RepartitionRequirementStatus {
1054 /// The distribution requirement for the node.
1055 requirement: Distribution,
1056 /// Designates whether round robin partitioning is theoretically beneficial;
1057 /// i.e. the operator can actually utilize parallelism.
1058 roundrobin_beneficial: bool,
1059 /// Designates whether round robin partitioning is beneficial according to
1060 /// the statistical information we have on the number of rows.
1061 roundrobin_beneficial_stats: bool,
1062 /// Designates whether hash partitioning is necessary.
1063 hash_necessary: bool,
1064}
1065
1066/// Calculates the `RepartitionRequirementStatus` for each children to generate
1067/// consistent and sensible (in terms of performance) distribution requirements.
1068/// As an example, a hash join's left (build) child might produce
1069///
1070/// ```text
1071/// RepartitionRequirementStatus {
1072/// ..,
1073/// hash_necessary: true
1074/// }
1075/// ```
1076///
1077/// while its right (probe) child might have very few rows and produce:
1078///
1079/// ```text
1080/// RepartitionRequirementStatus {
1081/// ..,
1082/// hash_necessary: false
1083/// }
1084/// ```
1085///
1086/// These statuses are not consistent as all children should agree on hash
1087/// partitioning. This function aligns the statuses to generate consistent
1088/// hash partitions for each children. After alignment, the right child's
1089/// status would turn into:
1090///
1091/// ```text
1092/// RepartitionRequirementStatus {
1093/// ..,
1094/// hash_necessary: true
1095/// }
1096/// ```
1097fn get_repartition_requirement_status(
1098 plan: &Arc<dyn ExecutionPlan>,
1099 batch_size: usize,
1100 should_use_estimates: bool,
1101) -> Result<Vec<RepartitionRequirementStatus>> {
1102 let mut needs_alignment = false;
1103 let children = plan.children();
1104 let rr_beneficial = plan.benefits_from_input_partitioning();
1105 let requirements = plan.required_input_distribution();
1106 let mut repartition_status_flags = vec![];
1107 for (child, requirement, roundrobin_beneficial) in
1108 izip!(children.into_iter(), requirements, rr_beneficial)
1109 {
1110 // Decide whether adding a round robin is beneficial depending on
1111 // the statistical information we have on the number of rows:
1112 let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows
1113 {
1114 Precision::Exact(n_rows) => n_rows > batch_size,
1115 Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
1116 Precision::Absent => true,
1117 };
1118 let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
1119 // Hash re-partitioning is necessary when the input has more than one
1120 // partitions:
1121 let multi_partitions = child.output_partitioning().partition_count() > 1;
1122 let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
1123 needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
1124 repartition_status_flags.push((
1125 is_hash,
1126 RepartitionRequirementStatus {
1127 requirement,
1128 roundrobin_beneficial,
1129 roundrobin_beneficial_stats,
1130 hash_necessary: is_hash && multi_partitions,
1131 },
1132 ));
1133 }
1134 // Align hash necessary flags for hash partitions to generate consistent
1135 // hash partitions at each children:
1136 if needs_alignment {
1137 // When there is at least one hash requirement that is necessary or
1138 // beneficial according to statistics, make all children require hash
1139 // repartitioning:
1140 for (is_hash, status) in &mut repartition_status_flags {
1141 if *is_hash {
1142 status.hash_necessary = true;
1143 }
1144 }
1145 }
1146 Ok(repartition_status_flags
1147 .into_iter()
1148 .map(|(_, status)| status)
1149 .collect())
1150}
1151
/// This function checks whether we need to add additional data exchange
/// operators to satisfy distribution requirements. Since this function
/// takes care of such requirements, we should avoid manually adding data
/// exchange operators in other places.
///
/// This function is intended to be used in a bottom up traversal, as it
/// can first repartition (or newly partition) at the datasources -- these
/// source partitions may be later repartitioned with additional data exchange operators.
///
/// Returns `Transformed::no` for leaf nodes; otherwise the node is rebuilt
/// with its (possibly re-partitioned) children and returned as
/// `Transformed::yes`.
pub fn ensure_distribution(
    dist_context: DistributionContext,
    config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
    // Refresh the tracking flags of the children (see `update_children`).
    let dist_context = update_children(dist_context)?;

    // Nodes without children have no input distribution to enforce.
    if dist_context.plan.children().is_empty() {
        return Ok(Transformed::no(dist_context));
    }

    let target_partitions = config.execution.target_partitions;
    // When `false`, round robin repartition will not be added to increase parallelism
    let enable_round_robin = config.optimizer.enable_round_robin_repartition;
    let repartition_file_scans = config.optimizer.repartition_file_scans;
    let batch_size = config.execution.batch_size;
    let should_use_estimates = config
        .execution
        .use_row_number_estimates_to_optimize_partitioning;
    let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
        && matches!(
            dist_context.plan.pipeline_behavior(),
            EmissionType::Incremental | EmissionType::Both
        );
    // Use order preserving variants when either of these conditions holds:
    // - the plan is unbounded AND pipeline friendly (it can incrementally
    //   produce results), or
    // - it is desired according to config (`config.optimizer.prefer_existing_sort`).
    let order_preserving_variants_desirable =
        unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;

    // Remove unnecessary repartition from the physical plan if any
    let DistributionContext {
        mut plan,
        data,
        children,
    } = remove_dist_changing_operators(dist_context)?;

    // For window operators, swap in whatever `get_best_fitting_window`
    // returns (if anything) before the input requirements are computed below.
    if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys(),
        )? {
            plan = updated_window;
        }
    } else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys(),
        )? {
            plan = updated_window;
        }
    };

    let repartition_status_flags =
        get_repartition_requirement_status(&plan, batch_size, should_use_estimates)?;
    // This loop iterates over all the children to:
    // - Increase parallelism for every child if it is beneficial.
    // - Satisfy the distribution requirements of every child, if it is not
    //   already satisfied.
    // We store the updated children in `children` (shadowing the input list).
    let children = izip!(
        children.into_iter(),
        plan.required_input_ordering(),
        plan.maintains_input_order(),
        repartition_status_flags.into_iter()
    )
    .map(
        |(
            mut child,
            required_input_ordering,
            maintains,
            RepartitionRequirementStatus {
                requirement,
                roundrobin_beneficial,
                roundrobin_beneficial_stats,
                hash_necessary,
            },
        )| {
            let add_roundrobin = enable_round_robin
                // Operator benefits from partitioning (e.g. filter):
                && roundrobin_beneficial
                && roundrobin_beneficial_stats
                // Unless partitioning increases the partition count, it is not beneficial:
                && child.plan.output_partitioning().partition_count() < target_partitions;

            // When `repartition_file_scans` is set, attempt to increase
            // parallelism at the source.
            //
            // If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`)
            // then no repartitioning will have occurred. As the default implementation returns None, it is only
            // specific physical plan nodes, such as certain datasources, which are repartitioned.
            if repartition_file_scans && roundrobin_beneficial_stats {
                if let Some(new_child) =
                    child.plan.repartitioned(target_partitions, config)?
                {
                    child.plan = new_child;
                }
            }

            // Satisfy the distribution requirement if it is unmet.
            match &requirement {
                Distribution::SinglePartition => {
                    child = add_spm_on_top(child);
                }
                Distribution::HashPartitioned(exprs) => {
                    if add_roundrobin {
                        // Add round-robin repartitioning on top of the operator
                        // to increase parallelism.
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                    // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
                    if hash_necessary {
                        child =
                            add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
                    }
                }
                Distribution::UnspecifiedDistribution => {
                    if add_roundrobin {
                        // Add round-robin repartitioning on top of the operator
                        // to increase parallelism.
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                }
            };

            // There is an ordering requirement of the operator:
            if let Some(required_input_ordering) = required_input_ordering {
                // Order preserving variants are replaced when the child is
                // tracked (`child.data`) and either:
                // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
                // - using order preserving variant is not desirable.
                let ordering_satisfied = child
                    .plan
                    .equivalence_properties()
                    .ordering_satisfy_requirement(&required_input_ordering);
                if (!ordering_satisfied || !order_preserving_variants_desirable)
                    && child.data
                {
                    child = replace_order_preserving_variants(child)?;
                    // If ordering requirements were satisfied before repartitioning,
                    // make sure ordering requirements are still satisfied after.
                    if ordering_satisfied {
                        // Make sure to satisfy ordering requirement:
                        child = add_sort_above_with_check(
                            child,
                            required_input_ordering.clone(),
                            None,
                        );
                    }
                }
                // Stop tracking distribution changing operators
                child.data = false;
            } else {
                // no ordering requirement
                match requirement {
                    // Operator requires specific distribution.
                    Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
                        // Since there is no ordering requirement, preserving ordering is pointless
                        child = replace_order_preserving_variants(child)?;
                    }
                    Distribution::UnspecifiedDistribution => {
                        // Since ordering is lost, trying to preserve ordering is pointless
                        if !maintains || plan.as_any().is::<OutputRequirementExec>() {
                            child = replace_order_preserving_variants(child)?;
                        }
                    }
                }
            }
            Ok(child)
        },
    )
    .collect::<Result<Vec<_>>>()?;

    let children_plans = children
        .iter()
        .map(|c| Arc::clone(&c.plan))
        .collect::<Vec<_>>();

    plan = if plan.as_any().is::<UnionExec>()
        && !config.optimizer.prefer_existing_union
        && can_interleave(children_plans.iter())
    {
        // Add a special case for [`UnionExec`] since we want to "bubble up"
        // hash-partitioned data. So instead of
        //
        // Agg:
        //   Repartition (hash):
        //     Union:
        //       - Agg:
        //           Repartition (hash):
        //             Data
        //       - Agg:
        //           Repartition (hash):
        //             Data
        //
        // we can use:
        //
        // Agg:
        //   Interleave:
        //     - Agg:
        //         Repartition (hash):
        //           Data
        //     - Agg:
        //         Repartition (hash):
        //           Data
        Arc::new(InterleaveExec::try_new(children_plans)?)
    } else {
        plan.with_new_children(children_plans)?
    };

    // `data` is the tracking flag of the node that remained after
    // `remove_dist_changing_operators` stripped the top of the input.
    Ok(Transformed::yes(DistributionContext::new(
        plan, data, children,
    )))
}
1375
/// Keeps track of distribution changing operators (like `RepartitionExec`,
/// `SortPreservingMergeExec`, `CoalescePartitionsExec`) and their ancestors.
/// Using this information, we can optimize distribution of the plan if/when
/// necessary.
///
/// The `bool` payload (`data`) is the tracking flag. It is assigned by
/// `update_children` (see that function for the exact rule), and
/// `replace_order_preserving_variants` only descends into children whose
/// flag is `true`.
pub type DistributionContext = PlanContext<bool>;
1381
1382fn update_children(mut dist_context: DistributionContext) -> Result<DistributionContext> {
1383 for child_context in dist_context.children.iter_mut() {
1384 let child_plan_any = child_context.plan.as_any();
1385 child_context.data =
1386 if let Some(repartition) = child_plan_any.downcast_ref::<RepartitionExec>() {
1387 !matches!(
1388 repartition.partitioning(),
1389 Partitioning::UnknownPartitioning(_)
1390 )
1391 } else {
1392 child_plan_any.is::<SortPreservingMergeExec>()
1393 || child_plan_any.is::<CoalescePartitionsExec>()
1394 || child_context.plan.children().is_empty()
1395 || child_context.children[0].data
1396 || child_context
1397 .plan
1398 .required_input_distribution()
1399 .iter()
1400 .zip(child_context.children.iter())
1401 .any(|(required_dist, child_context)| {
1402 child_context.data
1403 && matches!(
1404 required_dist,
1405 Distribution::UnspecifiedDistribution
1406 )
1407 })
1408 }
1409 }
1410
1411 dist_context.data = false;
1412 Ok(dist_context)
1413}
1414
1415// See tests in datafusion/core/tests/physical_optimizer