datafusion_physical_optimizer/enforce_sorting/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceSorting optimizer rule inspects the physical plan with respect
19//! to local sorting requirements and does the following:
20//! - Adds a [`SortExec`] when a requirement is not met,
21//! - Removes an already-existing [`SortExec`] if it is possible to prove
22//! that this sort is unnecessary
23//!
24//! The rule can work on valid *and* invalid physical plans with respect to
25//! sorting requirements, but always produces a valid physical plan in this sense.
26//!
27//! A non-realistic but easy to follow example for sort removals: Assume that we
28//! somehow get the fragment
29//!
30//! ```text
31//! SortExec: expr=[nullable_col@0 ASC]
32//! SortExec: expr=[non_nullable_col@1 ASC]
33//! ```
34//!
//! in the physical plan. The second (inner) sort is unnecessary since its result is
//! overwritten by the outer [`SortExec`]. Therefore, this rule removes it from the
//! physical plan.
37
38pub mod replace_with_order_preserving_variants;
39pub mod sort_pushdown;
40
41use std::sync::Arc;
42
43use crate::enforce_sorting::replace_with_order_preserving_variants::{
44 replace_with_order_preserving_variants, OrderPreservationContext,
45};
46use crate::enforce_sorting::sort_pushdown::{
47 assign_initial_requirements, pushdown_sorts, SortPushDown,
48};
49use crate::utils::{
50 add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
51 is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
52};
53use crate::PhysicalOptimizerRule;
54
55use datafusion_common::config::ConfigOptions;
56use datafusion_common::plan_err;
57use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
58use datafusion_common::Result;
59use datafusion_physical_expr::{Distribution, Partitioning};
60use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
61use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
62use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
63use datafusion_physical_plan::repartition::RepartitionExec;
64use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
65use datafusion_physical_plan::sorts::sort::SortExec;
66use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
67use datafusion_physical_plan::tree_node::PlanContext;
68use datafusion_physical_plan::windows::{
69 get_best_fitting_window, BoundedWindowAggExec, WindowAggExec,
70};
71use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode};
72
73use itertools::izip;
74
/// Physical optimizer rule that inspects the [`SortExec`]s of a plan: it adds
/// sorts where an ordering requirement is not met, removes sorts that are
/// provably unnecessary, and otherwise optimizes sort performance across the plan.
#[derive(Default, Debug)]
pub struct EnforceSorting {}

impl EnforceSorting {
    /// Creates a new [`EnforceSorting`] rule.
    pub fn new() -> Self {
        Self::default()
    }
}
86
87/// This context object is used within the [`EnforceSorting`] rule to track the closest
88/// [`SortExec`] descendant(s) for every child of a plan. The data attribute
89/// stores whether the plan is a `SortExec` or is connected to a `SortExec`
90/// via its children.
91pub type PlanWithCorrespondingSort = PlanContext<bool>;
92
93/// For a given node, update the [`PlanContext.data`] attribute.
94///
95/// If the node is a `SortExec`, or any of the node's children are a `SortExec`,
96/// then set the attribute to true.
97///
98/// This requires a bottom-up traversal was previously performed, updating the
99/// children previously.
100fn update_sort_ctx_children_data(
101 mut node_and_ctx: PlanWithCorrespondingSort,
102 data: bool,
103) -> Result<PlanWithCorrespondingSort> {
104 // Update `child.data` for all children.
105 for child_node in node_and_ctx.children.iter_mut() {
106 let child_plan = &child_node.plan;
107 child_node.data = if is_sort(child_plan) {
108 // child is sort
109 true
110 } else if is_limit(child_plan) {
111 // There is no sort linkage for this path, it starts at a limit.
112 false
113 } else {
114 // If a descendent is a sort, and the child maintains the sort.
115 let is_spm = is_sort_preserving_merge(child_plan);
116 let required_orderings = child_plan.required_input_ordering();
117 let flags = child_plan.maintains_input_order();
118 // Add parent node to the tree if there is at least one child with
119 // a sort connection:
120 izip!(flags, required_orderings).any(|(maintains, required_ordering)| {
121 let propagates_ordering =
122 (maintains && required_ordering.is_none()) || is_spm;
123 // `connected_to_sort` only returns the correct answer with bottom-up traversal
124 let connected_to_sort =
125 child_node.children.iter().any(|child| child.data);
126 propagates_ordering && connected_to_sort
127 })
128 }
129 }
130
131 // set data attribute on current node
132 node_and_ctx.data = data;
133
134 Ok(node_and_ctx)
135}
136
137/// This object is used within the [`EnforceSorting`] rule to track the closest
138/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
139/// attribute stores whether the plan is a `CoalescePartitionsExec` or is
140/// connected to a `CoalescePartitionsExec` via its children.
141///
142/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
143///
144/// This requires a bottom-up traversal was previously performed, updating the
145/// children previously.
146pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
147
148/// Discovers the linked Coalesce->Sort cascades.
149///
150/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively
151/// remove the linked coalesces in the subplan. Then afterwards, an SPM is added
152/// at the root of the subplan (just after the sort) in order to parallelize sorts.
153/// Refer to the [`parallelize_sorts`] for more details on sort parallelization.
154///
155/// Example of linked Coalesce->Sort:
156/// ```text
157/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan)
158/// ...nodes... ctx.data=true (e.g. are linked in cascade)
159/// Coalesce ctx.data=true (e.g. is a coalesce)
160/// ```
161///
162/// The link should not be continued (and the coalesce not removed) if the distribution
163/// is changed between the Coalesce->Sort cascade. Example:
164/// ```text
165/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan)
166/// AggregateExec ctx.data=false, to stop the link
167/// ...nodes... ctx.data=true (e.g. are linked in cascade)
168/// Coalesce ctx.data=true (e.g. is a coalesce)
169/// ```
170fn update_coalesce_ctx_children(
171 coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
172) {
173 let children = &coalesce_context.children;
174 coalesce_context.data = if children.is_empty() {
175 // Plan has no children, it cannot be a `CoalescePartitionsExec`.
176 false
177 } else if is_coalesce_partitions(&coalesce_context.plan) {
178 // Initiate a connection:
179 true
180 } else {
181 children.iter().enumerate().any(|(idx, node)| {
182 // Only consider operators that don't require a single partition,
183 // and connected to some `CoalescePartitionsExec`:
184 node.data
185 && !matches!(
186 coalesce_context.plan.required_input_distribution()[idx],
187 Distribution::SinglePartition
188 )
189 })
190 };
191}
192
193/// Performs optimizations based upon a series of subrules.
194///
195/// Refer to each subrule for detailed descriptions of the optimizations performed:
196/// [`ensure_sorting`], [`parallelize_sorts`], [`replace_with_order_preserving_variants()`],
197/// and [`pushdown_sorts`].
198///
199/// Subrule application is ordering dependent.
200///
201/// The subrule `parallelize_sorts` is only applied if `repartition_sorts` is enabled.
202impl PhysicalOptimizerRule for EnforceSorting {
203 fn optimize(
204 &self,
205 plan: Arc<dyn ExecutionPlan>,
206 config: &ConfigOptions,
207 ) -> Result<Arc<dyn ExecutionPlan>> {
208 let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
209 // Execute a bottom-up traversal to enforce sorting requirements,
210 // remove unnecessary sorts, and optimize sort-sensitive operators:
211 let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
212 let new_plan = if config.optimizer.repartition_sorts {
213 let plan_with_coalesce_partitions =
214 PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
215 let parallel = plan_with_coalesce_partitions
216 .transform_up(parallelize_sorts)
217 .data()?;
218 parallel.plan
219 } else {
220 adjusted.plan
221 };
222
223 let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan);
224 let updated_plan = plan_with_pipeline_fixer
225 .transform_up(|plan_with_pipeline_fixer| {
226 replace_with_order_preserving_variants(
227 plan_with_pipeline_fixer,
228 false,
229 true,
230 config,
231 )
232 })
233 .data()?;
234 // Execute a top-down traversal to exploit sort push-down opportunities
235 // missed by the bottom-up traversal:
236 let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
237 assign_initial_requirements(&mut sort_pushdown);
238 let adjusted = pushdown_sorts(sort_pushdown)?;
239 adjusted
240 .plan
241 .transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
242 .data()
243 }
244
245 fn name(&self) -> &str {
246 "EnforceSorting"
247 }
248
249 fn schema_check(&self) -> bool {
250 true
251 }
252}
253
254fn replace_with_partial_sort(
255 plan: Arc<dyn ExecutionPlan>,
256) -> Result<Arc<dyn ExecutionPlan>> {
257 let plan_any = plan.as_any();
258 if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
259 let child = Arc::clone(sort_plan.children()[0]);
260 if !child.boundedness().is_unbounded() {
261 return Ok(plan);
262 }
263
264 // here we're trying to find the common prefix for sorted columns that is required for the
265 // sort and already satisfied by the given ordering
266 let child_eq_properties = child.equivalence_properties();
267 let sort_req = LexRequirement::from(sort_plan.expr().clone());
268
269 let mut common_prefix_length = 0;
270 while child_eq_properties.ordering_satisfy_requirement(&LexRequirement {
271 inner: sort_req[0..common_prefix_length + 1].to_vec(),
272 }) {
273 common_prefix_length += 1;
274 }
275 if common_prefix_length > 0 {
276 return Ok(Arc::new(
277 PartialSortExec::new(
278 LexOrdering::new(sort_plan.expr().to_vec()),
279 Arc::clone(sort_plan.input()),
280 common_prefix_length,
281 )
282 .with_preserve_partitioning(sort_plan.preserve_partitioning())
283 .with_fetch(sort_plan.fetch()),
284 ));
285 }
286 }
287 Ok(plan)
288}
289
290/// Transform [`CoalescePartitionsExec`] + [`SortExec`] into
291/// [`SortExec`] + [`SortPreservingMergeExec`] as illustrated below:
292///
293/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
294/// combine the partitions first, and then sort:
295/// ```text
296/// ┌ ─ ─ ─ ─ ─ ┐
297/// ┌─┬─┬─┐
298/// ││B│A│D│... ├──┐
299/// └─┴─┴─┘ │
300/// └ ─ ─ ─ ─ ─ ┘ │ ┌────────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐
301/// Partition 1 │ │ Coalesce │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐
302/// ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶ Sort ├───▶││A│B│C│D│E│... │
303/// │ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘
304/// ┌ ─ ─ ─ ─ ─ ┐ │ └────────────────────────┘ └ ─ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘
305/// ┌─┬─┐ │ Partition Partition
306/// ││E│C│ ... ├──┘
307/// └─┴─┘
308/// └ ─ ─ ─ ─ ─ ┘
309/// Partition 2
310/// ```
311///
312///
313/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades
314/// sorts each partition first, then merge partitions while retaining the sort:
315/// ```text
316/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐
317/// ┌─┬─┬─┐ │ │ ┌─┬─┬─┐
318/// ││B│A│D│... │──▶│ Sort │──▶││A│B│D│... │──┐
319/// └─┴─┴─┘ │ │ └─┴─┴─┘ │
320/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ │ ┌─────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐
321/// Partition 1 Partition 1 │ │ │ ┌─┬─┬─┬─┬─┐
322/// ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │
323/// │ │ │ └─┴─┴─┴─┴─┘
324/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ │ └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘
325/// ┌─┬─┐ │ │ ┌─┬─┐ │ Partition
326/// ││E│C│ ... │──▶│ Sort ├──▶││C│E│ ... │──┘
327/// └─┴─┘ │ │ └─┴─┘
328/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘
329/// Partition 2 Partition 2
330/// ```
331///
332/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the
333/// sort first on a per-partition basis, thereby parallelizing the sort.
334///
335///
336/// The outcome is that plans of the form
337/// ```text
338/// "SortExec: expr=\[a@0 ASC\]",
339/// " ...nodes..."
340/// " CoalescePartitionsExec",
341/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
342/// ```
343/// are transformed into
344/// ```text
345/// "SortPreservingMergeExec: \[a@0 ASC\]",
346/// " SortExec: expr=\[a@0 ASC\]",
347/// " ...nodes..."
348/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
349/// ```
350/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
351/// By performing sorting in parallel, we can increase performance in some scenarios.
352///
353/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
354/// which require single partitioning. Do not parallelize when the following scenario occurs:
355/// ```text
356/// "SortExec: expr=\[a@0 ASC\]",
357/// " ...nodes requiring single partitioning..."
358/// " CoalescePartitionsExec",
359/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
360/// ```
361pub fn parallelize_sorts(
362 mut requirements: PlanWithCorrespondingCoalescePartitions,
363) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
364 update_coalesce_ctx_children(&mut requirements);
365
366 if requirements.children.is_empty() || !requirements.children[0].data {
367 // We only take an action when the plan is either a `SortExec`, a
368 // `SortPreservingMergeExec` or a `CoalescePartitionsExec`, and they
369 // all have a single child. Therefore, if the first child has no
370 // connection, we can return immediately.
371 Ok(Transformed::no(requirements))
372 } else if (is_sort(&requirements.plan)
373 || is_sort_preserving_merge(&requirements.plan))
374 && requirements.plan.output_partitioning().partition_count() <= 1
375 {
376 // Take the initial sort expressions and requirements
377 let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
378 let sort_reqs = LexRequirement::from(sort_exprs.clone());
379 let sort_exprs = sort_exprs.clone();
380
381 // If there is a connection between a `CoalescePartitionsExec` and a
382 // global sort that satisfy the requirements (i.e. intermediate
383 // executors don't require single partition), then we can replace
384 // the `CoalescePartitionsExec` + `SortExec` cascade with a `SortExec`
385 // + `SortPreservingMergeExec` cascade to parallelize sorting.
386 requirements = remove_bottleneck_in_subplan(requirements)?;
387 // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan`
388 // deals with the children and their children and so on.
389 requirements = requirements.children.swap_remove(0);
390
391 requirements = add_sort_above_with_check(requirements, sort_reqs, fetch);
392
393 let spm =
394 SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan));
395 Ok(Transformed::yes(
396 PlanWithCorrespondingCoalescePartitions::new(
397 Arc::new(spm.with_fetch(fetch)),
398 false,
399 vec![requirements],
400 ),
401 ))
402 } else if is_coalesce_partitions(&requirements.plan) {
403 // There is an unnecessary `CoalescePartitionsExec` in the plan.
404 // This will handle the recursive `CoalescePartitionsExec` plans.
405 requirements = remove_bottleneck_in_subplan(requirements)?;
406 // For the removal of self node which is also a `CoalescePartitionsExec`.
407 requirements = requirements.children.swap_remove(0);
408
409 Ok(Transformed::yes(
410 PlanWithCorrespondingCoalescePartitions::new(
411 Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))),
412 false,
413 vec![requirements],
414 ),
415 ))
416 } else {
417 Ok(Transformed::yes(requirements))
418 }
419}
420
421/// This function enforces sorting requirements and makes optimizations without
422/// violating these requirements whenever possible. Requires a bottom-up traversal.
423pub fn ensure_sorting(
424 mut requirements: PlanWithCorrespondingSort,
425) -> Result<Transformed<PlanWithCorrespondingSort>> {
426 requirements = update_sort_ctx_children_data(requirements, false)?;
427
428 // Perform naive analysis at the beginning -- remove already-satisfied sorts:
429 if requirements.children.is_empty() {
430 return Ok(Transformed::no(requirements));
431 }
432 let maybe_requirements = analyze_immediate_sort_removal(requirements);
433 requirements = if !maybe_requirements.transformed {
434 maybe_requirements.data
435 } else {
436 return Ok(maybe_requirements);
437 };
438
439 let plan = &requirements.plan;
440 let mut updated_children = vec![];
441 for (idx, (required_ordering, mut child)) in plan
442 .required_input_ordering()
443 .into_iter()
444 .zip(requirements.children.into_iter())
445 .enumerate()
446 {
447 let physical_ordering = child.plan.output_ordering();
448
449 if let Some(required) = required_ordering {
450 let eq_properties = child.plan.equivalence_properties();
451 if !eq_properties.ordering_satisfy_requirement(&required) {
452 // Make sure we preserve the ordering requirements:
453 if physical_ordering.is_some() {
454 child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
455 }
456 child = add_sort_above(child, required, None);
457 child = update_sort_ctx_children_data(child, true)?;
458 }
459 } else if physical_ordering.is_none()
460 || !plan.maintains_input_order()[idx]
461 || is_union(plan)
462 {
463 // We have a `SortExec` whose effect may be neutralized by another
464 // order-imposing operator, remove this sort:
465 child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
466 }
467 updated_children.push(child);
468 }
469 requirements.children = updated_children;
470 requirements = requirements.update_plan_from_children()?;
471 // For window expressions, we can remove some sorts when we can
472 // calculate the result in reverse:
473 let child_node = &requirements.children[0];
474 if is_window(&requirements.plan) && child_node.data {
475 return adjust_window_sort_removal(requirements).map(Transformed::yes);
476 } else if is_sort_preserving_merge(&requirements.plan)
477 && child_node.plan.output_partitioning().partition_count() <= 1
478 {
479 // This `SortPreservingMergeExec` is unnecessary, input already has a
480 // single partition and no fetch is required.
481 let mut child_node = requirements.children.swap_remove(0);
482 if let Some(fetch) = requirements.plan.fetch() {
483 // Add the limit exec if the original SPM had a fetch:
484 child_node.plan =
485 Arc::new(LocalLimitExec::new(Arc::clone(&child_node.plan), fetch));
486 }
487 return Ok(Transformed::yes(child_node));
488 }
489 update_sort_ctx_children_data(requirements, false).map(Transformed::yes)
490}
491
492/// Analyzes a given [`SortExec`] (`plan`) to determine whether its input
493/// already has a finer ordering than it enforces.
494fn analyze_immediate_sort_removal(
495 mut node: PlanWithCorrespondingSort,
496) -> Transformed<PlanWithCorrespondingSort> {
497 if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
498 let sort_input = sort_exec.input();
499 // If this sort is unnecessary, we should remove it:
500 if sort_input.equivalence_properties().ordering_satisfy(
501 sort_exec
502 .properties()
503 .output_ordering()
504 .unwrap_or(LexOrdering::empty()),
505 ) {
506 node.plan = if !sort_exec.preserve_partitioning()
507 && sort_input.output_partitioning().partition_count() > 1
508 {
509 // Replace the sort with a sort-preserving merge:
510 let expr = LexOrdering::new(sort_exec.expr().to_vec());
511 Arc::new(
512 SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
513 .with_fetch(sort_exec.fetch()),
514 ) as _
515 } else {
516 // Remove the sort:
517 node.children = node.children.swap_remove(0).children;
518 if let Some(fetch) = sort_exec.fetch() {
519 // If the sort has a fetch, we need to add a limit:
520 if sort_exec
521 .properties()
522 .output_partitioning()
523 .partition_count()
524 == 1
525 {
526 Arc::new(GlobalLimitExec::new(
527 Arc::clone(sort_input),
528 0,
529 Some(fetch),
530 ))
531 } else {
532 Arc::new(LocalLimitExec::new(Arc::clone(sort_input), fetch))
533 }
534 } else {
535 Arc::clone(sort_input)
536 }
537 };
538 for child in node.children.iter_mut() {
539 child.data = false;
540 }
541 node.data = false;
542 return Transformed::yes(node);
543 }
544 }
545 Transformed::no(node)
546}
547
/// Adjusts a [`WindowAggExec`] or a [`BoundedWindowAggExec`] to determine
/// whether it may allow removing a sort.
///
/// First, the sort(s) linked to the window's single child are removed via
/// `remove_corresponding_sort_from_sub_plan`. Then `get_best_fitting_window` is
/// asked for a window operator that can run on the resulting child as-is. If
/// it returns one, that operator replaces the window. Otherwise the window's
/// required input ordering is restored by adding a sort above the child, and a
/// new window operator is built on top of it: a `BoundedWindowAggExec` in
/// `InputOrderMode::Sorted` when every window expression uses bounded memory,
/// and a `WindowAggExec` otherwise.
///
/// Returns a plan error if `window_tree.plan` is neither window operator. The
/// returned node has its `data` flag cleared.
fn adjust_window_sort_removal(
    mut window_tree: PlanWithCorrespondingSort,
) -> Result<PlanWithCorrespondingSort> {
    // Window operators have a single child we need to adjust:
    let child_node = remove_corresponding_sort_from_sub_plan(
        window_tree.children.swap_remove(0),
        // Tell the removal whether this window needs a single input partition,
        // so that it can keep that distribution requirement satisfied.
        matches!(
            window_tree.plan.required_input_distribution()[0],
            Distribution::SinglePartition
        ),
    )?;
    window_tree.children.push(child_node);

    let plan = window_tree.plan.as_any();
    let child_plan = &window_tree.children[0].plan;
    // Ask for a window operator that fits the (now sort-free) child's ordering;
    // `None` means no such variant was found.
    let (window_expr, new_window) =
        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            let window_expr = exec.window_expr();
            let new_window =
                get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
            (window_expr, new_window)
        } else if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            let window_expr = exec.window_expr();
            let new_window =
                get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
            (window_expr, new_window)
        } else {
            return plan_err!("Expected WindowAggExec or BoundedWindowAggExec");
        };

    window_tree.plan = if let Some(new_window) = new_window {
        // We were able to change the window to accommodate the input, use it:
        new_window
    } else {
        // We were unable to change the window to accommodate the input, so we
        // will insert a sort.
        let reqs = window_tree
            .plan
            .required_input_ordering()
            .swap_remove(0)
            .unwrap_or_default();

        // Satisfy the ordering requirement so that the window can run:
        let mut child_node = window_tree.children.swap_remove(0);
        child_node = add_sort_above(child_node, reqs, None);
        let child_plan = Arc::clone(&child_node.plan);
        window_tree.children.push(child_node);

        // The last constructor argument is derived from whether the first
        // window expression has PARTITION BY keys. NOTE(review): presumably
        // this controls whether the window may be repartitioned; confirm
        // against the constructor docs.
        if window_expr.iter().all(|e| e.uses_bounded_memory()) {
            Arc::new(BoundedWindowAggExec::try_new(
                window_expr.to_vec(),
                child_plan,
                InputOrderMode::Sorted,
                !window_expr[0].partition_by().is_empty(),
            )?) as _
        } else {
            Arc::new(WindowAggExec::try_new(
                window_expr.to_vec(),
                child_plan,
                !window_expr[0].partition_by().is_empty(),
            )?) as _
        }
    };

    window_tree.data = false;
    Ok(window_tree)
}
617
618/// Removes parallelization-reducing, avoidable [`CoalescePartitionsExec`]s from
619/// the plan in `node`. After the removal of such `CoalescePartitionsExec`s from
620/// the plan, some of the remaining `RepartitionExec`s might become unnecessary.
621/// Removes such `RepartitionExec`s from the plan as well.
622fn remove_bottleneck_in_subplan(
623 mut requirements: PlanWithCorrespondingCoalescePartitions,
624) -> Result<PlanWithCorrespondingCoalescePartitions> {
625 let plan = &requirements.plan;
626 let children = &mut requirements.children;
627 if is_coalesce_partitions(&children[0].plan) {
628 // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
629 let mut new_child_node = children[0].children.swap_remove(0);
630 while new_child_node.plan.output_partitioning() == plan.output_partitioning()
631 && is_repartition(&new_child_node.plan)
632 && is_repartition(plan)
633 {
634 new_child_node = new_child_node.children.swap_remove(0)
635 }
636 children[0] = new_child_node;
637 } else {
638 requirements.children = requirements
639 .children
640 .into_iter()
641 .map(|node| {
642 if node.data {
643 remove_bottleneck_in_subplan(node)
644 } else {
645 Ok(node)
646 }
647 })
648 .collect::<Result<_>>()?;
649 }
650 let mut new_reqs = requirements.update_plan_from_children()?;
651 if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::<RepartitionExec>() {
652 let input_partitioning = repartition.input().output_partitioning();
653 // We can remove this repartitioning operator if it is now a no-op:
654 let mut can_remove = input_partitioning.eq(repartition.partitioning());
655 // We can also remove it if we ended up with an ineffective RR:
656 if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
657 can_remove |= *n_out == input_partitioning.partition_count();
658 }
659 if can_remove {
660 new_reqs = new_reqs.children.swap_remove(0)
661 }
662 }
663 Ok(new_reqs)
664}
665
666/// Updates child to remove the unnecessary sort below it.
667fn update_child_to_remove_unnecessary_sort(
668 child_idx: usize,
669 mut node: PlanWithCorrespondingSort,
670 parent: &Arc<dyn ExecutionPlan>,
671) -> Result<PlanWithCorrespondingSort> {
672 if node.data {
673 let requires_single_partition = matches!(
674 parent.required_input_distribution()[child_idx],
675 Distribution::SinglePartition
676 );
677 node = remove_corresponding_sort_from_sub_plan(node, requires_single_partition)?;
678 }
679 node.data = false;
680 Ok(node)
681}
682
/// Removes the sort(s) linked to `node` from its subtree.
///
/// * If `node` is a `SortExec` without a fetch, it is replaced by its child.
///   A sort with a fetch is kept.
/// * Otherwise the function recurses into every child whose `data` flag is
///   set, rebuilds the node's plan from the updated children, and replaces
///   order-preserving operators with non-order-preserving ones. A
///   `SortPreservingMergeExec` is replaced by its input, and a
///   `RepartitionExec` is rebuilt with the same output partitioning.
///
/// When `requires_single_partition` is true and the result has more than one
/// output partition, a `SortPreservingMergeExec` (if the plan still has an
/// output ordering) or a `CoalescePartitionsExec` is added on top. This keeps
/// the parent's distribution requirement satisfied.
fn remove_corresponding_sort_from_sub_plan(
    mut node: PlanWithCorrespondingSort,
    requires_single_partition: bool,
) -> Result<PlanWithCorrespondingSort> {
    // A `SortExec` is always at the bottom of the tree.
    if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
        // Do not remove sorts with fetch:
        if sort_exec.fetch().is_none() {
            node = node.children.swap_remove(0);
        }
    } else {
        let mut any_connection = false;
        let required_dist = node.plan.required_input_distribution();
        // Recurse only into children that are linked to a sort, passing along
        // whether this node requires that input to be a single partition.
        node.children = node
            .children
            .into_iter()
            .enumerate()
            .map(|(idx, child)| {
                if child.data {
                    any_connection = true;
                    remove_corresponding_sort_from_sub_plan(
                        child,
                        matches!(required_dist[idx], Distribution::SinglePartition),
                    )
                } else {
                    Ok(child)
                }
            })
            .collect::<Result<_>>()?;
        node = node.update_plan_from_children()?;
        // Refresh the children's link flags when something below changed.
        if any_connection || node.children.is_empty() {
            node = update_sort_ctx_children_data(node, false)?;
        }

        // Replace with variants that do not preserve order.
        if is_sort_preserving_merge(&node.plan) {
            // Drop the merge: both the plan and the context children move one
            // level up, so they stay consistent with each other.
            node.children = node.children.swap_remove(0).children;
            node.plan = Arc::clone(node.plan.children().swap_remove(0));
        } else if let Some(repartition) =
            node.plan.as_any().downcast_ref::<RepartitionExec>()
        {
            // Rebuild the repartition with the same output partitioning.
            // NOTE(review): presumably a freshly constructed `RepartitionExec`
            // does not preserve order; confirm against `RepartitionExec::try_new`.
            node.plan = Arc::new(RepartitionExec::try_new(
                Arc::clone(&node.children[0].plan),
                repartition.properties().output_partitioning().clone(),
            )?) as _;
        }
    };
    // Deleting a merging sort may invalidate distribution requirements.
    // Ensure that we stay compliant with such requirements:
    if requires_single_partition && node.plan.output_partitioning().partition_count() > 1
    {
        // If there is existing ordering, to preserve ordering use
        // `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`.
        let plan = Arc::clone(&node.plan);
        let fetch = plan.fetch();
        let plan = if let Some(ordering) = plan.output_ordering() {
            Arc::new(
                SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
                    .with_fetch(fetch),
            ) as _
        } else {
            Arc::new(CoalescePartitionsExec::new(plan)) as _
        };
        node = PlanWithCorrespondingSort::new(plan, false, vec![node]);
        node = update_sort_ctx_children_data(node, false)?;
    }
    Ok(node)
}
752
753/// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when possible.
754fn get_sort_exprs(
755 sort_any: &Arc<dyn ExecutionPlan>,
756) -> Result<(&LexOrdering, Option<usize>)> {
757 if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
758 Ok((sort_exec.expr(), sort_exec.fetch()))
759 } else if let Some(spm) = sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
760 {
761 Ok((spm.expr(), spm.fetch()))
762 } else {
763 plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec")
764 }
765}
766
767// Tests are in tests/cases/enforce_sorting.rs