Skip to main content

datafusion_physical_optimizer/
hash_join_buffering.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::PhysicalOptimizerRule;
19use datafusion_common::JoinSide;
20use datafusion_common::config::ConfigOptions;
21use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
22use datafusion_physical_plan::ExecutionPlan;
23use datafusion_physical_plan::buffer::BufferExec;
24use datafusion_physical_plan::joins::HashJoinExec;
25use std::sync::Arc;
26
/// Physical optimizer rule that inserts a [`BufferExec`] on the probe side of every
/// [`HashJoinExec`] in the plan:
///
/// ```text
///            ┌───────────────────┐
///            │   HashJoinExec    │
///            └─────▲────────▲────┘
///          ┌───────┘        └─────────┐
///          │                          │
/// ┌────────────────┐         ┌─────────────────┐
/// │   Build side   │       + │   BufferExec    │
/// └────────────────┘         └────────▲────────┘
///                                     │
///                            ┌────────┴────────┐
///                            │   Probe side    │
///                            └─────────────────┘
/// ```
///
/// The buffer allows the probe-side input to be pulled eagerly, even before the build
/// side has completely finished.
///
/// The buffer capacity is taken from `execution.hash_join_buffering_capacity`; a value of
/// `0` disables the rule entirely. A join whose probe input is already a [`BufferExec`] is
/// left unchanged, so buffers are never stacked on top of each other.
#[derive(Debug, Default)]
pub struct HashJoinBuffering {}
48
49impl HashJoinBuffering {
50    pub fn new() -> Self {
51        Self::default()
52    }
53}
54
55impl PhysicalOptimizerRule for HashJoinBuffering {
56    fn optimize(
57        &self,
58        plan: Arc<dyn ExecutionPlan>,
59        config: &ConfigOptions,
60    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
61        let capacity = config.execution.hash_join_buffering_capacity;
62        if capacity == 0 {
63            return Ok(plan);
64        }
65
66        plan.transform_down(|plan| {
67            let Some(node) = plan.downcast_ref::<HashJoinExec>() else {
68                return Ok(Transformed::no(plan));
69            };
70            let plan = Arc::clone(&plan);
71            Ok(Transformed::yes(
72                if HashJoinExec::probe_side() == JoinSide::Left {
73                    // Do not stack BufferExec nodes together.
74                    if node.left.is::<BufferExec>() {
75                        return Ok(Transformed::no(plan));
76                    }
77                    plan.with_new_children(vec![
78                        Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)),
79                        Arc::clone(&node.right),
80                    ])?
81                } else {
82                    // Do not stack BufferExec nodes together.
83                    if node.right.is::<BufferExec>() {
84                        return Ok(Transformed::no(plan));
85                    }
86                    plan.with_new_children(vec![
87                        Arc::clone(&node.left),
88                        Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)),
89                    ])?
90                },
91            ))
92        })
93        .data()
94    }
95
96    fn name(&self) -> &str {
97        "HashJoinBuffering"
98    }
99
100    fn schema_check(&self) -> bool {
101        true
102    }
103}