datafusion_physical_plan/joins/piecewise_merge_join/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::Array;
19use arrow::{
20    array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
21    compute::concat_batches,
22    util::bit_util,
23};
24use arrow_schema::{SchemaRef, SortOptions};
25use datafusion_common::not_impl_err;
26use datafusion_common::{internal_err, JoinSide, Result};
27use datafusion_execution::{
28    memory_pool::{MemoryConsumer, MemoryReservation},
29    SendableRecordBatchStream,
30};
31use datafusion_expr::{JoinType, Operator};
32use datafusion_physical_expr::equivalence::join_equivalence_properties;
33use datafusion_physical_expr::{
34    Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, PhysicalExprRef,
35    PhysicalSortExpr,
36};
37use datafusion_physical_expr_common::physical_expr::fmt_sql;
38use futures::TryStreamExt;
39use parking_lot::Mutex;
40use std::fmt::Formatter;
41use std::sync::atomic::AtomicUsize;
42use std::sync::Arc;
43
44use crate::execution_plan::{boundedness_from_children, EmissionType};
45
46use crate::joins::piecewise_merge_join::classic_join::{
47    ClassicPWMJStream, PiecewiseMergeJoinStreamState,
48};
49use crate::joins::piecewise_merge_join::utils::{
50    build_visited_indices_map, is_existence_join, is_right_existence_join,
51};
52use crate::joins::utils::asymmetric_join_output_partitioning;
53use crate::{
54    joins::{
55        utils::{build_join_schema, BuildProbeJoinMetrics, OnceAsync, OnceFut},
56        SharedBitmapBuilder,
57    },
58    metrics::ExecutionPlanMetricsSet,
59    spill::get_record_batch_memory_size,
60    ExecutionPlan, PlanProperties,
61};
62use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
63
/// `PiecewiseMergeJoinExec` is a join execution plan that evaluates only a single range filter and shows
/// much better performance for these workloads than `NestedLoopJoin`
66///
/// The physical planner will choose to evaluate this join when there is only one comparison filter. This
/// is a binary expression whose operator is one of [`Operator::Lt`], [`Operator::LtEq`], [`Operator::Gt`], or
/// [`Operator::GtEq`].
///
/// Examples:
///  - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
72///
73/// # Execution Plan Inputs
/// For `PiecewiseMergeJoin` we label the right input as the 'streamed' side and the left input as the
/// 'buffered' side.
76///
77/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and is able to sort streamed record
78/// batches during processing. Sorted input must specifically be ascending/descending based on the operator.
79///
80/// # Algorithms
81/// Classic joins are processed differently compared to existence joins.
82///
83/// ## Classic Joins (Inner, Full, Left, Right)
/// For classic joins we buffer the build side (the "buffered" side) and stream the probe side (the "streamed" side).
/// Both sides are sorted so that we can iterate from index 0 to the end on each side.  This ordering ensures
/// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining
/// buffered rows from the match position onward, without rescanning earlier buffered rows.
88///  
89/// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators
90/// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance
91/// monotonically as we stream new batches from the stream side.
92///
93/// The streamed side may arrive unsorted, so this operator sorts each incoming batch in memory before
94/// processing. The buffered side is required to be globally sorted; the plan declares this requirement
/// in `required_input_ordering`, which allows the optimizer to automatically insert a `SortExec` on that side if needed.
96/// By the time this operator runs, the buffered side is guaranteed to be in the proper order.
97///
98/// The pseudocode for the algorithm looks like this:
99///
100/// ```text
101/// for stream_row in stream_batch:
102///     for buffer_row in buffer_batch:
///         if compare(stream_row, buffer_row):
104///             output stream_row X buffer_batch[buffer_row:]
105///         else:
106///             continue
107/// ```
108///
/// The algorithm uses the streamed side (larger) to drive the loop, because every row on the stream side scans
/// the buffered side to find its first match. By doing this, each match can output more results, so output
/// handling can be better vectorized for performance.
112///
113/// Here is an example:
114///
/// We perform a `JoinType::Left` with these two batches and the operator being `Operator::Lt`(<). For each
/// row on the streamed side we move a pointer on the buffered side until it matches the condition. Once we reach
/// the row which matches (in this case row 1 on the streamed side has its first match on row 2 on the
/// buffered side; 100 < 200 is true), we can emit that row and every row after it. We can emit the rows like this
/// because if the batch is sorted in ascending order, every subsequent row will also satisfy the condition as they
/// will all be larger values.
121///
122/// ```text
123/// SQL statement:
124/// SELECT *
125/// FROM (VALUES (100), (200), (500)) AS streamed(a)
126/// LEFT JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
127///   ON streamed.a < buffered.b;
128///
129/// Processing Row 1:
130///
131///       Sorted Buffered Side                                         Sorted Streamed Side          
132///       ┌──────────────────┐                                         ┌──────────────────┐         
133///     1 │       100        │                                       1 │       100        │        
134///       ├──────────────────┤                                         ├──────────────────┤         
135///     2 │       200        │ ─┐                                    2 │       200        │        
136///       ├──────────────────┤  │  For row 1 on streamed side with     ├──────────────────┤         
137///     3 │       200        │  │  value 100, we emit rows 2 - 5.    3 │       500        │       
138///       ├──────────────────┤  │  as matches when the operator is     └──────────────────┘
139///     4 │       300        │  │  `Operator::Lt` (<) Emitting all
140///       ├──────────────────┤  │  rows after the first match (row
141///     5 │       400        │ ─┘  2 buffered side; 100 < 200)
142///       └──────────────────┘     
143///
144/// Processing Row 2:
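///   By sorting the streamed side we know that row 2 cannot match any buffered row before the first
///   match of row 1, so probing resumes from that position instead of from the start.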
146///
147///       Sorted Buffered Side                                         Sorted Streamed Side          
148///       ┌──────────────────┐                                         ┌──────────────────┐         
149///     1 │       100        │                                       1 │       100        │        
150///       ├──────────────────┤                                         ├──────────────────┤         
151///     2 │       200        │ <- Start here when probing for the    2 │       200        │        
152///       ├──────────────────┤    streamed side row 2.                 ├──────────────────┤         
153///     3 │       200        │                                       3 │       500        │       
154///       ├──────────────────┤                                         └──────────────────┘
155///     4 │       300        │  
156///       ├──────────────────┤  
157///     5 │       400        │
158///       └──────────────────┘     
159/// ```
160///
161/// ## Existence Joins (Semi, Anti, Mark)
/// Existence joins are made orders of magnitude faster with a `PiecewiseMergeJoin` as we only need to find
163/// the min/max value of the streamed side to be able to emit all matches on the buffered side. By putting
164/// the side we need to mark onto the sorted buffer side, we can emit all these matches at once.
165///
/// For less than operations (`<`) the buffered input is to be sorted in descending order and vice versa for greater
/// than (`>`) operations. `SortExec` is used to enforce sorting on the buffered side; the streamed side does not
/// need to be sorted because only its min/max value is needed.
169///
170/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked side is on the buffered side.
171///
172/// The pseudocode for the algorithm looks like this:
173///
174/// ```text
175/// // Using the example of a less than `<` operation
176/// let max = max_batch(streamed_batch)
177///
178/// for buffer_row in buffer_batch:
179///     if buffer_row < max:
180///         output buffer_batch[buffer_row:]
181/// ```
182///
183/// Only need to find the min/max value and iterate through the buffered side once.
184///
185/// Here is an example:
186/// We perform a `JoinType::LeftSemi` with these two batches and the operator being `Operator::Lt`(<). Because
187/// the operator is `Operator::Lt` we can find the minimum value in the streamed side; in this case it is 200.
188/// We can then advance a pointer from the start of the buffer side until we find the first value that satisfies
189/// the predicate. All rows after that first matched value satisfy the condition 200 < x so we can mark all of
190/// those rows as matched.
191///
192/// ```text
193/// SQL statement:
194/// SELECT *
195/// FROM (VALUES (500), (200), (300)) AS streamed(a)
196/// LEFT SEMI JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
197///   ON streamed.a < buffered.b;
198///
199///          Sorted Buffered Side             Unsorted Streamed Side
200///            ┌──────────────────┐          ┌──────────────────┐
201///          1 │       100        │        1 │       500        │
202///            ├──────────────────┤          ├──────────────────┤
203///          2 │       200        │        2 │       200        │
204///            ├──────────────────┤          ├──────────────────┤    
205///          3 │       200        │        3 │       300        │
206///            ├──────────────────┤          └──────────────────┘
207///          4 │       300        │ ─┐       
208///            ├──────────────────┤  | We emit matches for row 4 - 5
209///          5 │       400        │ ─┘ on the buffered side.
210///            └──────────────────┘
211///             min value: 200
212/// ```
213///
/// For both types of joins, the buffered side must be sorted descending for `Operator::Lt` (<) or
/// `Operator::LtEq` (<=) and ascending for `Operator::Gt` (>) or `Operator::GtEq` (>=).
216///
217/// # Partitioning Logic
218/// Piecewise Merge Join requires one buffered side partition + round robin partitioned stream side. A counter
219/// is used in the buffered side to coordinate when all streamed partitions are finished execution. This allows
220/// for processing the rest of the unmatched rows for Left and Full joins. The last partition that finishes
221/// execution will be responsible for outputting the unmatched rows.
222///
223/// # Performance Explanation (cost)
224/// Piecewise Merge Join is used over Nested Loop Join due to its superior performance. Here is the breakdown:
225///
226/// R: Buffered Side
227/// S: Streamed Side
228///
229/// ## Piecewise Merge Join (PWMJ)
230///
/// ### Classic Join
/// Requires sorting the probe side and, for each probe row, scanning the buffered side until the first match
/// is found.
///     Complexity: `O(sort(S) + num_of_batches(|S|) * scan(R))`.
235///
/// ### Mark Join
/// Computes the min/max range of the probe keys (the probe side does not need to be sorted) and scans the
/// buffered side only within that range.
///   Complexity: `O(|S| + scan(R[range]))`.
240///
/// ## Nested Loop Join
/// Compares every row from `S` with every row from `R`, so the whole probe side is always checked
/// against the whole buffered side.
///   Complexity: `O(|S| * |R|)`.
247///
248/// # Further Reference Material
249/// DuckDB blog on Range Joins: [Range Joins in DuckDB](https://duckdb.org/2022/05/27/iejoin.html)
#[derive(Debug)]
pub struct PiecewiseMergeJoinExec {
    /// Left (buffered) input plan. `execute` always reads its partition 0 and collects it in full.
    pub buffered: Arc<dyn ExecutionPlan>,
    /// Right (streamed) input plan, executed once per requested output partition.
    pub streamed: Arc<dyn ExecutionPlan>,
    /// The two expressions being compared: `on.0` is evaluated against the buffered side and
    /// `on.1` against the streamed side.
    pub on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
    /// Comparison operator in the range predicate (`<`, `<=`, `>` or `>=`; anything else is
    /// rejected by `try_new`)
    pub operator: Operator,
    /// How the join is performed
    pub join_type: JoinType,
    /// The schema once the join is applied
    schema: SchemaRef,
    /// Buffered data, obtained in `execute` through `OnceAsync::try_once`
    buffered_fut: OnceAsync<BufferedSideData>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,

    /// Sort expressions - See above for more details [`PiecewiseMergeJoinExec`]
    ///
    /// The left sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations.
    /// This is what `required_input_ordering` reports for the buffered child.
    left_child_plan_required_order: LexOrdering,
    /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
    /// Unsorted for mark joins
    // Stored by `try_new` but never read in this file, hence the lint suppression.
    // NOTE(review): presumably kept for future use — confirm before removing.
    #[allow(unused)]
    right_batch_required_orders: LexOrdering,

    /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans.
    sort_options: SortOptions,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
    /// Number of partitions to process; handed to the buffered-side data as the initial value
    /// of its `remaining_partitions` counter.
    num_partitions: usize,
}
285
286impl PiecewiseMergeJoinExec {
287    pub fn try_new(
288        buffered: Arc<dyn ExecutionPlan>,
289        streamed: Arc<dyn ExecutionPlan>,
290        on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
291        operator: Operator,
292        join_type: JoinType,
293        num_partitions: usize,
294    ) -> Result<Self> {
295        // TODO: Implement existence joins for PiecewiseMergeJoin
296        if is_existence_join(join_type) {
297            return not_impl_err!(
298                "Existence Joins are currently not supported for PiecewiseMergeJoin"
299            );
300        }
301
302        // Take the operator and enforce a sort order on the streamed + buffered side based on
303        // the operator type.
304        let sort_options = match operator {
305            Operator::Lt | Operator::LtEq => {
306                // For left existence joins the inputs will be swapped so the sort
307                // options are switched
308                if is_right_existence_join(join_type) {
309                    SortOptions::new(false, true)
310                } else {
311                    SortOptions::new(true, true)
312                }
313            }
314            Operator::Gt | Operator::GtEq => {
315                if is_right_existence_join(join_type) {
316                    SortOptions::new(true, true)
317                } else {
318                    SortOptions::new(false, true)
319                }
320            }
321            _ => {
322                return internal_err!(
323                    "Cannot contain non-range operator in PiecewiseMergeJoinExec"
324                )
325            }
326        };
327
328        // Give the same `sort_option for comparison later`
329        let left_child_plan_required_order =
330            vec![PhysicalSortExpr::new(Arc::clone(&on.0), sort_options)];
331        let right_batch_required_orders =
332            vec![PhysicalSortExpr::new(Arc::clone(&on.1), sort_options)];
333
334        let Some(left_child_plan_required_order) =
335            LexOrdering::new(left_child_plan_required_order)
336        else {
337            return internal_err!(
338                "PiecewiseMergeJoinExec requires valid sort expressions for its left side"
339            );
340        };
341        let Some(right_batch_required_orders) =
342            LexOrdering::new(right_batch_required_orders)
343        else {
344            return internal_err!(
345                "PiecewiseMergeJoinExec requires valid sort expressions for its right side"
346            );
347        };
348
349        let buffered_schema = buffered.schema();
350        let streamed_schema = streamed.schema();
351
352        // Create output schema for the join
353        let schema =
354            Arc::new(build_join_schema(&buffered_schema, &streamed_schema, &join_type).0);
355        let cache = Self::compute_properties(
356            &buffered,
357            &streamed,
358            Arc::clone(&schema),
359            join_type,
360            &on,
361        )?;
362
363        Ok(Self {
364            streamed,
365            buffered,
366            on,
367            operator,
368            join_type,
369            schema,
370            buffered_fut: Default::default(),
371            metrics: ExecutionPlanMetricsSet::new(),
372            left_child_plan_required_order,
373            right_batch_required_orders,
374            sort_options,
375            cache,
376            num_partitions,
377        })
378    }
379
    /// Returns a reference to the buffered (left) input execution plan.
    pub fn buffered(&self) -> &Arc<dyn ExecutionPlan> {
        &self.buffered
    }
384
    /// Returns a reference to the streamed (right) input execution plan.
    pub fn streamed(&self) -> &Arc<dyn ExecutionPlan> {
        &self.streamed
    }
389
    /// Returns the join type this plan was constructed with.
    pub fn join_type(&self) -> JoinType {
        self.join_type
    }
394
    /// Returns a reference to the sort options that `try_new` derived from the operator and
    /// join type.
    pub fn sort_options(&self) -> &SortOptions {
        &self.sort_options
    }
399
400    /// Get probe side (streamed side) for the PiecewiseMergeJoin
401    /// In current implementation, probe side is determined according to join type.
402    pub fn probe_side(join_type: &JoinType) -> JoinSide {
403        match join_type {
404            JoinType::Right
405            | JoinType::Inner
406            | JoinType::Full
407            | JoinType::RightSemi
408            | JoinType::RightAnti
409            | JoinType::RightMark => JoinSide::Right,
410            JoinType::Left
411            | JoinType::LeftAnti
412            | JoinType::LeftSemi
413            | JoinType::LeftMark => JoinSide::Left,
414        }
415    }
416
417    pub fn compute_properties(
418        buffered: &Arc<dyn ExecutionPlan>,
419        streamed: &Arc<dyn ExecutionPlan>,
420        schema: SchemaRef,
421        join_type: JoinType,
422        join_on: &(PhysicalExprRef, PhysicalExprRef),
423    ) -> Result<PlanProperties> {
424        let eq_properties = join_equivalence_properties(
425            buffered.equivalence_properties().clone(),
426            streamed.equivalence_properties().clone(),
427            &join_type,
428            schema,
429            &Self::maintains_input_order(join_type),
430            Some(Self::probe_side(&join_type)),
431            std::slice::from_ref(join_on),
432        )?;
433
434        let output_partitioning =
435            asymmetric_join_output_partitioning(buffered, streamed, &join_type)?;
436
437        Ok(PlanProperties::new(
438            eq_properties,
439            output_partitioning,
440            EmissionType::Incremental,
441            boundedness_from_children([buffered, streamed]),
442        ))
443    }
444
445    // TODO: Add input order. Now they're all `false` indicating it will not maintain the input order.
446    // However, for certain join types the order is maintained. This can be updated in the future after
447    // more testing.
448    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
449        match join_type {
450            // The existence side is expected to come in sorted
451            JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
452                vec![false, false]
453            }
454            JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
455                vec![false, false]
456            }
457            // Left, Right, Full, Inner Join is not guaranteed to maintain
458            // input order as the streamed side will be sorted during
459            // execution for `PiecewiseMergeJoin`
460            _ => vec![false, false],
461        }
462    }
463
464    // TODO
465    pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
466        todo!()
467    }
468}
469
470impl ExecutionPlan for PiecewiseMergeJoinExec {
471    fn name(&self) -> &str {
472        "PiecewiseMergeJoinExec"
473    }
474
475    fn as_any(&self) -> &dyn std::any::Any {
476        self
477    }
478
479    fn properties(&self) -> &PlanProperties {
480        &self.cache
481    }
482
483    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
484        vec![&self.buffered, &self.streamed]
485    }
486
487    fn required_input_distribution(&self) -> Vec<Distribution> {
488        vec![
489            Distribution::SinglePartition,
490            Distribution::UnspecifiedDistribution,
491        ]
492    }
493
494    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
495        // Existence joins don't need to be sorted on one side.
496        if is_right_existence_join(self.join_type) {
497            unimplemented!()
498        } else {
499            // Sort the right side in memory, so we do not need to enforce any sorting
500            vec![
501                Some(OrderingRequirements::from(
502                    self.left_child_plan_required_order.clone(),
503                )),
504                None,
505            ]
506        }
507    }
508
509    fn with_new_children(
510        self: Arc<Self>,
511        children: Vec<Arc<dyn ExecutionPlan>>,
512    ) -> Result<Arc<dyn ExecutionPlan>> {
513        match &children[..] {
514            [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
515                Arc::clone(left),
516                Arc::clone(right),
517                self.on.clone(),
518                self.operator,
519                self.join_type,
520                self.num_partitions,
521            )?)),
522            _ => internal_err!(
523                "PiecewiseMergeJoin should have 2 children, found {}",
524                children.len()
525            ),
526        }
527    }
528
529    fn execute(
530        &self,
531        partition: usize,
532        context: Arc<datafusion_execution::TaskContext>,
533    ) -> Result<SendableRecordBatchStream> {
534        let on_buffered = Arc::clone(&self.on.0);
535        let on_streamed = Arc::clone(&self.on.1);
536
537        let metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
538        let buffered_fut = self.buffered_fut.try_once(|| {
539            let reservation = MemoryConsumer::new("PiecewiseMergeJoinInput")
540                .register(context.memory_pool());
541
542            let buffered_stream = self.buffered.execute(0, Arc::clone(&context))?;
543            Ok(build_buffered_data(
544                buffered_stream,
545                Arc::clone(&on_buffered),
546                metrics.clone(),
547                reservation,
548                build_visited_indices_map(self.join_type),
549                self.num_partitions,
550            ))
551        })?;
552
553        let streamed = self.streamed.execute(partition, Arc::clone(&context))?;
554
555        let batch_size = context.session_config().batch_size();
556
557        // TODO: Add existence joins + this is guarded at physical planner
558        if is_existence_join(self.join_type()) {
559            unreachable!()
560        } else {
561            Ok(Box::pin(ClassicPWMJStream::try_new(
562                Arc::clone(&self.schema),
563                on_streamed,
564                self.join_type,
565                self.operator,
566                streamed,
567                BufferedSide::Initial(BufferedSideInitialState { buffered_fut }),
568                PiecewiseMergeJoinStreamState::WaitBufferedSide,
569                self.sort_options,
570                metrics,
571                batch_size,
572            )))
573        }
574    }
575}
576
577impl DisplayAs for PiecewiseMergeJoinExec {
578    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
579        let on_str = format!(
580            "({} {} {})",
581            fmt_sql(self.on.0.as_ref()),
582            self.operator,
583            fmt_sql(self.on.1.as_ref())
584        );
585
586        match t {
587            DisplayFormatType::Default | DisplayFormatType::Verbose => {
588                write!(
589                    f,
590                    "PiecewiseMergeJoin: operator={:?}, join_type={:?}, on={}",
591                    self.operator, self.join_type, on_str
592                )
593            }
594
595            DisplayFormatType::TreeRender => {
596                writeln!(f, "operator={:?}", self.operator)?;
597                if self.join_type != JoinType::Inner {
598                    writeln!(f, "join_type={:?}", self.join_type)?;
599                }
600                writeln!(f, "on={on_str}")
601            }
602        }
603    }
604}
605
606async fn build_buffered_data(
607    buffered: SendableRecordBatchStream,
608    on_buffered: PhysicalExprRef,
609    metrics: BuildProbeJoinMetrics,
610    reservation: MemoryReservation,
611    build_map: bool,
612    remaining_partitions: usize,
613) -> Result<BufferedSideData> {
614    let schema = buffered.schema();
615
616    // Combine batches and record number of rows
617    let initial = (Vec::new(), 0, metrics, reservation);
618    let (batches, num_rows, metrics, mut reservation) = buffered
619        .try_fold(initial, |mut acc, batch| async {
620            let batch_size = get_record_batch_memory_size(&batch);
621            acc.3.try_grow(batch_size)?;
622            acc.2.build_mem_used.add(batch_size);
623            acc.2.build_input_batches.add(1);
624            acc.2.build_input_rows.add(batch.num_rows());
625            // Update row count
626            acc.1 += batch.num_rows();
627            // Push batch to output
628            acc.0.push(batch);
629            Ok(acc)
630        })
631        .await?;
632
633    let single_batch = concat_batches(&schema, batches.iter())?;
634
635    // Evaluate physical expression on the buffered side.
636    let buffered_values = on_buffered
637        .evaluate(&single_batch)?
638        .into_array(single_batch.num_rows())?;
639
640    // We add the single batch size + the memory of the join keys
641    // size of the size estimation
642    let size_estimation = get_record_batch_memory_size(&single_batch)
643        + buffered_values.get_array_memory_size();
644    reservation.try_grow(size_estimation)?;
645    metrics.build_mem_used.add(size_estimation);
646
647    // Created visited indices bitmap only if the join type requires it
648    let visited_indices_bitmap = if build_map {
649        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
650        reservation.try_grow(bitmap_size)?;
651        metrics.build_mem_used.add(bitmap_size);
652
653        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
654        bitmap_buffer.append_n(num_rows, false);
655        bitmap_buffer
656    } else {
657        BooleanBufferBuilder::new(0)
658    };
659
660    let buffered_data = BufferedSideData::new(
661        single_batch,
662        buffered_values,
663        Mutex::new(visited_indices_bitmap),
664        remaining_partitions,
665        reservation,
666    );
667
668    Ok(buffered_data)
669}
670
/// Fully collected buffered-side input together with the state kept alongside it.
pub(super) struct BufferedSideData {
    /// All buffered input batches concatenated into one batch
    pub(super) batch: RecordBatch,
    /// The buffered join expression evaluated over `batch`
    values: ArrayRef,
    /// Bitmap of visited buffered rows; holds one bit per row when the join type requires
    /// tracking, and is empty otherwise
    pub(super) visited_indices_bitmap: SharedBitmapBuilder,
    /// Counter initialised from the plan's partition count.
    /// NOTE(review): presumably decremented as streamed partitions finish — confirm in the
    /// stream implementation.
    pub(super) remaining_partitions: AtomicUsize,
    /// Memory reservation charged while buffering; stored so it stays alive with the data
    _reservation: MemoryReservation,
}
678
679impl BufferedSideData {
680    pub(super) fn new(
681        batch: RecordBatch,
682        values: ArrayRef,
683        visited_indices_bitmap: SharedBitmapBuilder,
684        remaining_partitions: usize,
685        reservation: MemoryReservation,
686    ) -> Self {
687        Self {
688            batch,
689            values,
690            visited_indices_bitmap,
691            remaining_partitions: AtomicUsize::new(remaining_partitions),
692            _reservation: reservation,
693        }
694    }
695
696    pub(super) fn batch(&self) -> &RecordBatch {
697        &self.batch
698    }
699
700    pub(super) fn values(&self) -> &ArrayRef {
701        &self.values
702    }
703}
704
/// State of the buffered side as seen by a join stream: either still pending
/// ([`BufferedSide::Initial`]) or collected ([`BufferedSide::Ready`]).
pub(super) enum BufferedSide {
    /// Indicates that build-side not collected yet
    Initial(BufferedSideInitialState),
    /// Indicates that build-side data has been collected
    Ready(BufferedSideReadyState),
}
711
712impl BufferedSide {
713    // Takes a mutable state of the buffered row batches
714    pub(super) fn try_as_initial_mut(&mut self) -> Result<&mut BufferedSideInitialState> {
715        match self {
716            BufferedSide::Initial(state) => Ok(state),
717            _ => internal_err!("Expected build side in initial state"),
718        }
719    }
720
721    pub(super) fn try_as_ready(&self) -> Result<&BufferedSideReadyState> {
722        match self {
723            BufferedSide::Ready(state) => Ok(state),
724            _ => {
725                internal_err!("Expected build side in ready state")
726            }
727        }
728    }
729
730    /// Tries to extract BuildSideReadyState from BuildSide enum.
731    /// Returns an error if state is not Ready.
732    pub(super) fn try_as_ready_mut(&mut self) -> Result<&mut BufferedSideReadyState> {
733        match self {
734            BufferedSide::Ready(state) => Ok(state),
735            _ => internal_err!("Expected build side in ready state"),
736        }
737    }
738}
739
/// Payload of [`BufferedSide::Initial`]: the buffered side has not been collected yet.
pub(super) struct BufferedSideInitialState {
    /// Pending future that yields the collected [`BufferedSideData`]
    pub(crate) buffered_fut: OnceFut<BufferedSideData>,
}
743
/// Payload of [`BufferedSide::Ready`]: the buffered side has been collected.
pub(super) struct BufferedSideReadyState {
    /// Collected build-side data
    pub(super) buffered_data: Arc<BufferedSideData>,
}