Skip to main content

datafusion_physical_optimizer/
join_selection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The [`JoinSelection`] rule tries to modify a given plan so that it can
19//! accommodate infinite sources and utilize statistical information (if there
20//! is any) to obtain more performant plans. To achieve the first goal, it
21//! tries to transform a non-runnable query (with the given infinite sources)
22//! into a runnable query by replacing pipeline-breaking join operations with
23//! pipeline-friendly ones. To achieve the second goal, it selects the proper
24//! `PartitionMode` and the build side using the available statistics for hash joins.
25
26use crate::PhysicalOptimizerRule;
27use datafusion_common::config::ConfigOptions;
28use datafusion_common::error::Result;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30use datafusion_common::{JoinSide, JoinType, internal_err};
31use datafusion_expr_common::sort_properties::SortProperties;
32use datafusion_physical_expr::LexOrdering;
33use datafusion_physical_expr::expressions::Column;
34use datafusion_physical_plan::execution_plan::EmissionType;
35use datafusion_physical_plan::joins::utils::ColumnIndex;
36use datafusion_physical_plan::joins::{
37    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
38    StreamJoinPartitionMode, SymmetricHashJoinExec,
39};
40use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
41use std::sync::Arc;
42
/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
/// available statistical information, if there is any.
#[derive(Default, Debug)]
pub struct JoinSelection {}

impl JoinSelection {
    /// Creates a new [`JoinSelection`] rule.
    ///
    /// The rule carries no configuration of its own, so this is equivalent to
    /// [`Default::default`]; the thresholds it uses are read from the
    /// `ConfigOptions` passed to the optimizer at optimization time.
    pub fn new() -> Self {
        Self {}
    }
}
55
56// TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
57// TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times.
58/// Checks statistics for join swap.
59pub(crate) fn should_swap_join_order(
60    left: &dyn ExecutionPlan,
61    right: &dyn ExecutionPlan,
62) -> Result<bool> {
63    // Get the left and right table's total bytes
64    // If both the left and right tables contain total_byte_size statistics,
65    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
66    let left_stats = left.partition_statistics(None)?;
67    let right_stats = right.partition_statistics(None)?;
68    // First compare `total_byte_size` of left and right side,
69    // if information in this field is insufficient fallback to the `num_rows`
70    match (
71        left_stats.total_byte_size.get_value(),
72        right_stats.total_byte_size.get_value(),
73    ) {
74        (Some(l), Some(r)) => Ok(l > r),
75        _ => match (
76            left_stats.num_rows.get_value(),
77            right_stats.num_rows.get_value(),
78        ) {
79            (Some(l), Some(r)) => Ok(l > r),
80            _ => Ok(false),
81        },
82    }
83}
84
85fn supports_collect_by_thresholds(
86    plan: &dyn ExecutionPlan,
87    threshold_byte_size: usize,
88    threshold_num_rows: usize,
89) -> bool {
90    // Currently we do not trust the 0 value from stats, due to stats collection might have bug
91    // TODO check the logic in datasource::get_statistics_with_limit()
92    let Ok(stats) = plan.partition_statistics(None) else {
93        return false;
94    };
95
96    if let Some(byte_size) = stats.total_byte_size.get_value() {
97        *byte_size != 0 && *byte_size < threshold_byte_size
98    } else if let Some(num_rows) = stats.num_rows.get_value() {
99        *num_rows != 0 && *num_rows < threshold_num_rows
100    } else {
101        false
102    }
103}
104
105impl PhysicalOptimizerRule for JoinSelection {
106    fn optimize(
107        &self,
108        plan: Arc<dyn ExecutionPlan>,
109        config: &ConfigOptions,
110    ) -> Result<Arc<dyn ExecutionPlan>> {
111        // First, we make pipeline-fixing modifications to joins so as to accommodate
112        // unbounded inputs. Each pipeline-fixing subrule, which is a function
113        // of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`]
114        // argument storing state variables that indicate the unboundedness status
115        // of the current [`ExecutionPlan`] as we traverse the plan tree.
116        let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
117            Box::new(hash_join_convert_symmetric_subrule),
118            Box::new(hash_join_swap_subrule),
119        ];
120        let new_plan = plan
121            .transform_up(|p| apply_subrules(p, &subrules, config))
122            .data()?;
123        // Next, we apply another subrule that tries to optimize joins using any
124        // statistics their inputs might have.
125        // - For a hash join with partition mode [`PartitionMode::Auto`], we will
126        //   make a cost-based decision to select which `PartitionMode` mode
127        //   (`Partitioned`/`CollectLeft`) is optimal. If the statistics information
128        //   is not available, we will fall back to [`PartitionMode::Partitioned`].
129        // - We optimize/swap join sides so that the left (build) side of the join
130        //   is the small side. If the statistics information is not available, we
131        //   do not modify join sides.
132        // - We will also swap left and right sides for cross joins so that the left
133        //   side is the small side.
134        let config = &config.optimizer;
135        let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
136        let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
137        new_plan
138            .transform_up(|plan| {
139                statistical_join_selection_subrule(
140                    plan,
141                    collect_threshold_byte_size,
142                    collect_threshold_num_rows,
143                )
144            })
145            .data()
146    }
147
148    fn name(&self) -> &str {
149        "join_selection"
150    }
151
152    fn schema_check(&self) -> bool {
153        true
154    }
155}
156
157/// Tries to create a [`HashJoinExec`] in [`PartitionMode::CollectLeft`] when possible.
158///
159/// This function will first consider the given join type and check whether the
160/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
161/// When the `ignore_threshold` is false, this function will also check left
162/// and right sizes in bytes or rows.
163pub(crate) fn try_collect_left(
164    hash_join: &HashJoinExec,
165    ignore_threshold: bool,
166    threshold_byte_size: usize,
167    threshold_num_rows: usize,
168) -> Result<Option<Arc<dyn ExecutionPlan>>> {
169    let left = hash_join.left();
170    let right = hash_join.right();
171
172    let left_can_collect = ignore_threshold
173        || supports_collect_by_thresholds(
174            &**left,
175            threshold_byte_size,
176            threshold_num_rows,
177        );
178    let right_can_collect = ignore_threshold
179        || supports_collect_by_thresholds(
180            &**right,
181            threshold_byte_size,
182            threshold_num_rows,
183        );
184
185    match (left_can_collect, right_can_collect) {
186        (true, true) => {
187            // Don't swap null-aware anti joins as they have specific side requirements
188            if hash_join.join_type().supports_swap()
189                && !hash_join.null_aware
190                && should_swap_join_order(&**left, &**right)?
191            {
192                Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
193            } else {
194                Ok(Some(Arc::new(
195                    hash_join
196                        .builder()
197                        .with_partition_mode(PartitionMode::CollectLeft)
198                        .build()?,
199                )))
200            }
201        }
202        (true, false) => Ok(Some(Arc::new(
203            hash_join
204                .builder()
205                .with_partition_mode(PartitionMode::CollectLeft)
206                .build()?,
207        ))),
208        (false, true) => {
209            // Don't swap null-aware anti joins as they have specific side requirements
210            if hash_join.join_type().supports_swap() && !hash_join.null_aware {
211                hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
212            } else {
213                Ok(None)
214            }
215        }
216        (false, false) => Ok(None),
217    }
218}
219
220/// Creates a partitioned hash join execution plan, swapping inputs if beneficial.
221///
222/// Checks if the join order should be swapped based on the join type and input statistics.
223/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise,
224/// creates a standard partitioned hash join.
225pub(crate) fn partitioned_hash_join(
226    hash_join: &HashJoinExec,
227) -> Result<Arc<dyn ExecutionPlan>> {
228    let left = hash_join.left();
229    let right = hash_join.right();
230    // Don't swap null-aware anti joins as they have specific side requirements
231    if hash_join.join_type().supports_swap()
232        && !hash_join.null_aware
233        && should_swap_join_order(&**left, &**right)?
234    {
235        hash_join.swap_inputs(PartitionMode::Partitioned)
236    } else {
237        // Null-aware anti joins must use CollectLeft mode because they track probe-side state
238        // (probe_side_non_empty, probe_side_has_null) per-partition, but need global knowledge
239        // for correct null handling. With partitioning, a partition might not see probe rows
240        // even if the probe side is globally non-empty, leading to incorrect NULL row handling.
241        let partition_mode = if hash_join.null_aware {
242            PartitionMode::CollectLeft
243        } else {
244            PartitionMode::Partitioned
245        };
246
247        Ok(Arc::new(
248            hash_join
249                .builder()
250                .with_partition_mode(partition_mode)
251                .build()?,
252        ))
253    }
254}
255
256/// This subrule tries to modify a given plan so that it can
257/// optimize hash and cross joins in the plan according to available statistical information.
258fn statistical_join_selection_subrule(
259    plan: Arc<dyn ExecutionPlan>,
260    collect_threshold_byte_size: usize,
261    collect_threshold_num_rows: usize,
262) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
263    let transformed =
264        if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
265            match hash_join.partition_mode() {
266                PartitionMode::Auto => try_collect_left(
267                    hash_join,
268                    false,
269                    collect_threshold_byte_size,
270                    collect_threshold_num_rows,
271                )?
272                .map_or_else(
273                    || partitioned_hash_join(hash_join).map(Some),
274                    |v| Ok(Some(v)),
275                )?,
276                PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
277                    .map_or_else(
278                        || partitioned_hash_join(hash_join).map(Some),
279                        |v| Ok(Some(v)),
280                    )?,
281                PartitionMode::Partitioned => {
282                    let left = hash_join.left();
283                    let right = hash_join.right();
284                    // Don't swap null-aware anti joins as they have specific side requirements
285                    if hash_join.join_type().supports_swap()
286                        && !hash_join.null_aware
287                        && should_swap_join_order(&**left, &**right)?
288                    {
289                        hash_join
290                            .swap_inputs(PartitionMode::Partitioned)
291                            .map(Some)?
292                    } else {
293                        None
294                    }
295                }
296            }
297        } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
298            let left = cross_join.left();
299            let right = cross_join.right();
300            if should_swap_join_order(&**left, &**right)? {
301                cross_join.swap_inputs().map(Some)?
302            } else {
303                None
304            }
305        } else if let Some(nl_join) = plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
306            let left = nl_join.left();
307            let right = nl_join.right();
308            if nl_join.join_type().supports_swap()
309                && should_swap_join_order(&**left, &**right)?
310            {
311                nl_join.swap_inputs().map(Some)?
312            } else {
313                None
314            }
315        } else {
316            None
317        };
318
319    Ok(if let Some(transformed) = transformed {
320        Transformed::yes(transformed)
321    } else {
322        Transformed::no(plan)
323    })
324}
325
326/// Pipeline-fixing join selection subrule.
327pub type PipelineFixerSubrule =
328    dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;
329
330/// Converts a hash join to a symmetric hash join if both its inputs are
331/// unbounded and incremental.
332///
333/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
334/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
335/// preserves query runnability. If the replacement is applicable, this subrule makes this change;
336/// otherwise, it leaves the input unchanged.
337///
338/// # Arguments
339/// * `input` - The current state of the pipeline, including the execution plan.
340/// * `config_options` - Configuration options that might affect the transformation logic.
341///
342/// # Returns
343/// An `Option` that contains the `Result` of the transformation. If the transformation is not applicable,
344/// it returns `None`. If applicable, it returns `Some(Ok(...))` with the modified pipeline state,
345/// or `Some(Err(...))` if an error occurs during the transformation.
346fn hash_join_convert_symmetric_subrule(
347    input: Arc<dyn ExecutionPlan>,
348    config_options: &ConfigOptions,
349) -> Result<Arc<dyn ExecutionPlan>> {
350    // Check if the current plan node is a HashJoinExec.
351    if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
352        let left_unbounded = hash_join.left.boundedness().is_unbounded();
353        let left_incremental = matches!(
354            hash_join.left.pipeline_behavior(),
355            EmissionType::Incremental | EmissionType::Both
356        );
357        let right_unbounded = hash_join.right.boundedness().is_unbounded();
358        let right_incremental = matches!(
359            hash_join.right.pipeline_behavior(),
360            EmissionType::Incremental | EmissionType::Both
361        );
362        // Process only if both left and right sides are unbounded and incrementally emit.
363        if left_unbounded && right_unbounded & left_incremental & right_incremental {
364            // Determine the partition mode based on configuration.
365            let mode = if config_options.optimizer.repartition_joins {
366                StreamJoinPartitionMode::Partitioned
367            } else {
368                StreamJoinPartitionMode::SinglePartition
369            };
370            // A closure to determine the required sort order for each side of the join in the SymmetricHashJoinExec.
371            // This function checks if the columns involved in the filter have any specific ordering requirements.
372            // If the child nodes (left or right side of the join) already have a defined order and the columns used in the
373            // filter predicate are ordered, this function captures that ordering requirement. The identified order is then
374            // used in the SymmetricHashJoinExec to maintain bounded memory during join operations.
375            // However, if the child nodes do not have an inherent order, or if the filter columns are unordered,
376            // the function concludes that no specific order is required for the SymmetricHashJoinExec. This approach
377            // ensures that the symmetric hash join operation only imposes ordering constraints when necessary,
378            // based on the properties of the child nodes and the filter condition.
379            let determine_order = |side: JoinSide| -> Option<LexOrdering> {
380                hash_join
381                    .filter()
382                    .map(|filter| {
383                        filter.column_indices().iter().any(
384                            |ColumnIndex {
385                                 index,
386                                 side: column_side,
387                             }| {
388                                // Skip if column side does not match the join side.
389                                if *column_side != side {
390                                    return false;
391                                }
392                                // Retrieve equivalence properties and schema based on the side.
393                                let (equivalence, schema) = match side {
394                                    JoinSide::Left => (
395                                        hash_join.left().equivalence_properties(),
396                                        hash_join.left().schema(),
397                                    ),
398                                    JoinSide::Right => (
399                                        hash_join.right().equivalence_properties(),
400                                        hash_join.right().schema(),
401                                    ),
402                                    JoinSide::None => return false,
403                                };
404
405                                let name = schema.field(*index).name();
406                                let col = Arc::new(Column::new(name, *index)) as _;
407                                // Check if the column is ordered.
408                                equivalence.get_expr_properties(col).sort_properties
409                                    != SortProperties::Unordered
410                            },
411                        )
412                    })
413                    .unwrap_or(false)
414                    .then(|| {
415                        match side {
416                            JoinSide::Left => hash_join.left().output_ordering(),
417                            JoinSide::Right => hash_join.right().output_ordering(),
418                            JoinSide::None => unreachable!(),
419                        }
420                        .cloned()
421                    })
422                    .flatten()
423            };
424
425            // Determine the sort order for both left and right sides.
426            let left_order = determine_order(JoinSide::Left);
427            let right_order = determine_order(JoinSide::Right);
428
429            return SymmetricHashJoinExec::try_new(
430                Arc::clone(hash_join.left()),
431                Arc::clone(hash_join.right()),
432                hash_join.on().to_vec(),
433                hash_join.filter().cloned(),
434                hash_join.join_type(),
435                hash_join.null_equality(),
436                left_order,
437                right_order,
438                mode,
439            )
440            .map(|exec| Arc::new(exec) as _);
441        }
442    }
443    Ok(input)
444}
445
446/// This subrule will swap build/probe sides of a hash join depending on whether
447/// one of its inputs may produce an infinite stream of records. The rule ensures
448/// that the left (build) side of the hash join always operates on an input stream
449/// that will produce a finite set of records. If the left side can not be chosen
450/// to be "finite", the join sides stay the same as the original query.
451/// ```text
452/// For example, this rule makes the following transformation:
453///
454///
455///
456///           +--------------+              +--------------+
457///           |              |  unbounded   |              |
458///    Left   | Infinite     |    true      | Hash         |\true
459///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
460///           |              |              |              |  \  |              |       |              |
461///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |
462///                                                            - |              |       |              |
463///           +--------------+              +--------------+  /  +--------------+       +--------------+
464///           |              |  unbounded   |              | /
465///    Right  | Finite       |    false     | Hash         |/false
466///           | Data Source  |--------------| Repartition  |
467///           |              |              |              |
468///           +--------------+              +--------------+
469///
470///
471///
472///           +--------------+              +--------------+
473///           |              |  unbounded   |              |
474///    Left   | Finite       |    false     | Hash         |\false
475///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
476///           |              |              |              |  \  |              | true  |              | true
477///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |-----
478///                                                            - |              |       |              |
479///           +--------------+              +--------------+  /  +--------------+       +--------------+
480///           |              |  unbounded   |              | /
481///    Right  | Infinite     |    true      | Hash         |/true
482///           | Data Source  |--------------| Repartition  |
483///           |              |              |              |
484///           +--------------+              +--------------+
485/// ```
486pub fn hash_join_swap_subrule(
487    mut input: Arc<dyn ExecutionPlan>,
488    _config_options: &ConfigOptions,
489) -> Result<Arc<dyn ExecutionPlan>> {
490    if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>()
491        && hash_join.left.boundedness().is_unbounded()
492        && !hash_join.right.boundedness().is_unbounded()
493        && !hash_join.null_aware // Don't swap null-aware anti joins
494        && matches!(
495            *hash_join.join_type(),
496            JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti
497        )
498    {
499        input = swap_join_according_to_unboundedness(hash_join)?;
500    }
501    Ok(input)
502}
503
504/// This function swaps sides of a hash join to make it runnable even if one of
505/// its inputs are infinite. Note that this is not always possible; i.e.
506/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
507/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
508/// we swap join sides. Therefore, we do not consider them here.
509/// This function is crate public as it is useful for downstream projects
510/// to implement, or experiment with, their own join selection rules.
511pub(crate) fn swap_join_according_to_unboundedness(
512    hash_join: &HashJoinExec,
513) -> Result<Arc<dyn ExecutionPlan>> {
514    let partition_mode = hash_join.partition_mode();
515    let join_type = hash_join.join_type();
516    match (*partition_mode, *join_type) {
517        (
518            _,
519            JoinType::Right
520            | JoinType::RightSemi
521            | JoinType::RightAnti
522            | JoinType::RightMark
523            | JoinType::Full,
524        ) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
525        (PartitionMode::Partitioned, _) => {
526            hash_join.swap_inputs(PartitionMode::Partitioned)
527        }
528        (PartitionMode::CollectLeft, _) => {
529            hash_join.swap_inputs(PartitionMode::CollectLeft)
530        }
531        (PartitionMode::Auto, _) => {
532            // Use `PartitionMode::Partitioned` as default if `Auto` is selected.
533            hash_join.swap_inputs(PartitionMode::Partitioned)
534        }
535    }
536}
537
538/// Apply given `PipelineFixerSubrule`s to a given plan. This plan, along with
539/// auxiliary boundedness information, is in the `PipelineStatePropagator` object.
540fn apply_subrules(
541    mut input: Arc<dyn ExecutionPlan>,
542    subrules: &Vec<Box<PipelineFixerSubrule>>,
543    config_options: &ConfigOptions,
544) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
545    for subrule in subrules {
546        input = subrule(input, config_options)?;
547    }
548    Ok(Transformed::yes(input))
549}
550
551// See tests in datafusion/core/tests/physical_optimizer