datafusion_physical_optimizer/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion_physical_expr::LexRequirement;
21use datafusion_physical_expr_common::sort_expr::LexOrdering;
22use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
23use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
24use datafusion_physical_plan::repartition::RepartitionExec;
25use datafusion_physical_plan::sorts::sort::SortExec;
26use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
27use datafusion_physical_plan::tree_node::PlanContext;
28use datafusion_physical_plan::union::UnionExec;
29use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
30use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
31
32/// This utility function adds a `SortExec` above an operator according to the
33/// given ordering requirements while preserving the original partitioning.
34///
35/// Note that this updates the plan in both the [`PlanContext.children`] and
36/// the [`PlanContext.plan`]'s children. Therefore its not required to sync
37/// the child plans with [`PlanContext::update_plan_from_children`].
38pub fn add_sort_above<T: Clone + Default>(
39    node: PlanContext<T>,
40    sort_requirements: LexRequirement,
41    fetch: Option<usize>,
42) -> PlanContext<T> {
43    let mut sort_expr = LexOrdering::from(sort_requirements);
44    sort_expr.retain(|sort_expr| {
45        !node
46            .plan
47            .equivalence_properties()
48            .is_expr_constant(&sort_expr.expr)
49    });
50    let mut new_sort = SortExec::new(sort_expr, Arc::clone(&node.plan)).with_fetch(fetch);
51    if node.plan.output_partitioning().partition_count() > 1 {
52        new_sort = new_sort.with_preserve_partitioning(true);
53    }
54    PlanContext::new(Arc::new(new_sort), T::default(), vec![node])
55}
56
57/// This utility function adds a `SortExec` above an operator according to the
58/// given ordering requirements while preserving the original partitioning. If
59/// requirement is already satisfied no `SortExec` is added.
60pub fn add_sort_above_with_check<T: Clone + Default>(
61    node: PlanContext<T>,
62    sort_requirements: LexRequirement,
63    fetch: Option<usize>,
64) -> PlanContext<T> {
65    if !node
66        .plan
67        .equivalence_properties()
68        .ordering_satisfy_requirement(&sort_requirements)
69    {
70        add_sort_above(node, sort_requirements, fetch)
71    } else {
72        node
73    }
74}
75
76/// Checks whether the given operator is a [`SortExec`].
77pub fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
78    plan.as_any().is::<SortExec>()
79}
80
81/// Checks whether the given operator is a window;
82/// i.e. either a [`WindowAggExec`] or a [`BoundedWindowAggExec`].
83pub fn is_window(plan: &Arc<dyn ExecutionPlan>) -> bool {
84    plan.as_any().is::<WindowAggExec>() || plan.as_any().is::<BoundedWindowAggExec>()
85}
86
87/// Checks whether the given operator is a [`UnionExec`].
88pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
89    plan.as_any().is::<UnionExec>()
90}
91
92/// Checks whether the given operator is a [`SortPreservingMergeExec`].
93pub fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
94    plan.as_any().is::<SortPreservingMergeExec>()
95}
96
97/// Checks whether the given operator is a [`CoalescePartitionsExec`].
98pub fn is_coalesce_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
99    plan.as_any().is::<CoalescePartitionsExec>()
100}
101
102/// Checks whether the given operator is a [`RepartitionExec`].
103pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
104    plan.as_any().is::<RepartitionExec>()
105}
106
107/// Checks whether the given operator is a limit;
108/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
109pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
110    plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
111}