datafusion_physical_optimizer/enforce_sorting/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EnforceSorting optimizer rule inspects the physical plan with respect
19//! to local sorting requirements and does the following:
20//! - Adds a [`SortExec`] when a requirement is not met,
21//! - Removes an already-existing [`SortExec`] if it is possible to prove
22//! that this sort is unnecessary
23//!
24//! The rule can work on valid *and* invalid physical plans with respect to
25//! sorting requirements, but always produces a valid physical plan in this sense.
26//!
//! An unrealistic but easy-to-follow example of sort removal: assume that we
//! somehow get the fragment
//!
//! ```text
//! SortExec: expr=[nullable_col@0 ASC]
//!   SortExec: expr=[non_nullable_col@1 ASC]
//! ```
//!
//! in the physical plan. The inner sort is unnecessary since its result is
//! immediately overwritten by the outer [`SortExec`]. Therefore, this rule
//! removes it from the physical plan.
37
38pub mod replace_with_order_preserving_variants;
39pub mod sort_pushdown;
40
41use std::sync::Arc;
42
43use crate::PhysicalOptimizerRule;
44use crate::enforce_sorting::replace_with_order_preserving_variants::{
45 OrderPreservationContext, replace_with_order_preserving_variants,
46};
47use crate::enforce_sorting::sort_pushdown::{
48 SortPushDown, assign_initial_requirements, pushdown_sorts,
49};
50use crate::output_requirements::OutputRequirementExec;
51use crate::utils::{
52 add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
53 is_repartition, is_sort, is_sort_preserving_merge, is_window,
54};
55
56use datafusion_common::Result;
57use datafusion_common::config::ConfigOptions;
58use datafusion_common::plan_err;
59use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
60use datafusion_physical_expr::{Distribution, Partitioning};
61use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
62use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
63use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
64use datafusion_physical_plan::repartition::RepartitionExec;
65use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
66use datafusion_physical_plan::sorts::sort::SortExec;
67use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
68use datafusion_physical_plan::tree_node::PlanContext;
69use datafusion_physical_plan::windows::{
70 BoundedWindowAggExec, WindowAggExec, get_best_fitting_window,
71};
72use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode};
73
74use itertools::izip;
75
76/// This rule inspects [`SortExec`]'s in the given physical plan in order to
77/// remove unnecessary sorts, and optimize sort performance across the plan.
78#[derive(Default, Debug)]
79pub struct EnforceSorting {}
80
81impl EnforceSorting {
82 #[expect(missing_docs)]
83 pub fn new() -> Self {
84 Self {}
85 }
86}
87
88/// This context object is used within the [`EnforceSorting`] rule to track the closest
89/// [`SortExec`] descendant(s) for every child of a plan. The data attribute
90/// stores whether the plan is a `SortExec` or is connected to a `SortExec`
91/// via its children.
92pub type PlanWithCorrespondingSort = PlanContext<bool>;
93
94/// For a given node, update the `PlanContext.data` attribute.
95///
96/// If the node is a `SortExec`, or any of the node's children are a `SortExec`,
97/// then set the attribute to true.
98///
99/// This requires a bottom-up traversal was previously performed, updating the
100/// children previously.
101fn update_sort_ctx_children_data(
102 mut node_and_ctx: PlanWithCorrespondingSort,
103 data: bool,
104) -> Result<PlanWithCorrespondingSort> {
105 // Update `child.data` for all children.
106 for child_node in node_and_ctx.children.iter_mut() {
107 let child_plan = &child_node.plan;
108 child_node.data = if is_sort(child_plan) {
109 // child is sort
110 true
111 } else if is_limit(child_plan) {
112 // There is no sort linkage for this path, it starts at a limit.
113 false
114 } else {
115 // If a descendent is a sort, and the child maintains the sort.
116 let is_spm = is_sort_preserving_merge(child_plan);
117 let required_orderings = child_plan.required_input_ordering();
118 let flags = child_plan.maintains_input_order();
119 // Add parent node to the tree if there is at least one child with
120 // a sort connection:
121 izip!(flags, required_orderings).any(|(maintains, required_ordering)| {
122 let propagates_ordering =
123 (maintains && required_ordering.is_none()) || is_spm;
124 // `connected_to_sort` only returns the correct answer with bottom-up traversal
125 let connected_to_sort =
126 child_node.children.iter().any(|child| child.data);
127 propagates_ordering && connected_to_sort
128 })
129 }
130 }
131
132 // set data attribute on current node
133 node_and_ctx.data = data;
134
135 Ok(node_and_ctx)
136}
137
138/// This object is used within the [`EnforceSorting`] rule to track the closest
139/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
140/// attribute stores whether the plan is a `CoalescePartitionsExec` or is
141/// connected to a `CoalescePartitionsExec` via its children.
142///
143/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
144///
145/// This requires a bottom-up traversal was previously performed, updating the
146/// children previously.
147pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
148
149/// Discovers the linked Coalesce->Sort cascades.
150///
151/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively
152/// remove the linked coalesces in the subplan. Then afterwards, an SPM is added
153/// at the root of the subplan (just after the sort) in order to parallelize sorts.
154/// Refer to the [`parallelize_sorts`] for more details on sort parallelization.
155///
156/// Example of linked Coalesce->Sort:
157/// ```text
158/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan)
159/// ...nodes... ctx.data=true (e.g. are linked in cascade)
160/// Coalesce ctx.data=true (e.g. is a coalesce)
161/// ```
162///
163/// The link should not be continued (and the coalesce not removed) if the distribution
164/// is changed between the Coalesce->Sort cascade. Example:
165/// ```text
166/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan)
167/// AggregateExec ctx.data=false, to stop the link
168/// ...nodes... ctx.data=true (e.g. are linked in cascade)
169/// Coalesce ctx.data=true (e.g. is a coalesce)
170/// ```
171fn update_coalesce_ctx_children(
172 coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
173) {
174 let children = &coalesce_context.children;
175 coalesce_context.data = if children.is_empty() {
176 // Plan has no children, it cannot be a `CoalescePartitionsExec`.
177 false
178 } else if is_coalesce_partitions(&coalesce_context.plan) {
179 // Initiate a connection:
180 true
181 } else {
182 children.iter().enumerate().any(|(idx, node)| {
183 // Only consider operators that don't require a single partition,
184 // and connected to some `CoalescePartitionsExec`:
185 node.data
186 && !matches!(
187 coalesce_context.plan.required_input_distribution()[idx],
188 Distribution::SinglePartition
189 )
190 })
191 };
192}
193
194/// Performs optimizations based upon a series of subrules.
195/// Refer to each subrule for detailed descriptions of the optimizations performed:
196/// Subrule application is ordering dependent.
197///
198/// Optimizer consists of 5 main parts which work sequentially
199/// 1. [`ensure_sorting`] Works down-to-top to be able to remove unnecessary [`SortExec`]s, [`SortPreservingMergeExec`]s
200/// add [`SortExec`]s if necessary by a requirement and adjusts window operators.
201/// 2. [`parallelize_sorts`] (Optional, depends on the `repartition_sorts` configuration)
202/// Responsible to identify and remove unnecessary partition unifier operators
203/// such as [`SortPreservingMergeExec`], [`CoalescePartitionsExec`] follows [`SortExec`]s does possible simplifications.
204/// 3. [`replace_with_order_preserving_variants()`] Replaces with alternative operators, for example can merge
205/// a [`SortExec`] and a [`CoalescePartitionsExec`] into one [`SortPreservingMergeExec`]
206/// or a [`SortExec`] + [`RepartitionExec`] combination into an order preserving [`RepartitionExec`]
207/// 4. [`sort_pushdown`] Works top-down. Responsible to push down sort operators as deep as possible in the plan.
208/// 5. `replace_with_partial_sort` Checks if it's possible to replace [`SortExec`]s with [`PartialSortExec`] operators
209impl PhysicalOptimizerRule for EnforceSorting {
210 fn optimize(
211 &self,
212 plan: Arc<dyn ExecutionPlan>,
213 config: &ConfigOptions,
214 ) -> Result<Arc<dyn ExecutionPlan>> {
215 let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
216 // Execute a bottom-up traversal to enforce sorting requirements,
217 // remove unnecessary sorts, and optimize sort-sensitive operators:
218 let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
219 let new_plan = if config.optimizer.repartition_sorts {
220 let plan_with_coalesce_partitions =
221 PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
222 let parallel = plan_with_coalesce_partitions
223 .transform_up(parallelize_sorts)
224 .data()?;
225 parallel.plan
226 } else {
227 adjusted.plan
228 };
229
230 let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan);
231 let updated_plan = plan_with_pipeline_fixer
232 .transform_up(|plan_with_pipeline_fixer| {
233 replace_with_order_preserving_variants(
234 plan_with_pipeline_fixer,
235 false,
236 true,
237 config,
238 )
239 })
240 .data()?;
241 // Execute a top-down traversal to exploit sort push-down opportunities
242 // missed by the bottom-up traversal:
243 let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
244 assign_initial_requirements(&mut sort_pushdown);
245 let adjusted = pushdown_sorts(sort_pushdown)?;
246 adjusted
247 .plan
248 .transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
249 .data()
250 }
251
252 fn name(&self) -> &str {
253 "EnforceSorting"
254 }
255
256 fn schema_check(&self) -> bool {
257 true
258 }
259}
260
261/// Only interested with [`SortExec`]s and their unbounded children.
262/// If the plan is not a [`SortExec`] or its child is not unbounded, returns the original plan.
263/// Otherwise, by checking the requirement satisfaction searches for a replacement chance.
264/// If there's one replaces the [`SortExec`] plan with a [`PartialSortExec`]
265fn replace_with_partial_sort(
266 plan: Arc<dyn ExecutionPlan>,
267) -> Result<Arc<dyn ExecutionPlan>> {
268 let Some(sort_plan) = plan.downcast_ref::<SortExec>() else {
269 return Ok(plan);
270 };
271
272 // It's safe to get first child of the SortExec
273 let child = Arc::clone(sort_plan.children()[0]);
274 if !child.boundedness().is_unbounded() {
275 return Ok(plan);
276 }
277
278 // Here we're trying to find the common prefix for sorted columns that is required for the
279 // sort and already satisfied by the given ordering
280 let child_eq_properties = child.equivalence_properties();
281 let sort_exprs = sort_plan.expr().clone();
282
283 let mut common_prefix_length = 0;
284 while child_eq_properties
285 .ordering_satisfy(sort_exprs[0..common_prefix_length + 1].to_vec())?
286 {
287 common_prefix_length += 1;
288 }
289 if common_prefix_length > 0 {
290 return Ok(Arc::new(
291 PartialSortExec::new(
292 sort_exprs,
293 Arc::clone(sort_plan.input()),
294 common_prefix_length,
295 )
296 .with_preserve_partitioning(sort_plan.preserve_partitioning())
297 .with_fetch(sort_plan.fetch()),
298 ));
299 }
300 Ok(plan)
301}
302
303/// Transform [`CoalescePartitionsExec`] + [`SortExec`] cascades into [`SortExec`]
304/// + [`SortPreservingMergeExec`] cascades, as illustrated below.
305///
306/// A [`CoalescePartitionsExec`] + [`SortExec`] cascade combines partitions
307/// first, and then sorts:
308/// ```text
309/// ┌ ─ ─ ─ ─ ─ ┐
310/// ┌─┬─┬─┐
311/// ││B│A│D│... ├──┐
312/// └─┴─┴─┘ │
313/// └ ─ ─ ─ ─ ─ ┘ │ ┌────────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐
314/// Partition 1 │ │ Coalesce │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐
315/// ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶ Sort ├───▶││A│B│C│D│E│... │
316/// │ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘
317/// ┌ ─ ─ ─ ─ ─ ┐ │ └────────────────────────┘ └ ─ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘
318/// ┌─┬─┐ │ Partition Partition
319/// ││E│C│ ... ├──┘
320/// └─┴─┘
321/// └ ─ ─ ─ ─ ─ ┘
322/// Partition 2
323/// ```
324///
325///
326/// A [`SortExec`] + [`SortPreservingMergeExec`] cascade sorts each partition
327/// first, then merges partitions while preserving the sort:
328/// ```text
329/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐
330/// ┌─┬─┬─┐ │ │ ┌─┬─┬─┐
331/// ││B│A│D│... │──▶│ Sort │──▶││A│B│D│... │──┐
332/// └─┴─┴─┘ │ │ └─┴─┴─┘ │
333/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ │ ┌─────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐
334/// Partition 1 Partition 1 │ │ │ ┌─┬─┬─┬─┬─┐
335/// ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │
336/// │ │ │ └─┴─┴─┴─┴─┘
337/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ │ └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘
338/// ┌─┬─┐ │ │ ┌─┬─┐ │ Partition
339/// ││E│C│ ... │──▶│ Sort ├──▶││C│E│ ... │──┘
340/// └─┴─┘ │ │ └─┴─┘
341/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘
342/// Partition 2 Partition 2
343/// ```
344///
345/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs
346/// sorting first on a per-partition basis, thereby parallelizing the sort.
347///
348/// The outcome is that plans of the form
349/// ```text
350/// "SortExec: expr=\[a@0 ASC\]",
351/// " ...nodes..."
352/// " CoalescePartitionsExec",
353/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
354/// ```
355/// are transformed into
356/// ```text
357/// "SortPreservingMergeExec: \[a@0 ASC\]",
358/// " SortExec: expr=\[a@0 ASC\]",
359/// " ...nodes..."
360/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
361/// ```
362/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
363/// By performing sorting in parallel, we can increase performance in some
364/// scenarios.
365///
366/// This optimization requires that there are no nodes between the [`SortExec`]
367/// and the [`CoalescePartitionsExec`], which requires single partitioning. Do
368/// not parallelize when the following scenario occurs:
369/// ```text
370/// "SortExec: expr=\[a@0 ASC\]",
371/// " ...nodes requiring single partitioning..."
372/// " CoalescePartitionsExec",
373/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
374/// ```
375///
376/// **Steps**
377/// 1. Checks if the plan is either a [`SortExec`], a [`SortPreservingMergeExec`],
378/// or a [`CoalescePartitionsExec`]. Otherwise, does nothing.
379/// 2. If the plan is a [`SortExec`] or a final [`SortPreservingMergeExec`]
380/// (i.e. output partitioning is 1):
381/// - Check for [`CoalescePartitionsExec`] in children. If found, check if
382/// it can be removed (with possible [`RepartitionExec`]s). If so, remove
383/// (see `remove_bottleneck_in_subplan`).
///    - If the resulting subplan does not satisfy the ordering requirements, add a `SortExec`.
385/// - Add an SPM above the plan and return.
386/// 3. If the plan is a [`CoalescePartitionsExec`]:
387/// - Check if it can be removed (with possible [`RepartitionExec`]s).
388/// If so, remove (see `remove_bottleneck_in_subplan`).
389pub fn parallelize_sorts(
390 mut requirements: PlanWithCorrespondingCoalescePartitions,
391) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
392 update_coalesce_ctx_children(&mut requirements);
393
394 if requirements.children.is_empty() || !requirements.children[0].data {
395 // We only take an action when the plan is either a `SortExec`, a
396 // `SortPreservingMergeExec` or a `CoalescePartitionsExec`, and they
397 // all have a single child. Therefore, if the first child has no
398 // connection, we can return immediately.
399 Ok(Transformed::no(requirements))
400 } else if (is_sort(&requirements.plan)
401 || is_sort_preserving_merge(&requirements.plan))
402 && requirements.plan.output_partitioning().partition_count() <= 1
403 {
404 // Take the initial sort expressions and requirements
405 let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
406 let sort_reqs = LexRequirement::from(sort_exprs.clone());
407 let sort_exprs = sort_exprs.clone();
408
409 // If there is a connection between a `CoalescePartitionsExec` and a
410 // global sort that satisfy the requirements (i.e. intermediate
411 // executors don't require single partition), then we can replace
412 // the `CoalescePartitionsExec` + `SortExec` cascade with a `SortExec`
413 // + `SortPreservingMergeExec` cascade to parallelize sorting.
414 requirements = remove_bottleneck_in_subplan(requirements)?;
415 // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan`
416 // deals with the children and their children and so on.
417 requirements = requirements.children.swap_remove(0);
418
419 requirements = add_sort_above_with_check(requirements, sort_reqs, fetch)?;
420
421 let spm =
422 SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan));
423 Ok(Transformed::yes(
424 PlanWithCorrespondingCoalescePartitions::new(
425 Arc::new(spm.with_fetch(fetch)),
426 false,
427 vec![requirements],
428 ),
429 ))
430 } else if is_coalesce_partitions(&requirements.plan) {
431 let fetch = requirements.plan.fetch();
432 // There is an unnecessary `CoalescePartitionsExec` in the plan.
433 // This will handle the recursive `CoalescePartitionsExec` plans.
434 requirements = remove_bottleneck_in_subplan(requirements)?;
435 // For the removal of self node which is also a `CoalescePartitionsExec`.
436 requirements = requirements.children.swap_remove(0);
437
438 Ok(Transformed::yes(
439 PlanWithCorrespondingCoalescePartitions::new(
440 Arc::new(
441 CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
442 .with_fetch(fetch),
443 ),
444 false,
445 vec![requirements],
446 ),
447 ))
448 } else {
449 Ok(Transformed::yes(requirements))
450 }
451}
452
453/// This function enforces sorting requirements and makes optimizations without
454/// violating these requirements whenever possible. Requires a bottom-up traversal.
455///
456/// **Steps**
457/// 1. Analyze if there are any immediate removals of [`SortExec`]s. If so,
458/// removes them (see `analyze_immediate_sort_removal`).
459/// 2. For each child of the plan, if the plan requires an input ordering:
460/// - Checks if ordering is satisfied with the child. If not:
461/// - If the child has an output ordering, removes the unnecessary
462/// `SortExec`.
463/// - Adds sort above the child plan.
464/// - (Plan not requires input ordering)
465/// - Checks if the `SortExec` is neutralized in the plan. If so,
466/// removes it.
467/// 3. Check and modify window operator:
468/// - Checks if the plan is a window operator, and connected with a sort.
469/// If so, either tries to update the window definition or removes
470/// unnecessary [`SortExec`]s (see `adjust_window_sort_removal`).
471/// 4. Check and remove possibly unnecessary SPM:
472/// - Checks if the plan is SPM and child 1 output partitions, if so
473/// decides this SPM is unnecessary and removes it from the plan.
474pub fn ensure_sorting(
475 mut requirements: PlanWithCorrespondingSort,
476) -> Result<Transformed<PlanWithCorrespondingSort>> {
477 requirements = update_sort_ctx_children_data(requirements, false)?;
478
479 // Perform naive analysis at the beginning -- remove already-satisfied sorts:
480 if requirements.children.is_empty() {
481 return Ok(Transformed::no(requirements));
482 }
483 let maybe_requirements = analyze_immediate_sort_removal(requirements)?;
484 requirements = if !maybe_requirements.transformed {
485 maybe_requirements.data
486 } else {
487 return Ok(maybe_requirements);
488 };
489
490 let plan = &requirements.plan;
491 let mut updated_children = vec![];
492 for (idx, (required_ordering, mut child)) in plan
493 .required_input_ordering()
494 .into_iter()
495 .zip(requirements.children)
496 .enumerate()
497 {
498 let physical_ordering = child.plan.output_ordering();
499
500 if let Some(required) = required_ordering {
501 let eq_properties = child.plan.equivalence_properties();
502 let req = required.into_single();
503 if !eq_properties.ordering_satisfy_requirement(req.clone())? {
504 // Make sure we preserve the ordering requirements:
505 if physical_ordering.is_some() {
506 child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
507 }
508 child = add_sort_above(
509 child,
510 req,
511 plan.downcast_ref::<OutputRequirementExec>()
512 .map(|output| output.fetch())
513 .unwrap_or(None),
514 );
515 child = update_sort_ctx_children_data(child, true)?;
516 }
517 } else if physical_ordering.is_none() || !plan.maintains_input_order()[idx] {
518 // We have a `SortExec` whose effect may be neutralized by another
519 // order-imposing operator, remove this sort:
520 child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
521 }
522 updated_children.push(child);
523 }
524 requirements.children = updated_children;
525 requirements = requirements.update_plan_from_children()?;
526 // For window expressions, we can remove some sorts when we can
527 // calculate the result in reverse:
528 let child_node = &requirements.children[0];
529 if is_window(&requirements.plan) && child_node.data {
530 return adjust_window_sort_removal(requirements).map(Transformed::yes);
531 } else if is_sort_preserving_merge(&requirements.plan)
532 && child_node.plan.output_partitioning().partition_count() <= 1
533 {
534 // This `SortPreservingMergeExec` is unnecessary, input already has a
535 // single partition and no fetch is required.
536 let mut child_node = requirements.children.swap_remove(0);
537 if let Some(fetch) = requirements.plan.fetch() {
538 // Add the limit exec if the original SPM had a fetch:
539 child_node.plan =
540 Arc::new(LocalLimitExec::new(Arc::clone(&child_node.plan), fetch));
541 }
542 return Ok(Transformed::yes(child_node));
543 }
544 update_sort_ctx_children_data(requirements, false).map(Transformed::yes)
545}
546
547/// Analyzes if there are any immediate sort removals by checking the `SortExec`s
548/// and their ordering requirement satisfactions with children
549/// If the sort is unnecessary, either replaces it with
550/// [`SortPreservingMergeExec`] and/or a limit node, or removes the
551/// [`SortExec`].
552/// Otherwise, returns the original plan
553fn analyze_immediate_sort_removal(
554 mut node: PlanWithCorrespondingSort,
555) -> Result<Transformed<PlanWithCorrespondingSort>> {
556 let Some(sort_exec) = node.plan.downcast_ref::<SortExec>() else {
557 return Ok(Transformed::no(node));
558 };
559 let sort_input = sort_exec.input();
560 // Check if the sort is unnecessary:
561 let properties = sort_exec.properties();
562 if let Some(ordering) = properties.output_ordering().cloned() {
563 let eqp = sort_input.equivalence_properties();
564 if !eqp.ordering_satisfy(ordering)? {
565 return Ok(Transformed::no(node));
566 }
567 }
568 node.plan = if !sort_exec.preserve_partitioning()
569 && sort_input.output_partitioning().partition_count() > 1
570 {
571 // Replace the sort with a sort-preserving merge:
572 Arc::new(
573 SortPreservingMergeExec::new(
574 sort_exec.expr().clone(),
575 Arc::clone(sort_input),
576 )
577 .with_fetch(sort_exec.fetch()),
578 ) as _
579 } else {
580 // Remove the sort:
581 node.children = node.children.swap_remove(0).children;
582 if let Some(fetch) = sort_exec.fetch() {
583 let required_ordering = sort_exec.properties().output_ordering().cloned();
584 // If the sort has a fetch, we need to add a limit:
585 if properties.output_partitioning().partition_count() == 1 {
586 let mut global_limit =
587 GlobalLimitExec::new(Arc::clone(sort_input), 0, Some(fetch));
588 global_limit.set_required_ordering(required_ordering);
589 Arc::new(global_limit)
590 } else {
591 let mut local_limit = LocalLimitExec::new(Arc::clone(sort_input), fetch);
592 local_limit.set_required_ordering(required_ordering);
593 Arc::new(local_limit)
594 }
595 } else {
596 Arc::clone(sort_input)
597 }
598 };
599 for child in node.children.iter_mut() {
600 child.data = false;
601 }
602 node.data = false;
603 Ok(Transformed::yes(node))
604}
605
606/// Adjusts a [`WindowAggExec`] or a [`BoundedWindowAggExec`] to determine
607/// whether it may allow removing a sort.
608fn adjust_window_sort_removal(
609 mut window_tree: PlanWithCorrespondingSort,
610) -> Result<PlanWithCorrespondingSort> {
611 // Window operators have a single child we need to adjust:
612 let child_node = remove_corresponding_sort_from_sub_plan(
613 window_tree.children.swap_remove(0),
614 matches!(
615 window_tree.plan.required_input_distribution()[0],
616 Distribution::SinglePartition
617 ),
618 )?;
619 window_tree.children.push(child_node);
620
621 let child_plan = &window_tree.children[0].plan;
622 let (window_expr, new_window) = if let Some(exec) =
623 window_tree.plan.downcast_ref::<WindowAggExec>()
624 {
625 let window_expr = exec.window_expr();
626 let new_window =
627 get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
628 (window_expr, new_window)
629 } else if let Some(exec) = window_tree.plan.downcast_ref::<BoundedWindowAggExec>() {
630 let window_expr = exec.window_expr();
631 let new_window =
632 get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
633 (window_expr, new_window)
634 } else {
635 return plan_err!("Expected WindowAggExec or BoundedWindowAggExec");
636 };
637
638 window_tree.plan = if let Some(new_window) = new_window {
639 // We were able to change the window to accommodate the input, use it:
640 new_window
641 } else {
642 // We were unable to change the window to accommodate the input, so we
643 // will insert a sort.
644 let reqs = window_tree.plan.required_input_ordering().swap_remove(0);
645
646 // Satisfy the ordering requirement so that the window can run:
647 let mut child_node = window_tree.children.swap_remove(0);
648 if let Some(reqs) = reqs {
649 child_node = add_sort_above(child_node, reqs.into_single(), None);
650 }
651 let child_plan = Arc::clone(&child_node.plan);
652 window_tree.children.push(child_node);
653
654 if window_expr.iter().all(|e| e.uses_bounded_memory()) {
655 Arc::new(BoundedWindowAggExec::try_new(
656 window_expr.to_vec(),
657 child_plan,
658 InputOrderMode::Sorted,
659 !window_expr[0].partition_by().is_empty(),
660 )?) as _
661 } else {
662 Arc::new(WindowAggExec::try_new(
663 window_expr.to_vec(),
664 child_plan,
665 !window_expr[0].partition_by().is_empty(),
666 )?) as _
667 }
668 };
669
670 window_tree.data = false;
671 Ok(window_tree)
672}
673
674/// Removes parallelization-reducing, avoidable [`CoalescePartitionsExec`]s from
675/// the plan in `node`. After the removal of such `CoalescePartitionsExec`s from
676/// the plan, some of the remaining `RepartitionExec`s might become unnecessary.
677/// Removes such `RepartitionExec`s from the plan as well.
678fn remove_bottleneck_in_subplan(
679 mut requirements: PlanWithCorrespondingCoalescePartitions,
680) -> Result<PlanWithCorrespondingCoalescePartitions> {
681 let plan = &requirements.plan;
682 let children = &mut requirements.children;
683 if is_coalesce_partitions(&children[0].plan) {
684 // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
685 let mut new_child_node = children[0].children.swap_remove(0);
686 while new_child_node.plan.output_partitioning() == plan.output_partitioning()
687 && is_repartition(&new_child_node.plan)
688 && is_repartition(plan)
689 {
690 new_child_node = new_child_node.children.swap_remove(0)
691 }
692 children[0] = new_child_node;
693 } else {
694 requirements.children = requirements
695 .children
696 .into_iter()
697 .map(|node| {
698 if node.data {
699 remove_bottleneck_in_subplan(node)
700 } else {
701 Ok(node)
702 }
703 })
704 .collect::<Result<_>>()?;
705 }
706 let mut new_reqs = requirements.update_plan_from_children()?;
707 if let Some(repartition) = new_reqs.plan.downcast_ref::<RepartitionExec>() {
708 let input_partitioning = repartition.input().output_partitioning();
709 // We can remove this repartitioning operator if it is now a no-op:
710 let mut can_remove = input_partitioning.eq(repartition.partitioning());
711 // We can also remove it if we ended up with an ineffective RR:
712 if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
713 can_remove |= *n_out == input_partitioning.partition_count();
714 }
715 if can_remove {
716 new_reqs = new_reqs.children.swap_remove(0)
717 }
718 }
719 Ok(new_reqs)
720}
721
722/// Updates child to remove the unnecessary sort below it.
723fn update_child_to_remove_unnecessary_sort(
724 child_idx: usize,
725 mut node: PlanWithCorrespondingSort,
726 parent: &Arc<dyn ExecutionPlan>,
727) -> Result<PlanWithCorrespondingSort> {
728 if node.data {
729 let requires_single_partition = matches!(
730 parent.required_input_distribution()[child_idx],
731 Distribution::SinglePartition
732 );
733 node = remove_corresponding_sort_from_sub_plan(node, requires_single_partition)?;
734 }
735 node.data = false;
736 Ok(node)
737}
738
739/// Removes the sort from the plan in `node`.
740fn remove_corresponding_sort_from_sub_plan(
741 mut node: PlanWithCorrespondingSort,
742 requires_single_partition: bool,
743) -> Result<PlanWithCorrespondingSort> {
744 // A `SortExec` is always at the bottom of the tree.
745 if let Some(sort_exec) = node.plan.downcast_ref::<SortExec>() {
746 // Do not remove sorts with fetch:
747 if sort_exec.fetch().is_none() {
748 node = node.children.swap_remove(0);
749 }
750 } else {
751 let mut any_connection = false;
752 let required_dist = node.plan.required_input_distribution();
753 node.children = node
754 .children
755 .into_iter()
756 .enumerate()
757 .map(|(idx, child)| {
758 if child.data {
759 any_connection = true;
760 remove_corresponding_sort_from_sub_plan(
761 child,
762 matches!(required_dist[idx], Distribution::SinglePartition),
763 )
764 } else {
765 Ok(child)
766 }
767 })
768 .collect::<Result<_>>()?;
769 node = node.update_plan_from_children()?;
770 if any_connection || node.children.is_empty() {
771 node = update_sort_ctx_children_data(node, false)?;
772 }
773
774 // Replace with variants that do not preserve order.
775 if is_sort_preserving_merge(&node.plan) {
776 node.children = node.children.swap_remove(0).children;
777 node.plan = Arc::clone(node.plan.children().swap_remove(0));
778 } else if let Some(repartition) = node.plan.downcast_ref::<RepartitionExec>() {
779 node.plan = Arc::new(RepartitionExec::try_new(
780 Arc::clone(&node.children[0].plan),
781 repartition.properties().output_partitioning().clone(),
782 )?) as _;
783 }
784 };
785 // Deleting a merging sort may invalidate distribution requirements.
786 // Ensure that we stay compliant with such requirements:
787 if requires_single_partition && node.plan.output_partitioning().partition_count() > 1
788 {
789 // If there is existing ordering, to preserve ordering use
790 // `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`.
791 let plan = Arc::clone(&node.plan);
792 let fetch = plan.fetch();
793 let plan = if let Some(ordering) = plan.output_ordering() {
794 Arc::new(
795 SortPreservingMergeExec::new(ordering.clone(), plan).with_fetch(fetch),
796 ) as _
797 } else {
798 Arc::new(CoalescePartitionsExec::new(plan)) as _
799 };
800 node = PlanWithCorrespondingSort::new(plan, false, vec![node]);
801 node = update_sort_ctx_children_data(node, false)?;
802 }
803 Ok(node)
804}
805
806/// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when possible.
807fn get_sort_exprs(
808 sort_any: &Arc<dyn ExecutionPlan>,
809) -> Result<(&LexOrdering, Option<usize>)> {
810 if let Some(sort_exec) = sort_any.downcast_ref::<SortExec>() {
811 Ok((sort_exec.expr(), sort_exec.fetch()))
812 } else if let Some(spm) = sort_any.downcast_ref::<SortPreservingMergeExec>() {
813 Ok((spm.expr(), spm.fetch()))
814 } else {
815 plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec")
816 }
817}
818
819// Tests are in tests/cases/enforce_sorting.rs