Skip to main content

datafusion_physical_plan/joins/piecewise_merge_join/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::Array;
19use arrow::{
20    array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
21    compute::concat_batches,
22    util::bit_util,
23};
24use arrow_schema::{SchemaRef, SortOptions};
25use datafusion_common::not_impl_err;
26use datafusion_common::{JoinSide, Result, internal_err};
27use datafusion_execution::{
28    SendableRecordBatchStream,
29    memory_pool::{MemoryConsumer, MemoryReservation},
30};
31use datafusion_expr::{JoinType, Operator};
32use datafusion_physical_expr::equivalence::join_equivalence_properties;
33use datafusion_physical_expr::{
34    Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, PhysicalExprRef,
35    PhysicalSortExpr,
36};
37use datafusion_physical_expr_common::physical_expr::fmt_sql;
38use futures::TryStreamExt;
39use parking_lot::Mutex;
40use std::fmt::Formatter;
41use std::sync::Arc;
42use std::sync::atomic::AtomicUsize;
43
44use crate::execution_plan::{EmissionType, boundedness_from_children};
45
46use crate::joins::piecewise_merge_join::classic_join::{
47    ClassicPWMJStream, PiecewiseMergeJoinStreamState,
48};
49use crate::joins::piecewise_merge_join::utils::{
50    build_visited_indices_map, is_existence_join, is_right_existence_join,
51};
52use crate::joins::utils::asymmetric_join_output_partitioning;
53use crate::metrics::MetricsSet;
54use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
55use crate::{
56    ExecutionPlan, PlanProperties,
57    joins::{
58        SharedBitmapBuilder,
59        utils::{BuildProbeJoinMetrics, OnceAsync, OnceFut, build_join_schema},
60    },
61    metrics::ExecutionPlanMetricsSet,
62    spill::get_record_batch_memory_size,
63};
64
65/// `PiecewiseMergeJoinExec` is a join execution plan that only evaluates single range filter and show much
66/// better performance for these workloads than `NestedLoopJoin`
67///
68/// The physical planner will choose to evaluate this join when there is only one comparison filter. This
69/// is a binary expression which contains [`Operator::Lt`], [`Operator::LtEq`], [`Operator::Gt`], and
70/// [`Operator::GtEq`].:
71/// Examples:
72///  - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
73///
74/// # Execution Plan Inputs
75/// For `PiecewiseMergeJoin` we label all right inputs as the `streamed' side and the left outputs as the
76/// 'buffered' side.
77///
78/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and is able to sort streamed record
79/// batches during processing. Sorted input must specifically be ascending/descending based on the operator.
80///
81/// # Algorithms
82/// Classic joins are processed differently compared to existence joins.
83///
84/// ## Classic Joins (Inner, Full, Left, Right)
85/// For classic joins we buffer the build side and stream the probe side (the "probe" side).
86/// Both sides are sorted so that we can iterate from index 0 to the end on each side.  This ordering ensures
87/// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining
88/// probe rows from the match position onward, without rescanning earlier probe rows.
89///  
90/// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators
91/// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance
92/// monotonically as we stream new batches from the stream side.
93///
94/// The streamed side may arrive unsorted, so this operator sorts each incoming batch in memory before
95/// processing. The buffered side is required to be globally sorted; the plan declares this requirement
96/// in `requires_input_order`, which allows the optimizer to automatically insert a `SortExec` on that side if needed.
97/// By the time this operator runs, the buffered side is guaranteed to be in the proper order.
98///
99/// The pseudocode for the algorithm looks like this:
100///
101/// ```text
102/// for stream_row in stream_batch:
103///     for buffer_row in buffer_batch:
104///         if compare(stream_row, probe_row):
105///             output stream_row X buffer_batch[buffer_row:]
106///         else:
107///             continue
108/// ```
109///
110/// The algorithm uses the streamed side (larger) to drive the loop. This is due to every row on the stream side iterating
111/// the buffered side to find every first match. By doing this, each match can output more result so that output
112/// handling can be better vectorized for performance.
113///
114/// Here is an example:
115///
116/// We perform a `JoinType::Left` with these two batches and the operator being `Operator::Lt`(<). For each
117/// row on the streamed side we move a pointer on the buffered until it matches the condition. Once we reach
118/// the row which matches (in this case with row 1 on streamed will have its first match on row 2 on
119/// buffered; 100 < 200 is true), we can emit all rows after that match. We can emit the rows like this because
120/// if the batch is sorted in ascending order, every subsequent row will also satisfy the condition as they will
121/// all be larger values.
122///
123/// ```text
124/// SQL statement:
125/// SELECT *
126/// FROM (VALUES (100), (200), (500)) AS streamed(a)
127/// LEFT JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
128///   ON streamed.a < buffered.b;
129///
130/// Processing Row 1:
131///
132///       Sorted Buffered Side                                         Sorted Streamed Side          
133///       ┌──────────────────┐                                         ┌──────────────────┐         
134///     1 │       100        │                                       1 │       100        │        
135///       ├──────────────────┤                                         ├──────────────────┤         
136///     2 │       200        │ ─┐                                    2 │       200        │        
137///       ├──────────────────┤  │  For row 1 on streamed side with     ├──────────────────┤         
138///     3 │       200        │  │  value 100, we emit rows 2 - 5.    3 │       500        │       
139///       ├──────────────────┤  │  as matches when the operator is     └──────────────────┘
140///     4 │       300        │  │  `Operator::Lt` (<) Emitting all
141///       ├──────────────────┤  │  rows after the first match (row
142///     5 │       400        │ ─┘  2 buffered side; 100 < 200)
143///       └──────────────────┘     
144///
145/// Processing Row 2:
146///   By sorting the streamed side we know
147///
148///       Sorted Buffered Side                                         Sorted Streamed Side          
149///       ┌──────────────────┐                                         ┌──────────────────┐         
150///     1 │       100        │                                       1 │       100        │        
151///       ├──────────────────┤                                         ├──────────────────┤         
152///     2 │       200        │ <- Start here when probing for the    2 │       200        │        
153///       ├──────────────────┤    streamed side row 2.                 ├──────────────────┤         
154///     3 │       200        │                                       3 │       500        │       
155///       ├──────────────────┤                                         └──────────────────┘
156///     4 │       300        │  
157///       ├──────────────────┤  
158///     5 │       400        │
159///       └──────────────────┘     
160/// ```
161///
162/// ## Existence Joins (Semi, Anti, Mark)
163/// Existence joins are made magnitudes of times faster with a `PiecewiseMergeJoin` as we only need to find
164/// the min/max value of the streamed side to be able to emit all matches on the buffered side. By putting
165/// the side we need to mark onto the sorted buffer side, we can emit all these matches at once.
166///
167/// For less than operations (`<`) both inputs are to be sorted in descending order and vice versa for greater
168/// than (`>`) operations. `SortExec` is used to enforce sorting on the buffered side and streamed side does not
169/// need to be sorted due to only needing to find the min/max.
170///
171/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked side is on the buffered side.
172///
173/// The pseudocode for the algorithm looks like this:
174///
175/// ```text
176/// // Using the example of a less than `<` operation
177/// let max = max_batch(streamed_batch)
178///
179/// for buffer_row in buffer_batch:
180///     if buffer_row < max:
181///         output buffer_batch[buffer_row:]
182/// ```
183///
184/// Only need to find the min/max value and iterate through the buffered side once.
185///
186/// Here is an example:
187/// We perform a `JoinType::LeftSemi` with these two batches and the operator being `Operator::Lt`(<). Because
188/// the operator is `Operator::Lt` we can find the minimum value in the streamed side; in this case it is 200.
189/// We can then advance a pointer from the start of the buffer side until we find the first value that satisfies
190/// the predicate. All rows after that first matched value satisfy the condition 200 < x so we can mark all of
191/// those rows as matched.
192///
193/// ```text
194/// SQL statement:
195/// SELECT *
196/// FROM (VALUES (500), (200), (300)) AS streamed(a)
197/// LEFT SEMI JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
198///   ON streamed.a < buffered.b;
199///
200///          Sorted Buffered Side             Unsorted Streamed Side
201///            ┌──────────────────┐          ┌──────────────────┐
202///          1 │       100        │        1 │       500        │
203///            ├──────────────────┤          ├──────────────────┤
204///          2 │       200        │        2 │       200        │
205///            ├──────────────────┤          ├──────────────────┤    
206///          3 │       200        │        3 │       300        │
207///            ├──────────────────┤          └──────────────────┘
208///          4 │       300        │ ─┐       
209///            ├──────────────────┤  | We emit matches for row 4 - 5
210///          5 │       400        │ ─┘ on the buffered side.
211///            └──────────────────┘
212///             min value: 200
213/// ```
214///
215/// For both types of joins, the buffered side must be sorted ascending for `Operator::Lt` (<) or
216/// `Operator::LtEq` (<=) and descending for `Operator::Gt` (>) or `Operator::GtEq` (>=).
217///
218/// # Partitioning Logic
219/// Piecewise Merge Join requires one buffered side partition + round robin partitioned stream side. A counter
220/// is used in the buffered side to coordinate when all streamed partitions are finished execution. This allows
221/// for processing the rest of the unmatched rows for Left and Full joins. The last partition that finishes
222/// execution will be responsible for outputting the unmatched rows.
223///
224/// # Performance Explanation (cost)
225/// Piecewise Merge Join is used over Nested Loop Join due to its superior performance. Here is the breakdown:
226///
227/// R: Buffered Side
228/// S: Streamed Side
229///
230/// ## Piecewise Merge Join (PWMJ)
231///
232/// # Classic Join:
233/// Requires sorting the probe side and, for each probe row, scanning the buffered side until the first match
234/// is found.
235///     Complexity: `O(sort(S) + num_of_batches(|S|) * scan(R))`.
236///
237/// # Mark Join:
238/// Sorts the probe side, then computes the min/max range of the probe keys and scans the buffered side only
239/// within that range.  
240///   Complexity: `O(|S| + scan(R[range]))`.
241///
242/// ## Nested Loop Join
243/// Compares every row from `S` with every row from `R`.  
244///   Complexity: `O(|S| * |R|)`.
245///
246/// ## Nested Loop Join
247///   Always going to be probe (O(S) * O(R)).
248///
249/// # Further Reference Material
250/// DuckDB blog on Range Joins: [Range Joins in DuckDB](https://duckdb.org/2022/05/27/iejoin.html)
251#[derive(Debug)]
252pub struct PiecewiseMergeJoinExec {
253    /// Left buffered execution plan
254    pub buffered: Arc<dyn ExecutionPlan>,
255    /// Right streamed execution plan
256    pub streamed: Arc<dyn ExecutionPlan>,
257    /// The two expressions being compared
258    pub on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
259    /// Comparison operator in the range predicate
260    pub operator: Operator,
261    /// How the join is performed
262    pub join_type: JoinType,
263    /// The schema once the join is applied
264    schema: SchemaRef,
265    /// Buffered data
266    buffered_fut: OnceAsync<BufferedSideData>,
267    /// Execution metrics
268    metrics: ExecutionPlanMetricsSet,
269
270    /// Sort expressions - See above for more details [`PiecewiseMergeJoinExec`]
271    ///
272    /// The left sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
273    left_child_plan_required_order: LexOrdering,
274    /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
275    /// Unsorted for mark joins
276    #[expect(dead_code)]
277    right_batch_required_orders: LexOrdering,
278
279    /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans.
280    sort_options: SortOptions,
281    /// Cache holding plan properties like equivalences, output partitioning etc.
282    cache: PlanProperties,
283    /// Number of partitions to process
284    num_partitions: usize,
285}
286
287impl PiecewiseMergeJoinExec {
288    pub fn try_new(
289        buffered: Arc<dyn ExecutionPlan>,
290        streamed: Arc<dyn ExecutionPlan>,
291        on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
292        operator: Operator,
293        join_type: JoinType,
294        num_partitions: usize,
295    ) -> Result<Self> {
296        // TODO: Implement existence joins for PiecewiseMergeJoin
297        if is_existence_join(join_type) {
298            return not_impl_err!(
299                "Existence Joins are currently not supported for PiecewiseMergeJoin"
300            );
301        }
302
303        // Take the operator and enforce a sort order on the streamed + buffered side based on
304        // the operator type.
305        let sort_options = match operator {
306            Operator::Lt | Operator::LtEq => {
307                // For left existence joins the inputs will be swapped so the sort
308                // options are switched
309                if is_right_existence_join(join_type) {
310                    SortOptions::new(false, true)
311                } else {
312                    SortOptions::new(true, true)
313                }
314            }
315            Operator::Gt | Operator::GtEq => {
316                if is_right_existence_join(join_type) {
317                    SortOptions::new(true, true)
318                } else {
319                    SortOptions::new(false, true)
320                }
321            }
322            _ => {
323                return internal_err!(
324                    "Cannot contain non-range operator in PiecewiseMergeJoinExec"
325                );
326            }
327        };
328
329        // Give the same `sort_option for comparison later`
330        let left_child_plan_required_order =
331            vec![PhysicalSortExpr::new(Arc::clone(&on.0), sort_options)];
332        let right_batch_required_orders =
333            vec![PhysicalSortExpr::new(Arc::clone(&on.1), sort_options)];
334
335        let Some(left_child_plan_required_order) =
336            LexOrdering::new(left_child_plan_required_order)
337        else {
338            return internal_err!(
339                "PiecewiseMergeJoinExec requires valid sort expressions for its left side"
340            );
341        };
342        let Some(right_batch_required_orders) =
343            LexOrdering::new(right_batch_required_orders)
344        else {
345            return internal_err!(
346                "PiecewiseMergeJoinExec requires valid sort expressions for its right side"
347            );
348        };
349
350        let buffered_schema = buffered.schema();
351        let streamed_schema = streamed.schema();
352
353        // Create output schema for the join
354        let schema =
355            Arc::new(build_join_schema(&buffered_schema, &streamed_schema, &join_type).0);
356        let cache = Self::compute_properties(
357            &buffered,
358            &streamed,
359            Arc::clone(&schema),
360            join_type,
361            &on,
362        )?;
363
364        Ok(Self {
365            streamed,
366            buffered,
367            on,
368            operator,
369            join_type,
370            schema,
371            buffered_fut: Default::default(),
372            metrics: ExecutionPlanMetricsSet::new(),
373            left_child_plan_required_order,
374            right_batch_required_orders,
375            sort_options,
376            cache,
377            num_partitions,
378        })
379    }
380
381    /// Reference to buffered side execution plan
382    pub fn buffered(&self) -> &Arc<dyn ExecutionPlan> {
383        &self.buffered
384    }
385
386    /// Reference to streamed side execution plan
387    pub fn streamed(&self) -> &Arc<dyn ExecutionPlan> {
388        &self.streamed
389    }
390
391    /// Join type
392    pub fn join_type(&self) -> JoinType {
393        self.join_type
394    }
395
396    /// Reference to sort options
397    pub fn sort_options(&self) -> &SortOptions {
398        &self.sort_options
399    }
400
401    /// Get probe side (streamed side) for the PiecewiseMergeJoin
402    /// In current implementation, probe side is determined according to join type.
403    pub fn probe_side(join_type: &JoinType) -> JoinSide {
404        match join_type {
405            JoinType::Right
406            | JoinType::Inner
407            | JoinType::Full
408            | JoinType::RightSemi
409            | JoinType::RightAnti
410            | JoinType::RightMark => JoinSide::Right,
411            JoinType::Left
412            | JoinType::LeftAnti
413            | JoinType::LeftSemi
414            | JoinType::LeftMark => JoinSide::Left,
415        }
416    }
417
418    pub fn compute_properties(
419        buffered: &Arc<dyn ExecutionPlan>,
420        streamed: &Arc<dyn ExecutionPlan>,
421        schema: SchemaRef,
422        join_type: JoinType,
423        join_on: &(PhysicalExprRef, PhysicalExprRef),
424    ) -> Result<PlanProperties> {
425        let eq_properties = join_equivalence_properties(
426            buffered.equivalence_properties().clone(),
427            streamed.equivalence_properties().clone(),
428            &join_type,
429            schema,
430            &Self::maintains_input_order(join_type),
431            Some(Self::probe_side(&join_type)),
432            std::slice::from_ref(join_on),
433        )?;
434
435        let output_partitioning =
436            asymmetric_join_output_partitioning(buffered, streamed, &join_type)?;
437
438        Ok(PlanProperties::new(
439            eq_properties,
440            output_partitioning,
441            EmissionType::Incremental,
442            boundedness_from_children([buffered, streamed]),
443        ))
444    }
445
446    // TODO: Add input order. Now they're all `false` indicating it will not maintain the input order.
447    // However, for certain join types the order is maintained. This can be updated in the future after
448    // more testing.
449    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
450        match join_type {
451            // The existence side is expected to come in sorted
452            JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
453                vec![false, false]
454            }
455            JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
456                vec![false, false]
457            }
458            // Left, Right, Full, Inner Join is not guaranteed to maintain
459            // input order as the streamed side will be sorted during
460            // execution for `PiecewiseMergeJoin`
461            _ => vec![false, false],
462        }
463    }
464
465    // TODO
466    pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
467        todo!()
468    }
469}
470
471impl ExecutionPlan for PiecewiseMergeJoinExec {
472    fn name(&self) -> &str {
473        "PiecewiseMergeJoinExec"
474    }
475
476    fn as_any(&self) -> &dyn std::any::Any {
477        self
478    }
479
480    fn properties(&self) -> &PlanProperties {
481        &self.cache
482    }
483
484    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
485        vec![&self.buffered, &self.streamed]
486    }
487
488    fn required_input_distribution(&self) -> Vec<Distribution> {
489        vec![
490            Distribution::SinglePartition,
491            Distribution::UnspecifiedDistribution,
492        ]
493    }
494
495    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
496        // Existence joins don't need to be sorted on one side.
497        if is_right_existence_join(self.join_type) {
498            unimplemented!()
499        } else {
500            // Sort the right side in memory, so we do not need to enforce any sorting
501            vec![
502                Some(OrderingRequirements::from(
503                    self.left_child_plan_required_order.clone(),
504                )),
505                None,
506            ]
507        }
508    }
509
510    fn with_new_children(
511        self: Arc<Self>,
512        children: Vec<Arc<dyn ExecutionPlan>>,
513    ) -> Result<Arc<dyn ExecutionPlan>> {
514        match &children[..] {
515            [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
516                Arc::clone(left),
517                Arc::clone(right),
518                self.on.clone(),
519                self.operator,
520                self.join_type,
521                self.num_partitions,
522            )?)),
523            _ => internal_err!(
524                "PiecewiseMergeJoin should have 2 children, found {}",
525                children.len()
526            ),
527        }
528    }
529
530    fn execute(
531        &self,
532        partition: usize,
533        context: Arc<datafusion_execution::TaskContext>,
534    ) -> Result<SendableRecordBatchStream> {
535        let on_buffered = Arc::clone(&self.on.0);
536        let on_streamed = Arc::clone(&self.on.1);
537
538        let metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
539        let buffered_fut = self.buffered_fut.try_once(|| {
540            let reservation = MemoryConsumer::new("PiecewiseMergeJoinInput")
541                .register(context.memory_pool());
542
543            let buffered_stream = self.buffered.execute(0, Arc::clone(&context))?;
544            Ok(build_buffered_data(
545                buffered_stream,
546                Arc::clone(&on_buffered),
547                metrics.clone(),
548                reservation,
549                build_visited_indices_map(self.join_type),
550                self.num_partitions,
551            ))
552        })?;
553
554        let streamed = self.streamed.execute(partition, Arc::clone(&context))?;
555
556        let batch_size = context.session_config().batch_size();
557
558        // TODO: Add existence joins + this is guarded at physical planner
559        if is_existence_join(self.join_type()) {
560            unreachable!()
561        } else {
562            Ok(Box::pin(ClassicPWMJStream::try_new(
563                Arc::clone(&self.schema),
564                on_streamed,
565                self.join_type,
566                self.operator,
567                streamed,
568                BufferedSide::Initial(BufferedSideInitialState { buffered_fut }),
569                PiecewiseMergeJoinStreamState::WaitBufferedSide,
570                self.sort_options,
571                metrics,
572                batch_size,
573            )))
574        }
575    }
576
577    fn metrics(&self) -> Option<MetricsSet> {
578        Some(self.metrics.clone_inner())
579    }
580}
581
582impl DisplayAs for PiecewiseMergeJoinExec {
583    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
584        let on_str = format!(
585            "({} {} {})",
586            fmt_sql(self.on.0.as_ref()),
587            self.operator,
588            fmt_sql(self.on.1.as_ref())
589        );
590
591        match t {
592            DisplayFormatType::Default | DisplayFormatType::Verbose => {
593                write!(
594                    f,
595                    "PiecewiseMergeJoin: operator={:?}, join_type={:?}, on={}",
596                    self.operator, self.join_type, on_str
597                )
598            }
599
600            DisplayFormatType::TreeRender => {
601                writeln!(f, "operator={:?}", self.operator)?;
602                if self.join_type != JoinType::Inner {
603                    writeln!(f, "join_type={:?}", self.join_type)?;
604                }
605                writeln!(f, "on={on_str}")
606            }
607        }
608    }
609}
610
611async fn build_buffered_data(
612    buffered: SendableRecordBatchStream,
613    on_buffered: PhysicalExprRef,
614    metrics: BuildProbeJoinMetrics,
615    reservation: MemoryReservation,
616    build_map: bool,
617    remaining_partitions: usize,
618) -> Result<BufferedSideData> {
619    let schema = buffered.schema();
620
621    // Combine batches and record number of rows
622    let initial = (Vec::new(), 0, metrics, reservation);
623    let (batches, num_rows, metrics, mut reservation) = buffered
624        .try_fold(initial, |mut acc, batch| async {
625            let batch_size = get_record_batch_memory_size(&batch);
626            acc.3.try_grow(batch_size)?;
627            acc.2.build_mem_used.add(batch_size);
628            acc.2.build_input_batches.add(1);
629            acc.2.build_input_rows.add(batch.num_rows());
630            // Update row count
631            acc.1 += batch.num_rows();
632            // Push batch to output
633            acc.0.push(batch);
634            Ok(acc)
635        })
636        .await?;
637
638    let single_batch = concat_batches(&schema, batches.iter())?;
639
640    // Evaluate physical expression on the buffered side.
641    let buffered_values = on_buffered
642        .evaluate(&single_batch)?
643        .into_array(single_batch.num_rows())?;
644
645    // We add the single batch size + the memory of the join keys
646    // size of the size estimation
647    let size_estimation = get_record_batch_memory_size(&single_batch)
648        + buffered_values.get_array_memory_size();
649    reservation.try_grow(size_estimation)?;
650    metrics.build_mem_used.add(size_estimation);
651
652    // Created visited indices bitmap only if the join type requires it
653    let visited_indices_bitmap = if build_map {
654        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
655        reservation.try_grow(bitmap_size)?;
656        metrics.build_mem_used.add(bitmap_size);
657
658        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
659        bitmap_buffer.append_n(num_rows, false);
660        bitmap_buffer
661    } else {
662        BooleanBufferBuilder::new(0)
663    };
664
665    let buffered_data = BufferedSideData::new(
666        single_batch,
667        buffered_values,
668        Mutex::new(visited_indices_bitmap),
669        remaining_partitions,
670        reservation,
671    );
672
673    Ok(buffered_data)
674}
675
676pub(super) struct BufferedSideData {
677    pub(super) batch: RecordBatch,
678    values: ArrayRef,
679    pub(super) visited_indices_bitmap: SharedBitmapBuilder,
680    pub(super) remaining_partitions: AtomicUsize,
681    _reservation: MemoryReservation,
682}
683
684impl BufferedSideData {
685    pub(super) fn new(
686        batch: RecordBatch,
687        values: ArrayRef,
688        visited_indices_bitmap: SharedBitmapBuilder,
689        remaining_partitions: usize,
690        reservation: MemoryReservation,
691    ) -> Self {
692        Self {
693            batch,
694            values,
695            visited_indices_bitmap,
696            remaining_partitions: AtomicUsize::new(remaining_partitions),
697            _reservation: reservation,
698        }
699    }
700
701    pub(super) fn batch(&self) -> &RecordBatch {
702        &self.batch
703    }
704
705    pub(super) fn values(&self) -> &ArrayRef {
706        &self.values
707    }
708}
709
710pub(super) enum BufferedSide {
711    /// Indicates that build-side not collected yet
712    Initial(BufferedSideInitialState),
713    /// Indicates that build-side data has been collected
714    Ready(BufferedSideReadyState),
715}
716
717impl BufferedSide {
718    // Takes a mutable state of the buffered row batches
719    pub(super) fn try_as_initial_mut(&mut self) -> Result<&mut BufferedSideInitialState> {
720        match self {
721            BufferedSide::Initial(state) => Ok(state),
722            _ => internal_err!("Expected build side in initial state"),
723        }
724    }
725
726    pub(super) fn try_as_ready(&self) -> Result<&BufferedSideReadyState> {
727        match self {
728            BufferedSide::Ready(state) => Ok(state),
729            _ => {
730                internal_err!("Expected build side in ready state")
731            }
732        }
733    }
734
735    /// Tries to extract BuildSideReadyState from BuildSide enum.
736    /// Returns an error if state is not Ready.
737    pub(super) fn try_as_ready_mut(&mut self) -> Result<&mut BufferedSideReadyState> {
738        match self {
739            BufferedSide::Ready(state) => Ok(state),
740            _ => internal_err!("Expected build side in ready state"),
741        }
742    }
743}
744
745pub(super) struct BufferedSideInitialState {
746    pub(crate) buffered_fut: OnceFut<BufferedSideData>,
747}
748
749pub(super) struct BufferedSideReadyState {
750    /// Collected build-side data
751    pub(super) buffered_data: Arc<BufferedSideData>,
752}