Skip to main content

datafusion_physical_optimizer/
join_selection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! The [`JoinSelection`] rule tries to modify a given plan so that it can
//! accommodate infinite sources and utilize statistical information (if there
//! is any) to obtain more performant plans. To achieve the first goal, it
//! tries to transform a non-runnable query (with the given infinite sources)
//! into a runnable query by replacing pipeline-breaking join operations with
//! pipeline-friendly ones. To achieve the second goal, it uses the available
//! statistics to select the proper `PartitionMode` and build side for hash
//! joins, and to decide whether to swap the inputs of cross joins and nested
//! loop joins.
25
26use crate::PhysicalOptimizerRule;
27use crate::optimizer::{ConfigOnlyContext, PhysicalOptimizerContext};
28use datafusion_common::Statistics;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::error::Result;
31use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
32use datafusion_common::{JoinSide, JoinType, internal_err};
33use datafusion_expr_common::sort_properties::SortProperties;
34use datafusion_physical_expr::LexOrdering;
35use datafusion_physical_expr::expressions::Column;
36use datafusion_physical_plan::execution_plan::EmissionType;
37use datafusion_physical_plan::joins::utils::ColumnIndex;
38use datafusion_physical_plan::joins::{
39    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
40    StreamJoinPartitionMode, SymmetricHashJoinExec,
41};
42use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
43use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
44use std::sync::Arc;
45
/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
/// available statistical information, if there is any.
#[derive(Default, Debug)]
pub struct JoinSelection {}

impl JoinSelection {
    /// Creates a new [`JoinSelection`] rule.
    ///
    /// The rule carries no state, so this is equivalent to
    /// [`JoinSelection::default`].
    pub fn new() -> Self {
        Self {}
    }
}
58
59/// Get statistics for a plan node, using the registry if available.
60fn get_stats(
61    plan: &dyn ExecutionPlan,
62    registry: Option<&StatisticsRegistry>,
63) -> Result<Arc<Statistics>> {
64    if let Some(reg) = registry {
65        reg.compute(plan)
66            .map(|s| Arc::<Statistics>::clone(s.base_arc()))
67    } else {
68        plan.partition_statistics(None)
69    }
70}
71
72// TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
73// TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times.
74/// Checks whether join inputs should be swapped using available statistics.
75///
76/// It follows these steps:
77/// 1. If a [`StatisticsRegistry`] is provided, use it for cross-operator estimates
78///    (e.g., intermediate join outputs that would otherwise have `Absent` statistics).
79/// 2. Compare the in-memory sizes of both sides, and place the smaller side on
80///    the left (build) side.
81/// 3. If in-memory byte sizes are unavailable, fall back to row counts.
82/// 4. Do not reorder the join if neither statistic is available, or if
83///    `datafusion.optimizer.join_reordering` is disabled.
84///
85/// Used configurations inside arg `config`
86/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
87pub(crate) fn should_swap_join_order(
88    left: &dyn ExecutionPlan,
89    right: &dyn ExecutionPlan,
90    config: &ConfigOptions,
91    registry: Option<&StatisticsRegistry>,
92) -> Result<bool> {
93    if !config.optimizer.join_reordering {
94        return Ok(false);
95    }
96
97    let left_stats = get_stats(left, registry)?;
98    let right_stats = get_stats(right, registry)?;
99
100    // First compare total_byte_size, then fall back to num_rows if byte
101    // sizes are unavailable.
102    match (
103        left_stats.total_byte_size.get_value(),
104        right_stats.total_byte_size.get_value(),
105    ) {
106        (Some(l), Some(r)) => Ok(l > r),
107        _ => match (
108            left_stats.num_rows.get_value(),
109            right_stats.num_rows.get_value(),
110        ) {
111            (Some(l), Some(r)) => Ok(l > r),
112            _ => Ok(false),
113        },
114    }
115}
116
117fn supports_collect_by_thresholds(
118    plan: &dyn ExecutionPlan,
119    threshold_byte_size: usize,
120    threshold_num_rows: usize,
121    registry: Option<&StatisticsRegistry>,
122) -> bool {
123    let Ok(stats) = get_stats(plan, registry) else {
124        return false;
125    };
126
127    // Stats use `Precision<T>` to represent stats, where `Absent` means unknown.
128    // `Exact(0)` and `Inexact(0)` are both valid stats, and we should not treat
129    // them as unknown, `Absent` will return None (this is in regards to why
130    // `!=0` is not checked)
131    if let Some(byte_size) = stats.total_byte_size.get_value() {
132        *byte_size < threshold_byte_size
133    } else if let Some(num_rows) = stats.num_rows.get_value() {
134        *num_rows < threshold_num_rows
135    } else {
136        false
137    }
138}
139
140impl PhysicalOptimizerRule for JoinSelection {
141    fn optimize(
142        &self,
143        plan: Arc<dyn ExecutionPlan>,
144        config: &ConfigOptions,
145    ) -> Result<Arc<dyn ExecutionPlan>> {
146        self.optimize_with_context(plan, &ConfigOnlyContext::new(config))
147    }
148
149    fn optimize_with_context(
150        &self,
151        plan: Arc<dyn ExecutionPlan>,
152        context: &dyn PhysicalOptimizerContext,
153    ) -> Result<Arc<dyn ExecutionPlan>> {
154        let config = context.config_options();
155        let mut default_registry = None;
156        let registry: Option<&StatisticsRegistry> =
157            if config.optimizer.use_statistics_registry {
158                Some(context.statistics_registry().unwrap_or_else(|| {
159                    default_registry
160                        .insert(StatisticsRegistry::default_with_builtin_providers())
161                }))
162            } else {
163                None
164            };
165        let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
166            Box::new(hash_join_convert_symmetric_subrule),
167            Box::new(hash_join_swap_subrule),
168        ];
169        let new_plan = plan
170            .transform_up(|p| apply_subrules(p, &subrules, config))
171            .data()?;
172        new_plan
173            .transform_up(|plan| {
174                statistical_join_selection_subrule(plan, config, registry)
175            })
176            .data()
177    }
178
179    fn name(&self) -> &str {
180        "join_selection"
181    }
182
183    fn schema_check(&self) -> bool {
184        true
185    }
186}
187
188/// Tries to create a [`HashJoinExec`] in [`PartitionMode::CollectLeft`] when possible.
189///
190/// This function will first consider the given join type and check whether the
191/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
192/// When the `ignore_threshold` is false, this function will also check left
193/// and right sizes in bytes or rows.
194///
195/// Used configurations inside arg `config`
196/// - `config.optimizer.hash_join_single_partition_threshold`: byte threshold for `CollectLeft`
197/// - `config.optimizer.hash_join_single_partition_threshold_rows`: row threshold for `CollectLeft`
198/// - `config.optimizer.join_reordering`: allows or forbids input swapping
199pub(crate) fn try_collect_left(
200    hash_join: &HashJoinExec,
201    ignore_threshold: bool,
202    config: &ConfigOptions,
203    registry: Option<&StatisticsRegistry>,
204) -> Result<Option<Arc<dyn ExecutionPlan>>> {
205    let left = hash_join.left();
206    let right = hash_join.right();
207    let optimizer_config = &config.optimizer;
208
209    let left_can_collect = ignore_threshold
210        || supports_collect_by_thresholds(
211            &**left,
212            optimizer_config.hash_join_single_partition_threshold,
213            optimizer_config.hash_join_single_partition_threshold_rows,
214            registry,
215        );
216    let right_can_collect = ignore_threshold
217        || supports_collect_by_thresholds(
218            &**right,
219            optimizer_config.hash_join_single_partition_threshold,
220            optimizer_config.hash_join_single_partition_threshold_rows,
221            registry,
222        );
223
224    match (left_can_collect, right_can_collect) {
225        (true, true) => {
226            // Don't swap null-aware anti joins as they have specific side requirements
227            if hash_join.join_type().supports_swap()
228                && !hash_join.null_aware
229                && should_swap_join_order(&**left, &**right, config, registry)?
230            {
231                Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
232            } else {
233                Ok(Some(Arc::new(
234                    hash_join
235                        .builder()
236                        .with_partition_mode(PartitionMode::CollectLeft)
237                        .build()?,
238                )))
239            }
240        }
241        (true, false) => Ok(Some(Arc::new(
242            hash_join
243                .builder()
244                .with_partition_mode(PartitionMode::CollectLeft)
245                .build()?,
246        ))),
247        (false, true) => {
248            // Don't swap null-aware anti joins as they have specific side requirements
249            if optimizer_config.join_reordering
250                && hash_join.join_type().supports_swap()
251                && !hash_join.null_aware
252            {
253                hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
254            } else {
255                Ok(None)
256            }
257        }
258        (false, false) => Ok(None),
259    }
260}
261
262/// Creates a partitioned hash join execution plan, swapping inputs if beneficial.
263///
264/// Checks if the join order should be swapped based on the join type and input statistics.
265/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise,
266/// creates a standard partitioned hash join.
267///
268/// Used configurations inside arg `config`
269/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
270pub(crate) fn partitioned_hash_join(
271    hash_join: &HashJoinExec,
272    config: &ConfigOptions,
273    registry: Option<&StatisticsRegistry>,
274) -> Result<Arc<dyn ExecutionPlan>> {
275    let left = hash_join.left();
276    let right = hash_join.right();
277    // Don't swap null-aware anti joins as they have specific side requirements
278    if hash_join.join_type().supports_swap()
279        && !hash_join.null_aware
280        && should_swap_join_order(&**left, &**right, config, registry)?
281    {
282        hash_join.swap_inputs(PartitionMode::Partitioned)
283    } else {
284        // Null-aware anti joins must use CollectLeft mode because they track probe-side state
285        // (probe_side_non_empty, probe_side_has_null) per-partition, but need global knowledge
286        // for correct null handling. With partitioning, a partition might not see probe rows
287        // even if the probe side is globally non-empty, leading to incorrect NULL row handling.
288        let partition_mode = if hash_join.null_aware {
289            PartitionMode::CollectLeft
290        } else {
291            PartitionMode::Partitioned
292        };
293
294        Ok(Arc::new(
295            hash_join
296                .builder()
297                .with_partition_mode(partition_mode)
298                .build()?,
299        ))
300    }
301}
302
303/// This subrule tries to modify a given plan so that it can
304/// optimize hash and cross joins in the plan according to available statistical
305/// information.
306///
307/// Used configurations inside arg `config`
308/// - `config.optimizer.hash_join_single_partition_threshold`: byte threshold for `CollectLeft`
309/// - `config.optimizer.hash_join_single_partition_threshold_rows`: row threshold for `CollectLeft`
310/// - `config.optimizer.join_reordering`: allows or forbids input swapping
311fn statistical_join_selection_subrule(
312    plan: Arc<dyn ExecutionPlan>,
313    config: &ConfigOptions,
314    registry: Option<&StatisticsRegistry>,
315) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
316    let transformed = if let Some(hash_join) = plan.downcast_ref::<HashJoinExec>() {
317        match hash_join.partition_mode() {
318            PartitionMode::Auto => try_collect_left(hash_join, false, config, registry)?
319                .map_or_else(
320                    || partitioned_hash_join(hash_join, config, registry).map(Some),
321                    |v| Ok(Some(v)),
322                )?,
323            PartitionMode::CollectLeft => {
324                try_collect_left(hash_join, true, config, registry)?.map_or_else(
325                    || partitioned_hash_join(hash_join, config, registry).map(Some),
326                    |v| Ok(Some(v)),
327                )?
328            }
329            PartitionMode::Partitioned => {
330                let left = hash_join.left();
331                let right = hash_join.right();
332                // Don't swap null-aware anti joins as they have specific side requirements
333                if hash_join.join_type().supports_swap()
334                    && !hash_join.null_aware
335                    && should_swap_join_order(&**left, &**right, config, registry)?
336                {
337                    hash_join
338                        .swap_inputs(PartitionMode::Partitioned)
339                        .map(Some)?
340                } else {
341                    None
342                }
343            }
344        }
345    } else if let Some(cross_join) = plan.downcast_ref::<CrossJoinExec>() {
346        let left = cross_join.left();
347        let right = cross_join.right();
348        if should_swap_join_order(&**left, &**right, config, registry)? {
349            cross_join.swap_inputs().map(Some)?
350        } else {
351            None
352        }
353    } else if let Some(nl_join) = plan.downcast_ref::<NestedLoopJoinExec>() {
354        let left = nl_join.left();
355        let right = nl_join.right();
356        if nl_join.join_type().supports_swap()
357            && should_swap_join_order(&**left, &**right, config, registry)?
358        {
359            nl_join.swap_inputs().map(Some)?
360        } else {
361            None
362        }
363    } else {
364        None
365    };
366
367    Ok(if let Some(transformed) = transformed {
368        Transformed::yes(transformed)
369    } else {
370        Transformed::no(plan)
371    })
372}
373
374/// Pipeline-fixing join selection subrule.
375pub type PipelineFixerSubrule =
376    dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;
377
378/// Converts a hash join to a symmetric hash join if both its inputs are
379/// unbounded and incremental.
380///
381/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
382/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
383/// preserves query runnability. If the replacement is applicable, this subrule makes this change;
384/// otherwise, it leaves the input unchanged.
385///
386/// # Arguments
387/// * `input` - The current state of the pipeline, including the execution plan.
388/// * `config_options` - Configuration options that might affect the transformation logic.
389///
390/// # Returns
391/// An `Option` that contains the `Result` of the transformation. If the transformation is not applicable,
392/// it returns `None`. If applicable, it returns `Some(Ok(...))` with the modified pipeline state,
393/// or `Some(Err(...))` if an error occurs during the transformation.
394fn hash_join_convert_symmetric_subrule(
395    input: Arc<dyn ExecutionPlan>,
396    config_options: &ConfigOptions,
397) -> Result<Arc<dyn ExecutionPlan>> {
398    // Check if the current plan node is a HashJoinExec.
399    if let Some(hash_join) = input.downcast_ref::<HashJoinExec>() {
400        let left_unbounded = hash_join.left.boundedness().is_unbounded();
401        let left_incremental = matches!(
402            hash_join.left.pipeline_behavior(),
403            EmissionType::Incremental | EmissionType::Both
404        );
405        let right_unbounded = hash_join.right.boundedness().is_unbounded();
406        let right_incremental = matches!(
407            hash_join.right.pipeline_behavior(),
408            EmissionType::Incremental | EmissionType::Both
409        );
410        // Process only if both left and right sides are unbounded and incrementally emit.
411        if left_unbounded && right_unbounded & left_incremental & right_incremental {
412            // Determine the partition mode based on configuration.
413            let mode = if config_options.optimizer.repartition_joins {
414                StreamJoinPartitionMode::Partitioned
415            } else {
416                StreamJoinPartitionMode::SinglePartition
417            };
418            // A closure to determine the required sort order for each side of the join in the SymmetricHashJoinExec.
419            // This function checks if the columns involved in the filter have any specific ordering requirements.
420            // If the child nodes (left or right side of the join) already have a defined order and the columns used in the
421            // filter predicate are ordered, this function captures that ordering requirement. The identified order is then
422            // used in the SymmetricHashJoinExec to maintain bounded memory during join operations.
423            // However, if the child nodes do not have an inherent order, or if the filter columns are unordered,
424            // the function concludes that no specific order is required for the SymmetricHashJoinExec. This approach
425            // ensures that the symmetric hash join operation only imposes ordering constraints when necessary,
426            // based on the properties of the child nodes and the filter condition.
427            let determine_order = |side: JoinSide| -> Option<LexOrdering> {
428                hash_join
429                    .filter()
430                    .map(|filter| {
431                        filter.column_indices().iter().any(
432                            |ColumnIndex {
433                                 index,
434                                 side: column_side,
435                             }| {
436                                // Skip if column side does not match the join side.
437                                if *column_side != side {
438                                    return false;
439                                }
440                                // Retrieve equivalence properties and schema based on the side.
441                                let (equivalence, schema) = match side {
442                                    JoinSide::Left => (
443                                        hash_join.left().equivalence_properties(),
444                                        hash_join.left().schema(),
445                                    ),
446                                    JoinSide::Right => (
447                                        hash_join.right().equivalence_properties(),
448                                        hash_join.right().schema(),
449                                    ),
450                                    JoinSide::None => return false,
451                                };
452
453                                let name = schema.field(*index).name();
454                                let col = Arc::new(Column::new(name, *index)) as _;
455                                // Check if the column is ordered.
456                                equivalence.get_expr_properties(col).sort_properties
457                                    != SortProperties::Unordered
458                            },
459                        )
460                    })
461                    .unwrap_or(false)
462                    .then(|| {
463                        match side {
464                            JoinSide::Left => hash_join.left().output_ordering(),
465                            JoinSide::Right => hash_join.right().output_ordering(),
466                            JoinSide::None => unreachable!(),
467                        }
468                        .cloned()
469                    })
470                    .flatten()
471            };
472
473            // Determine the sort order for both left and right sides.
474            let left_order = determine_order(JoinSide::Left);
475            let right_order = determine_order(JoinSide::Right);
476
477            return SymmetricHashJoinExec::try_new(
478                Arc::clone(hash_join.left()),
479                Arc::clone(hash_join.right()),
480                hash_join.on().to_vec(),
481                hash_join.filter().cloned(),
482                hash_join.join_type(),
483                hash_join.null_equality(),
484                left_order,
485                right_order,
486                mode,
487            )
488            .map(|exec| Arc::new(exec) as _);
489        }
490    }
491    Ok(input)
492}
493
494/// This subrule will swap build/probe sides of a hash join depending on whether
495/// one of its inputs may produce an infinite stream of records. The rule ensures
496/// that the left (build) side of the hash join always operates on an input stream
497/// that will produce a finite set of records. If the left side can not be chosen
498/// to be "finite", the join sides stay the same as the original query.
499/// ```text
500/// For example, this rule makes the following transformation:
501///
502///
503///
504///           +--------------+              +--------------+
505///           |              |  unbounded   |              |
506///    Left   | Infinite     |    true      | Hash         |\true
507///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
508///           |              |              |              |  \  |              |       |              |
509///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |
510///                                                            - |              |       |              |
511///           +--------------+              +--------------+  /  +--------------+       +--------------+
512///           |              |  unbounded   |              | /
513///    Right  | Finite       |    false     | Hash         |/false
514///           | Data Source  |--------------| Repartition  |
515///           |              |              |              |
516///           +--------------+              +--------------+
517///
518///
519///
520///           +--------------+              +--------------+
521///           |              |  unbounded   |              |
522///    Left   | Finite       |    false     | Hash         |\false
523///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
524///           |              |              |              |  \  |              | true  |              | true
525///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |-----
526///                                                            - |              |       |              |
527///           +--------------+              +--------------+  /  +--------------+       +--------------+
528///           |              |  unbounded   |              | /
529///    Right  | Infinite     |    true      | Hash         |/true
530///           | Data Source  |--------------| Repartition  |
531///           |              |              |              |
532///           +--------------+              +--------------+
533/// ```
534pub fn hash_join_swap_subrule(
535    mut input: Arc<dyn ExecutionPlan>,
536    _config_options: &ConfigOptions,
537) -> Result<Arc<dyn ExecutionPlan>> {
538    if let Some(hash_join) = input.downcast_ref::<HashJoinExec>()
539        && hash_join.left.boundedness().is_unbounded()
540        && !hash_join.right.boundedness().is_unbounded()
541        && !hash_join.null_aware // Don't swap null-aware anti joins
542        && matches!(
543            *hash_join.join_type(),
544            JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti
545        )
546    {
547        input = swap_join_according_to_unboundedness(hash_join)?;
548    }
549    Ok(input)
550}
551
552/// This function swaps sides of a hash join to make it runnable even if one of
553/// its inputs are infinite. Note that this is not always possible; i.e.
554/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
555/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
556/// we swap join sides. Therefore, we do not consider them here.
557/// This function is crate public as it is useful for downstream projects
558/// to implement, or experiment with, their own join selection rules.
559pub(crate) fn swap_join_according_to_unboundedness(
560    hash_join: &HashJoinExec,
561) -> Result<Arc<dyn ExecutionPlan>> {
562    let partition_mode = hash_join.partition_mode();
563    let join_type = hash_join.join_type();
564    match (*partition_mode, *join_type) {
565        (
566            _,
567            JoinType::Right
568            | JoinType::RightSemi
569            | JoinType::RightAnti
570            | JoinType::RightMark
571            | JoinType::Full,
572        ) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
573        (PartitionMode::Partitioned, _) => {
574            hash_join.swap_inputs(PartitionMode::Partitioned)
575        }
576        (PartitionMode::CollectLeft, _) => {
577            hash_join.swap_inputs(PartitionMode::CollectLeft)
578        }
579        (PartitionMode::Auto, _) => {
580            // Use `PartitionMode::Partitioned` as default if `Auto` is selected.
581            hash_join.swap_inputs(PartitionMode::Partitioned)
582        }
583    }
584}
585
586/// Apply given `PipelineFixerSubrule`s to a given plan. This plan, along with
587/// auxiliary boundedness information, is in the `PipelineStatePropagator` object.
588fn apply_subrules(
589    mut input: Arc<dyn ExecutionPlan>,
590    subrules: &Vec<Box<PipelineFixerSubrule>>,
591    config_options: &ConfigOptions,
592) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
593    let original = Arc::clone(&input);
594    for subrule in subrules {
595        input = subrule(input, config_options)?;
596    }
597
598    let transformed = !Arc::ptr_eq(&original, &input);
599
600    Ok(Transformed::new_transformed(input, transformed))
601}
602
603// See tests in datafusion/core/tests/physical_optimizer