datafusion_physical_optimizer/hash_join_buffering.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::PhysicalOptimizerRule;
19use datafusion_common::JoinSide;
20use datafusion_common::config::ConfigOptions;
21use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
22use datafusion_physical_plan::ExecutionPlan;
23use datafusion_physical_plan::buffer::BufferExec;
24use datafusion_physical_plan::joins::HashJoinExec;
25use std::sync::Arc;
26
/// Physical optimizer rule that looks for every [`HashJoinExec`] in the plan and places a
/// [`BufferExec`] node, with the configured capacity, on top of its probe side:
///
/// ```text
///           ┌───────────────────┐
///           │   HashJoinExec    │
///           └─────▲────────▲────┘
///         ┌───────┘        └─────────┐
///         │                          │
/// ┌────────────────┐        ┌─────────────────┐
/// │   Build side   │    +   │   BufferExec    │
/// └────────────────┘        └────────▲────────┘
///                                    │
///                           ┌────────┴────────┐
///                           │   Probe side    │
///                           └─────────────────┘
/// ```
///
/// This allows the probe side to be pulled eagerly, even before the build side has
/// completely finished.
#[derive(Debug, Default)]
pub struct HashJoinBuffering {}
48
49impl HashJoinBuffering {
50 pub fn new() -> Self {
51 Self::default()
52 }
53}
54
55impl PhysicalOptimizerRule for HashJoinBuffering {
56 fn optimize(
57 &self,
58 plan: Arc<dyn ExecutionPlan>,
59 config: &ConfigOptions,
60 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
61 let capacity = config.execution.hash_join_buffering_capacity;
62 if capacity == 0 {
63 return Ok(plan);
64 }
65
66 plan.transform_down(|plan| {
67 let Some(node) = plan.downcast_ref::<HashJoinExec>() else {
68 return Ok(Transformed::no(plan));
69 };
70 let plan = Arc::clone(&plan);
71 Ok(Transformed::yes(
72 if HashJoinExec::probe_side() == JoinSide::Left {
73 // Do not stack BufferExec nodes together.
74 if node.left.is::<BufferExec>() {
75 return Ok(Transformed::no(plan));
76 }
77 plan.with_new_children(vec![
78 Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)),
79 Arc::clone(&node.right),
80 ])?
81 } else {
82 // Do not stack BufferExec nodes together.
83 if node.right.is::<BufferExec>() {
84 return Ok(Transformed::no(plan));
85 }
86 plan.with_new_children(vec![
87 Arc::clone(&node.left),
88 Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)),
89 ])?
90 },
91 ))
92 })
93 .data()
94 }
95
96 fn name(&self) -> &str {
97 "HashJoinBuffering"
98 }
99
100 fn schema_check(&self) -> bool {
101 true
102 }
103}