datafusion_physical_optimizer/join_selection.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! The [`JoinSelection`] rule tries to modify a given plan so that it can
//! accommodate infinite sources and utilize statistical information (if there
//! is any) to obtain more performant plans. To achieve the first goal, it
//! tries to transform a non-runnable query (with the given infinite sources)
//! into a runnable query by replacing pipeline-breaking join operations with
//! pipeline-friendly ones. To achieve the second goal, it selects the proper
//! `PartitionMode` and the build side using the available statistics for hash joins.

use std::sync::Arc;

use crate::PhysicalOptimizerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, JoinSide, JoinType};
use datafusion_expr_common::sort_properties::SortProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion_physical_plan::joins::{
    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
    StreamJoinPartitionMode, SymmetricHashJoinExec,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
/// available statistical information, if there is any.
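///
/// A minimal usage sketch of applying this rule as a [`PhysicalOptimizerRule`].
/// The import paths and the pre-built `plan: Arc<dyn ExecutionPlan>` are
/// assumptions for illustration, not part of the original documentation:
///
/// ```ignore
/// use std::sync::Arc;
///
/// use datafusion_common::config::ConfigOptions;
/// use datafusion_physical_optimizer::join_selection::JoinSelection;
/// use datafusion_physical_optimizer::PhysicalOptimizerRule;
/// use datafusion_physical_plan::ExecutionPlan;
///
/// fn optimize_joins(
///     plan: Arc<dyn ExecutionPlan>,
/// ) -> datafusion_common::error::Result<Arc<dyn ExecutionPlan>> {
///     let rule = JoinSelection::new();
///     // Picks build sides / partition modes and fixes pipelines for unbounded inputs.
///     rule.optimize(plan, &ConfigOptions::default())
/// }
/// ```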
#[derive(Default, Debug)]
pub struct JoinSelection {}

impl JoinSelection {
    #[allow(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}

// TODO: We need performance tests for the Right Semi/Right Join to Left Semi/Left Join swap in cases where the right side is smaller, but not by much.
// TODO: In PrestoSQL, the optimizer flips join sides only if one side is smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times (8 by default).
/// Checks statistics to determine whether the join inputs should be swapped.
pub(crate) fn should_swap_join_order(
    left: &dyn ExecutionPlan,
    right: &dyn ExecutionPlan,
) -> Result<bool> {
    // Get the left and right tables' total bytes.
    // If both the left and right tables contain total_byte_size statistics,
    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`.
    let left_stats = left.statistics()?;
    let right_stats = right.statistics()?;
    // First compare the `total_byte_size` of the left and right sides;
    // if the information in this field is insufficient, fall back to `num_rows`.
    match (
        left_stats.total_byte_size.get_value(),
        right_stats.total_byte_size.get_value(),
    ) {
        (Some(l), Some(r)) => Ok(l > r),
        _ => match (
            left_stats.num_rows.get_value(),
            right_stats.num_rows.get_value(),
        ) {
            (Some(l), Some(r)) => Ok(l > r),
            _ => Ok(false),
        },
    }
}

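/// Returns whether the given plan's statistics indicate that it is small
/// enough (by byte size, or failing that, by row count) to be collected into
/// a single partition, according to the provided thresholds.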
fn supports_collect_by_thresholds(
    plan: &dyn ExecutionPlan,
    threshold_byte_size: usize,
    threshold_num_rows: usize,
) -> bool {
    // Currently we do not trust a 0 value in the statistics, since stats collection might have bugs.
    // TODO: check the logic in datasource::get_statistics_with_limit()
    let Ok(stats) = plan.statistics() else {
        return false;
    };

    if let Some(byte_size) = stats.total_byte_size.get_value() {
        *byte_size != 0 && *byte_size < threshold_byte_size
    } else if let Some(num_rows) = stats.num_rows.get_value() {
        *num_rows != 0 && *num_rows < threshold_num_rows
    } else {
        false
    }
}

/// Predicate that checks whether the given join type supports input swapping.
#[deprecated(since = "45.0.0", note = "use JoinType::supports_swap instead")]
#[allow(dead_code)]
pub(crate) fn supports_swap(join_type: JoinType) -> bool {
    join_type.supports_swap()
}

/// This function returns the new join type we get after swapping the given
/// join's inputs.
#[deprecated(since = "45.0.0", note = "use JoinType::swap instead")]
#[allow(dead_code)]
pub(crate) fn swap_join_type(join_type: JoinType) -> JoinType {
    join_type.swap()
}

/// This function swaps the inputs of the given join operator.
/// This function is public so other downstream projects can use it
/// to construct `HashJoinExec` with the right side as the build side.
#[deprecated(since = "45.0.0", note = "use HashJoinExec::swap_inputs instead")]
pub fn swap_hash_join(
    hash_join: &HashJoinExec,
    partition_mode: PartitionMode,
) -> Result<Arc<dyn ExecutionPlan>> {
    hash_join.swap_inputs(partition_mode)
}

/// Swaps the inputs of `NestedLoopJoinExec` and wraps it in a `ProjectionExec` if required.
#[deprecated(since = "45.0.0", note = "use NestedLoopJoinExec::swap_inputs")]
#[allow(dead_code)]
pub(crate) fn swap_nl_join(join: &NestedLoopJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
    join.swap_inputs()
}

/// Swaps join sides for filter column indices and produces a new `JoinFilter` (if one exists).
#[deprecated(since = "45.0.0", note = "use filter.map(JoinFilter::swap) instead")]
#[allow(dead_code)]
fn swap_join_filter(filter: Option<&JoinFilter>) -> Option<JoinFilter> {
    filter.map(JoinFilter::swap)
}

#[deprecated(since = "45.0.0", note = "use JoinFilter::swap instead")]
#[allow(dead_code)]
pub(crate) fn swap_filter(filter: &JoinFilter) -> JoinFilter {
    filter.swap()
}

impl PhysicalOptimizerRule for JoinSelection {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // First, we make pipeline-fixing modifications to joins so as to accommodate
        // unbounded inputs. Each pipeline-fixing subrule, which is a function of type
        // `PipelineFixerSubrule`, takes the current [`ExecutionPlan`] and the
        // configuration options, and returns a (possibly rewritten) plan as we
        // traverse the plan tree bottom-up.
        let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
            Box::new(hash_join_convert_symmetric_subrule),
            Box::new(hash_join_swap_subrule),
        ];
        let new_plan = plan
            .transform_up(|p| apply_subrules(p, &subrules, config))
            .data()?;
        // Next, we apply another subrule that tries to optimize joins using any
        // statistics their inputs might have.
        // - For a hash join with partition mode [`PartitionMode::Auto`], we will
        //   make a cost-based decision to select which `PartitionMode`
        //   (`Partitioned`/`CollectLeft`) is optimal. If the statistics information
        //   is not available, we will fall back to [`PartitionMode::Partitioned`].
        // - We optimize/swap join sides so that the left (build) side of the join
        //   is the smaller side. If the statistics information is not available, we
        //   do not modify join sides.
        // - We will also swap left and right sides for cross joins so that the left
        //   side is the smaller side.
        let config = &config.optimizer;
        let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
        let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
        new_plan
            .transform_up(|plan| {
                statistical_join_selection_subrule(
                    plan,
                    collect_threshold_byte_size,
                    collect_threshold_num_rows,
                )
            })
            .data()
    }

    fn name(&self) -> &str {
        "join_selection"
    }

    fn schema_check(&self) -> bool {
        true
    }
}

/// Tries to create a [`HashJoinExec`] in [`PartitionMode::CollectLeft`] when possible.
///
/// This function first checks, for the given join type, whether `CollectLeft`
/// mode is applicable as-is or after swapping the join sides. When
/// `ignore_threshold` is false, it also checks the left and right input sizes
/// in bytes or rows against the given thresholds.
pub(crate) fn try_collect_left(
    hash_join: &HashJoinExec,
    ignore_threshold: bool,
    threshold_byte_size: usize,
    threshold_num_rows: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let left = hash_join.left();
    let right = hash_join.right();

    let left_can_collect = ignore_threshold
        || supports_collect_by_thresholds(
            &**left,
            threshold_byte_size,
            threshold_num_rows,
        );
    let right_can_collect = ignore_threshold
        || supports_collect_by_thresholds(
            &**right,
            threshold_byte_size,
            threshold_num_rows,
        );

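    // Decide based on which side(s) are collectible: prefer making the smaller
    // input the build (left) side, swapping the inputs when the join type allows it.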
    match (left_can_collect, right_can_collect) {
        (true, true) => {
            if hash_join.join_type().supports_swap()
                && should_swap_join_order(&**left, &**right)?
            {
                Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
            } else {
                Ok(Some(Arc::new(HashJoinExec::try_new(
                    Arc::clone(left),
                    Arc::clone(right),
                    hash_join.on().to_vec(),
                    hash_join.filter().cloned(),
                    hash_join.join_type(),
                    hash_join.projection.clone(),
                    PartitionMode::CollectLeft,
                    hash_join.null_equals_null(),
                )?)))
            }
        }
        (true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
            Arc::clone(left),
            Arc::clone(right),
            hash_join.on().to_vec(),
            hash_join.filter().cloned(),
            hash_join.join_type(),
            hash_join.projection.clone(),
            PartitionMode::CollectLeft,
            hash_join.null_equals_null(),
        )?))),
        (false, true) => {
            if hash_join.join_type().supports_swap() {
                hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
            } else {
                Ok(None)
            }
        }
        (false, false) => Ok(None),
    }
}

/// Creates a partitioned hash join execution plan, swapping inputs if beneficial.
///
/// Checks if the join order should be swapped based on the join type and input statistics.
/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise,
/// creates a standard partitioned hash join.
pub(crate) fn partitioned_hash_join(
    hash_join: &HashJoinExec,
) -> Result<Arc<dyn ExecutionPlan>> {
    let left = hash_join.left();
    let right = hash_join.right();
    if hash_join.join_type().supports_swap() && should_swap_join_order(&**left, &**right)?
    {
        hash_join.swap_inputs(PartitionMode::Partitioned)
    } else {
        Ok(Arc::new(HashJoinExec::try_new(
            Arc::clone(left),
            Arc::clone(right),
            hash_join.on().to_vec(),
            hash_join.filter().cloned(),
            hash_join.join_type(),
            hash_join.projection.clone(),
            PartitionMode::Partitioned,
            hash_join.null_equals_null(),
        )?))
    }
}

/// This subrule tries to optimize hash, cross, and nested loop joins in the
/// plan according to available statistical information.
fn statistical_join_selection_subrule(
    plan: Arc<dyn ExecutionPlan>,
    collect_threshold_byte_size: usize,
    collect_threshold_num_rows: usize,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    let transformed =
        if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
            match hash_join.partition_mode() {
                PartitionMode::Auto => try_collect_left(
                    hash_join,
                    false,
                    collect_threshold_byte_size,
                    collect_threshold_num_rows,
                )?
                .map_or_else(
                    || partitioned_hash_join(hash_join).map(Some),
                    |v| Ok(Some(v)),
                )?,
                PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
                    .map_or_else(
                        || partitioned_hash_join(hash_join).map(Some),
                        |v| Ok(Some(v)),
                    )?,
                PartitionMode::Partitioned => {
                    let left = hash_join.left();
                    let right = hash_join.right();
                    if hash_join.join_type().supports_swap()
                        && should_swap_join_order(&**left, &**right)?
                    {
                        hash_join
                            .swap_inputs(PartitionMode::Partitioned)
                            .map(Some)?
                    } else {
                        None
                    }
                }
            }
        } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
            let left = cross_join.left();
            let right = cross_join.right();
            if should_swap_join_order(&**left, &**right)? {
                cross_join.swap_inputs().map(Some)?
            } else {
                None
            }
        } else if let Some(nl_join) = plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
            let left = nl_join.left();
            let right = nl_join.right();
            if nl_join.join_type().supports_swap()
                && should_swap_join_order(&**left, &**right)?
            {
                nl_join.swap_inputs().map(Some)?
            } else {
                None
            }
        } else {
            None
        };

    Ok(if let Some(transformed) = transformed {
        Transformed::yes(transformed)
    } else {
        Transformed::no(plan)
    })
}

/// Pipeline-fixing join selection subrule.
pub type PipelineFixerSubrule =
    dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;

/// Converts a hash join to a symmetric hash join if both its inputs are
/// unbounded and incremental.
///
/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
/// preserves query runnability. If the replacement is applicable, this subrule makes this change;
/// otherwise, it leaves the input unchanged.
///
/// # Arguments
/// * `input` - The current execution plan node.
/// * `config_options` - Configuration options that might affect the transformation logic.
///
/// # Returns
/// A `Result` containing the transformed plan if the conversion is applicable,
/// the unchanged input plan if it is not, or an error if constructing the
/// symmetric hash join fails.
fn hash_join_convert_symmetric_subrule(
    input: Arc<dyn ExecutionPlan>,
    config_options: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Check if the current plan node is a HashJoinExec.
    if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
        let left_unbounded = hash_join.left.boundedness().is_unbounded();
        let left_incremental = matches!(
            hash_join.left.pipeline_behavior(),
            EmissionType::Incremental | EmissionType::Both
        );
        let right_unbounded = hash_join.right.boundedness().is_unbounded();
        let right_incremental = matches!(
            hash_join.right.pipeline_behavior(),
            EmissionType::Incremental | EmissionType::Both
        );
        // Process only if both left and right sides are unbounded and emit incrementally.
        if left_unbounded && right_unbounded && left_incremental && right_incremental {
            // Determine the partition mode based on configuration.
            let mode = if config_options.optimizer.repartition_joins {
                StreamJoinPartitionMode::Partitioned
            } else {
                StreamJoinPartitionMode::SinglePartition
            };
            // A closure to determine the required sort order for each side of the join in the SymmetricHashJoinExec.
            // This function checks if the columns involved in the filter have any specific ordering requirements.
            // If the child nodes (left or right side of the join) already have a defined order and the columns used in the
            // filter predicate are ordered, this function captures that ordering requirement. The identified order is then
            // used in the SymmetricHashJoinExec to maintain bounded memory during join operations.
            // However, if the child nodes do not have an inherent order, or if the filter columns are unordered,
            // the function concludes that no specific order is required for the SymmetricHashJoinExec. This approach
            // ensures that the symmetric hash join operation only imposes ordering constraints when necessary,
            // based on the properties of the child nodes and the filter condition.
            let determine_order = |side: JoinSide| -> Option<LexOrdering> {
                hash_join
                    .filter()
                    .map(|filter| {
                        filter.column_indices().iter().any(
                            |ColumnIndex {
                                 index,
                                 side: column_side,
                             }| {
                                // Skip if column side does not match the join side.
                                if *column_side != side {
                                    return false;
                                }
                                // Retrieve equivalence properties and schema based on the side.
                                let (equivalence, schema) = match side {
                                    JoinSide::Left => (
                                        hash_join.left().equivalence_properties(),
                                        hash_join.left().schema(),
                                    ),
                                    JoinSide::Right => (
                                        hash_join.right().equivalence_properties(),
                                        hash_join.right().schema(),
                                    ),
                                    JoinSide::None => return false,
                                };

                                let name = schema.field(*index).name();
                                let col = Arc::new(Column::new(name, *index)) as _;
                                // Check if the column is ordered.
                                equivalence.get_expr_properties(col).sort_properties
                                    != SortProperties::Unordered
                            },
                        )
                    })
                    .unwrap_or(false)
                    .then(|| {
                        match side {
                            JoinSide::Left => hash_join.left().output_ordering(),
                            JoinSide::Right => hash_join.right().output_ordering(),
                            JoinSide::None => unreachable!(),
                        }
                        .map(|p| LexOrdering::new(p.to_vec()))
                    })
                    .flatten()
            };

            // Determine the sort order for both left and right sides.
            let left_order = determine_order(JoinSide::Left);
            let right_order = determine_order(JoinSide::Right);

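            // Build the symmetric hash join with the same join configuration,
            // propagating any per-side orderings detected above so the operator
            // can maintain bounded memory.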
            return SymmetricHashJoinExec::try_new(
                Arc::clone(hash_join.left()),
                Arc::clone(hash_join.right()),
                hash_join.on().to_vec(),
                hash_join.filter().cloned(),
                hash_join.join_type(),
                hash_join.null_equals_null(),
                left_order,
                right_order,
                mode,
            )
            .map(|exec| Arc::new(exec) as _);
        }
    }
    Ok(input)
}

/// This subrule will swap build/probe sides of a hash join depending on whether
/// one of its inputs may produce an infinite stream of records. The rule ensures
/// that the left (build) side of the hash join always operates on an input stream
/// that will produce a finite set of records. If the left side cannot be chosen
/// to be "finite", the join sides stay the same as in the original query.
/// ```text
/// For example, this rule makes the following transformation:
///
///
///
///           +--------------+              +--------------+
///           |              |  unbounded   |              |
///    Left   | Infinite     |    true      | Hash         |\true
///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
///           |              |              |              |  \  |              |       |              |
///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |
///                                                            - |              |       |              |
///           +--------------+              +--------------+  /  +--------------+       +--------------+
///           |              |  unbounded   |              | /
///    Right  | Finite       |    false     | Hash         |/false
///           | Data Source  |--------------| Repartition  |
///           |              |              |              |
///           +--------------+              +--------------+
///
///
///
///           +--------------+              +--------------+
///           |              |  unbounded   |              |
///    Left   | Finite       |    false     | Hash         |\false
///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
///           |              |              |              |  \  |              | true  |              | true
///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |-----
///                                                            - |              |       |              |
///           +--------------+              +--------------+  /  +--------------+       +--------------+
///           |              |  unbounded   |              | /
///    Right  | Infinite     |    true      | Hash         |/true
///           | Data Source  |--------------| Repartition  |
///           |              |              |              |
///           +--------------+              +--------------+
///
/// ```
pub fn hash_join_swap_subrule(
    mut input: Arc<dyn ExecutionPlan>,
    _config_options: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
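        // Swap only when the left (build) side is unbounded while the right
        // side is bounded, and only for join types whose semantics survive
        // putting the bounded input on the build side.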
        if hash_join.left.boundedness().is_unbounded()
            && !hash_join.right.boundedness().is_unbounded()
            && matches!(
                *hash_join.join_type(),
                JoinType::Inner
                    | JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::LeftAnti
            )
        {
            input = swap_join_according_to_unboundedness(hash_join)?;
        }
    }
    Ok(input)
}

/// This function swaps the sides of a hash join to make it runnable even if one
/// of its inputs is infinite. Note that this is not always possible; i.e.
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
/// [`JoinType::RightSemi`] cannot run with an unbounded left side, even if
/// we swap the join sides. Therefore, we do not consider them here.
/// This function is crate public as it is useful for downstream projects
/// to implement, or experiment with, their own join selection rules.
pub(crate) fn swap_join_according_to_unboundedness(
    hash_join: &HashJoinExec,
) -> Result<Arc<dyn ExecutionPlan>> {
    let partition_mode = hash_join.partition_mode();
    let join_type = hash_join.join_type();
    match (*partition_mode, *join_type) {
        (
            _,
            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
        ) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
        (PartitionMode::Partitioned, _) => {
            hash_join.swap_inputs(PartitionMode::Partitioned)
        }
        (PartitionMode::CollectLeft, _) => {
            hash_join.swap_inputs(PartitionMode::CollectLeft)
        }
        (PartitionMode::Auto, _) => {
            internal_err!("Auto is not acceptable for unbounded input here.")
        }
    }
}

/// Applies the given `PipelineFixerSubrule`s to the given plan, feeding the
/// output of each subrule into the next one.
fn apply_subrules(
    mut input: Arc<dyn ExecutionPlan>,
    subrules: &Vec<Box<PipelineFixerSubrule>>,
    config_options: &ConfigOptions,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    for subrule in subrules {
        input = subrule(input, config_options)?;
    }
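    // Note: the result is always reported as transformed; subrules that do not
    // apply simply return their input unchanged.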
    Ok(Transformed::yes(input))
}

// See tests in datafusion/core/tests/physical_optimizer