Skip to main content

datafusion_physical_plan/joins/piecewise_merge_join/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::Array;
19use arrow::{
20    array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
21    compute::concat_batches,
22    util::bit_util,
23};
24use arrow_schema::{SchemaRef, SortOptions};
25use datafusion_common::not_impl_err;
26use datafusion_common::{JoinSide, Result, internal_err};
27use datafusion_execution::{
28    SendableRecordBatchStream,
29    memory_pool::{MemoryConsumer, MemoryReservation},
30};
31use datafusion_expr::{JoinType, Operator};
32use datafusion_physical_expr::equivalence::join_equivalence_properties;
33use datafusion_physical_expr::{
34    Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, PhysicalExprRef,
35    PhysicalSortExpr,
36};
37use datafusion_physical_expr_common::physical_expr::fmt_sql;
38use futures::TryStreamExt;
39use parking_lot::Mutex;
40use std::fmt::Formatter;
41use std::sync::Arc;
42use std::sync::atomic::AtomicUsize;
43
44use crate::execution_plan::{EmissionType, boundedness_from_children};
45
46use crate::joins::piecewise_merge_join::classic_join::{
47    ClassicPWMJStream, PiecewiseMergeJoinStreamState,
48};
49use crate::joins::piecewise_merge_join::utils::{
50    build_visited_indices_map, is_existence_join, is_right_existence_join,
51};
52use crate::joins::utils::asymmetric_join_output_partitioning;
53use crate::metrics::MetricsSet;
54use crate::{
55    DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties,
56};
57use crate::{
58    ExecutionPlan, PlanProperties,
59    joins::{
60        SharedBitmapBuilder,
61        utils::{BuildProbeJoinMetrics, OnceAsync, OnceFut, build_join_schema},
62    },
63    metrics::ExecutionPlanMetricsSet,
64    spill::get_record_batch_memory_size,
65};
66
/// `PiecewiseMergeJoinExec` is a join execution plan that evaluates only a single range filter and shows much
/// better performance for these workloads than `NestedLoopJoin`.
69///
70/// The physical planner will choose to evaluate this join when there is only one comparison filter. This
/// is a binary expression whose operator is one of [`Operator::Lt`], [`Operator::LtEq`], [`Operator::Gt`], or
/// [`Operator::GtEq`].
///
/// Examples:
74///  - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
75///
76/// # Execution Plan Inputs
/// For `PiecewiseMergeJoin` we label the right input as the 'streamed' side and the left input as the
/// 'buffered' side.
79///
80/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and is able to sort streamed record
81/// batches during processing. Sorted input must specifically be ascending/descending based on the operator.
82///
83/// # Algorithms
84/// Classic joins are processed differently compared to existence joins.
85///
86/// ## Classic Joins (Inner, Full, Left, Right)
/// For classic joins we buffer the build side and stream the probe side.
/// Both sides are sorted so that we can iterate from index 0 to the end on each side.  This ordering ensures
/// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining
/// buffered rows from the match position onward, without rescanning earlier buffered rows.
91///
92/// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators
93/// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance
94/// monotonically as we stream new batches from the stream side.
95///
96/// The streamed side may arrive unsorted, so this operator sorts each incoming batch in memory before
97/// processing. The buffered side is required to be globally sorted; the plan declares this requirement
/// in `required_input_ordering`, which allows the optimizer to automatically insert a `SortExec` on that side if needed.
99/// By the time this operator runs, the buffered side is guaranteed to be in the proper order.
100///
101/// The pseudocode for the algorithm looks like this:
102///
103/// ```text
104/// for stream_row in stream_batch:
105///     for buffer_row in buffer_batch:
///         if compare(stream_row, buffer_row):
107///             output stream_row X buffer_batch[buffer_row:]
108///         else:
109///             continue
110/// ```
111///
112/// The algorithm uses the streamed side (larger) to drive the loop. This is due to every row on the stream side iterating
113/// the buffered side to find every first match. By doing this, each match can output more result so that output
114/// handling can be better vectorized for performance.
115///
116/// Here is an example:
117///
118/// We perform a `JoinType::Left` with these two batches and the operator being `Operator::Lt`(<). For each
119/// row on the streamed side we move a pointer on the buffered until it matches the condition. Once we reach
120/// the row which matches (in this case with row 1 on streamed will have its first match on row 2 on
121/// buffered; 100 < 200 is true), we can emit all rows after that match. We can emit the rows like this because
122/// if the batch is sorted in ascending order, every subsequent row will also satisfy the condition as they will
123/// all be larger values.
124///
125/// ```text
126/// SQL statement:
127/// SELECT *
128/// FROM (VALUES (100), (200), (500)) AS streamed(a)
129/// LEFT JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
130///   ON streamed.a < buffered.b;
131///
132/// Processing Row 1:
133///
134///       Sorted Buffered Side                                         Sorted Streamed Side
135///       ┌──────────────────┐                                         ┌──────────────────┐
136///     1 │       100        │                                       1 │       100        │
137///       ├──────────────────┤                                         ├──────────────────┤
138///     2 │       200        │ ─┐                                    2 │       200        │
139///       ├──────────────────┤  │  For row 1 on streamed side with     ├──────────────────┤
140///     3 │       200        │  │  value 100, we emit rows 2 - 5.    3 │       500        │
141///       ├──────────────────┤  │  as matches when the operator is     └──────────────────┘
142///     4 │       300        │  │  `Operator::Lt` (<) Emitting all
143///       ├──────────────────┤  │  rows after the first match (row
144///     5 │       400        │ ─┘  2 buffered side; 100 < 200)
145///       └──────────────────┘
146///
147/// Processing Row 2:
148///   By sorting the streamed side we know
149///
150///       Sorted Buffered Side                                         Sorted Streamed Side
151///       ┌──────────────────┐                                         ┌──────────────────┐
152///     1 │       100        │                                       1 │       100        │
153///       ├──────────────────┤                                         ├──────────────────┤
154///     2 │       200        │ <- Start here when probing for the    2 │       200        │
155///       ├──────────────────┤    streamed side row 2.                 ├──────────────────┤
156///     3 │       200        │                                       3 │       500        │
157///       ├──────────────────┤                                         └──────────────────┘
158///     4 │       300        │
159///       ├──────────────────┤
160///     5 │       400        │
161///       └──────────────────┘
162/// ```
163///
164/// ## Existence Joins (Semi, Anti, Mark)
/// Existence joins can be orders of magnitude faster with a `PiecewiseMergeJoin` as we only need to find
166/// the min/max value of the streamed side to be able to emit all matches on the buffered side. By putting
167/// the side we need to mark onto the sorted buffer side, we can emit all these matches at once.
168///
169/// For less than operations (`<`) both inputs are to be sorted in descending order and vice versa for greater
170/// than (`>`) operations. `SortExec` is used to enforce sorting on the buffered side and streamed side does not
171/// need to be sorted due to only needing to find the min/max.
172///
173/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked side is on the buffered side.
174///
175/// The pseudocode for the algorithm looks like this:
176///
177/// ```text
178/// // Using the example of a less than `<` operation
179/// let max = max_batch(streamed_batch)
180///
181/// for buffer_row in buffer_batch:
182///     if buffer_row < max:
183///         output buffer_batch[buffer_row:]
184/// ```
185///
186/// Only need to find the min/max value and iterate through the buffered side once.
187///
188/// Here is an example:
189/// We perform a `JoinType::LeftSemi` with these two batches and the operator being `Operator::Lt`(<). Because
190/// the operator is `Operator::Lt` we can find the minimum value in the streamed side; in this case it is 200.
191/// We can then advance a pointer from the start of the buffer side until we find the first value that satisfies
192/// the predicate. All rows after that first matched value satisfy the condition 200 < x so we can mark all of
193/// those rows as matched.
194///
195/// ```text
196/// SQL statement:
197/// SELECT *
198/// FROM (VALUES (500), (200), (300)) AS streamed(a)
199/// LEFT SEMI JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
200///   ON streamed.a < buffered.b;
201///
202///          Sorted Buffered Side             Unsorted Streamed Side
203///            ┌──────────────────┐          ┌──────────────────┐
204///          1 │       100        │        1 │       500        │
205///            ├──────────────────┤          ├──────────────────┤
206///          2 │       200        │        2 │       200        │
207///            ├──────────────────┤          ├──────────────────┤
208///          3 │       200        │        3 │       300        │
209///            ├──────────────────┤          └──────────────────┘
210///          4 │       300        │ ─┐
211///            ├──────────────────┤  | We emit matches for row 4 - 5
212///          5 │       400        │ ─┘ on the buffered side.
213///            └──────────────────┘
214///             min value: 200
215/// ```
216///
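/// For classic joins, the buffered side must be sorted descending when the `operator` passed to `try_new` is
/// `Operator::Lt` (<) or `Operator::LtEq` (<=) and ascending when it is `Operator::Gt` (>) or `Operator::GtEq` (>=);
/// `try_new` selects the opposite directions for right existence joins.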
219///
220/// # Partitioning Logic
221/// Piecewise Merge Join requires one buffered side partition + round robin partitioned stream side. A counter
222/// is used in the buffered side to coordinate when all streamed partitions are finished execution. This allows
223/// for processing the rest of the unmatched rows for Left and Full joins. The last partition that finishes
224/// execution will be responsible for outputting the unmatched rows.
225///
226/// # Performance Explanation (cost)
227/// Piecewise Merge Join is used over Nested Loop Join due to its superior performance. Here is the breakdown:
228///
229/// R: Buffered Side
230/// S: Streamed Side
231///
232/// ## Piecewise Merge Join (PWMJ)
233///
/// ### Classic Join
235/// Requires sorting the probe side and, for each probe row, scanning the buffered side until the first match
236/// is found.
237///     Complexity: `O(sort(S) + num_of_batches(|S|) * scan(R))`.
238///
/// ### Mark Join
240/// Sorts the probe side, then computes the min/max range of the probe keys and scans the buffered side only
241/// within that range.
242///   Complexity: `O(|S| + scan(R[range]))`.
243///
244/// ## Nested Loop Join
245/// Compares every row from `S` with every row from `R`.
246///   Complexity: `O(|S| * |R|)`.
247///
248/// ## Nested Loop Join
249///   Always going to be probe (O(S) * O(R)).
250///
251/// # Further Reference Material
252/// DuckDB blog on Range Joins: [Range Joins in DuckDB](https://duckdb.org/2022/05/27/iejoin.html)
253#[derive(Debug)]
254pub struct PiecewiseMergeJoinExec {
255    /// Left buffered execution plan
256    pub buffered: Arc<dyn ExecutionPlan>,
257    /// Right streamed execution plan
258    pub streamed: Arc<dyn ExecutionPlan>,
259    /// The two expressions being compared
260    pub on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
261    /// Comparison operator in the range predicate
262    pub operator: Operator,
263    /// How the join is performed
264    pub join_type: JoinType,
265    /// The schema once the join is applied
266    schema: SchemaRef,
267    /// Buffered data
268    buffered_fut: OnceAsync<BufferedSideData>,
269    /// Execution metrics
270    metrics: ExecutionPlanMetricsSet,
271
272    /// Sort expressions - See above for more details [`PiecewiseMergeJoinExec`]
273    ///
274    /// The left sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
275    left_child_plan_required_order: LexOrdering,
276    /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
277    /// Unsorted for mark joins
278    right_batch_required_orders: LexOrdering,
279
280    /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans.
281    sort_options: SortOptions,
282    /// Cache holding plan properties like equivalences, output partitioning etc.
283    cache: Arc<PlanProperties>,
284    /// Number of partitions to process
285    num_partitions: usize,
286}
287
288impl PiecewiseMergeJoinExec {
289    pub fn try_new(
290        buffered: Arc<dyn ExecutionPlan>,
291        streamed: Arc<dyn ExecutionPlan>,
292        on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
293        operator: Operator,
294        join_type: JoinType,
295        num_partitions: usize,
296    ) -> Result<Self> {
297        // TODO: Implement existence joins for PiecewiseMergeJoin
298        if is_existence_join(join_type) {
299            return not_impl_err!(
300                "Existence Joins are currently not supported for PiecewiseMergeJoin"
301            );
302        }
303
304        // Take the operator and enforce a sort order on the streamed + buffered side based on
305        // the operator type.
306        let sort_options = match operator {
307            Operator::Lt | Operator::LtEq => {
308                // For left existence joins the inputs will be swapped so the sort
309                // options are switched
310                if is_right_existence_join(join_type) {
311                    SortOptions::new(false, true)
312                } else {
313                    SortOptions::new(true, true)
314                }
315            }
316            Operator::Gt | Operator::GtEq => {
317                if is_right_existence_join(join_type) {
318                    SortOptions::new(true, true)
319                } else {
320                    SortOptions::new(false, true)
321                }
322            }
323            _ => {
324                return internal_err!(
325                    "Cannot contain non-range operator in PiecewiseMergeJoinExec"
326                );
327            }
328        };
329
330        // Give the same `sort_option for comparison later`
331        let left_child_plan_required_order =
332            vec![PhysicalSortExpr::new(Arc::clone(&on.0), sort_options)];
333        let right_batch_required_orders =
334            vec![PhysicalSortExpr::new(Arc::clone(&on.1), sort_options)];
335
336        let Some(left_child_plan_required_order) =
337            LexOrdering::new(left_child_plan_required_order)
338        else {
339            return internal_err!(
340                "PiecewiseMergeJoinExec requires valid sort expressions for its left side"
341            );
342        };
343        let Some(right_batch_required_orders) =
344            LexOrdering::new(right_batch_required_orders)
345        else {
346            return internal_err!(
347                "PiecewiseMergeJoinExec requires valid sort expressions for its right side"
348            );
349        };
350
351        let buffered_schema = buffered.schema();
352        let streamed_schema = streamed.schema();
353
354        // Create output schema for the join
355        let schema =
356            Arc::new(build_join_schema(&buffered_schema, &streamed_schema, &join_type).0);
357        let cache = Self::compute_properties(
358            &buffered,
359            &streamed,
360            Arc::clone(&schema),
361            join_type,
362            &on,
363        )?;
364
365        Ok(Self {
366            streamed,
367            buffered,
368            on,
369            operator,
370            join_type,
371            schema,
372            buffered_fut: Default::default(),
373            metrics: ExecutionPlanMetricsSet::new(),
374            left_child_plan_required_order,
375            right_batch_required_orders,
376            sort_options,
377            cache: Arc::new(cache),
378            num_partitions,
379        })
380    }
381
382    /// Reference to buffered side execution plan
383    pub fn buffered(&self) -> &Arc<dyn ExecutionPlan> {
384        &self.buffered
385    }
386
387    /// Reference to streamed side execution plan
388    pub fn streamed(&self) -> &Arc<dyn ExecutionPlan> {
389        &self.streamed
390    }
391
392    /// Join type
393    pub fn join_type(&self) -> JoinType {
394        self.join_type
395    }
396
397    /// Reference to sort options
398    pub fn sort_options(&self) -> &SortOptions {
399        &self.sort_options
400    }
401
402    /// Get probe side (streamed side) for the PiecewiseMergeJoin
403    /// In current implementation, probe side is determined according to join type.
404    pub fn probe_side(join_type: &JoinType) -> JoinSide {
405        match join_type {
406            JoinType::Right
407            | JoinType::Inner
408            | JoinType::Full
409            | JoinType::RightSemi
410            | JoinType::RightAnti
411            | JoinType::RightMark => JoinSide::Right,
412            JoinType::Left
413            | JoinType::LeftAnti
414            | JoinType::LeftSemi
415            | JoinType::LeftMark => JoinSide::Left,
416        }
417    }
418
419    pub fn compute_properties(
420        buffered: &Arc<dyn ExecutionPlan>,
421        streamed: &Arc<dyn ExecutionPlan>,
422        schema: SchemaRef,
423        join_type: JoinType,
424        join_on: &(PhysicalExprRef, PhysicalExprRef),
425    ) -> Result<PlanProperties> {
426        let eq_properties = join_equivalence_properties(
427            buffered.equivalence_properties().clone(),
428            streamed.equivalence_properties().clone(),
429            &join_type,
430            schema,
431            &Self::maintains_input_order(join_type),
432            Some(Self::probe_side(&join_type)),
433            std::slice::from_ref(join_on),
434        )?;
435
436        let output_partitioning =
437            asymmetric_join_output_partitioning(buffered, streamed, &join_type)?;
438
439        Ok(PlanProperties::new(
440            eq_properties,
441            output_partitioning,
442            EmissionType::Incremental,
443            boundedness_from_children([buffered, streamed]),
444        ))
445    }
446
447    // TODO: Add input order. Now they're all `false` indicating it will not maintain the input order.
448    // However, for certain join types the order is maintained. This can be updated in the future after
449    // more testing.
450    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
451        match join_type {
452            // The existence side is expected to come in sorted
453            JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
454                vec![false, false]
455            }
456            JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
457                vec![false, false]
458            }
459            // Left, Right, Full, Inner Join is not guaranteed to maintain
460            // input order as the streamed side will be sorted during
461            // execution for `PiecewiseMergeJoin`
462            _ => vec![false, false],
463        }
464    }
465
466    // TODO
467    pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
468        todo!()
469    }
470
471    fn with_new_children_and_same_properties(
472        &self,
473        mut children: Vec<Arc<dyn ExecutionPlan>>,
474    ) -> Self {
475        let buffered = children.swap_remove(0);
476        let streamed = children.swap_remove(0);
477        Self {
478            buffered,
479            streamed,
480            on: self.on.clone(),
481            operator: self.operator,
482            join_type: self.join_type,
483            schema: Arc::clone(&self.schema),
484            left_child_plan_required_order: self.left_child_plan_required_order.clone(),
485            right_batch_required_orders: self.right_batch_required_orders.clone(),
486            sort_options: self.sort_options,
487            cache: Arc::clone(&self.cache),
488            num_partitions: self.num_partitions,
489
490            // Re-set state.
491            metrics: ExecutionPlanMetricsSet::new(),
492            buffered_fut: Default::default(),
493        }
494    }
495}
496
497impl ExecutionPlan for PiecewiseMergeJoinExec {
498    fn name(&self) -> &str {
499        "PiecewiseMergeJoinExec"
500    }
501
502    fn as_any(&self) -> &dyn std::any::Any {
503        self
504    }
505
506    fn properties(&self) -> &Arc<PlanProperties> {
507        &self.cache
508    }
509
510    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
511        vec![&self.buffered, &self.streamed]
512    }
513
514    fn required_input_distribution(&self) -> Vec<Distribution> {
515        vec![
516            Distribution::SinglePartition,
517            Distribution::UnspecifiedDistribution,
518        ]
519    }
520
521    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
522        // Existence joins don't need to be sorted on one side.
523        if is_right_existence_join(self.join_type) {
524            unimplemented!()
525        } else {
526            // Sort the right side in memory, so we do not need to enforce any sorting
527            vec![
528                Some(OrderingRequirements::from(
529                    self.left_child_plan_required_order.clone(),
530                )),
531                None,
532            ]
533        }
534    }
535
536    fn with_new_children(
537        self: Arc<Self>,
538        children: Vec<Arc<dyn ExecutionPlan>>,
539    ) -> Result<Arc<dyn ExecutionPlan>> {
540        check_if_same_properties!(self, children);
541        match &children[..] {
542            [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
543                Arc::clone(left),
544                Arc::clone(right),
545                self.on.clone(),
546                self.operator,
547                self.join_type,
548                self.num_partitions,
549            )?)),
550            _ => internal_err!(
551                "PiecewiseMergeJoin should have 2 children, found {}",
552                children.len()
553            ),
554        }
555    }
556
557    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
558        Ok(Arc::new(self.with_new_children_and_same_properties(vec![
559            Arc::clone(&self.buffered),
560            Arc::clone(&self.streamed),
561        ])))
562    }
563
564    fn execute(
565        &self,
566        partition: usize,
567        context: Arc<datafusion_execution::TaskContext>,
568    ) -> Result<SendableRecordBatchStream> {
569        let on_buffered = Arc::clone(&self.on.0);
570        let on_streamed = Arc::clone(&self.on.1);
571
572        let metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
573        let buffered_fut = self.buffered_fut.try_once(|| {
574            let reservation = MemoryConsumer::new("PiecewiseMergeJoinInput")
575                .register(context.memory_pool());
576
577            let buffered_stream = self.buffered.execute(0, Arc::clone(&context))?;
578            Ok(build_buffered_data(
579                buffered_stream,
580                Arc::clone(&on_buffered),
581                metrics.clone(),
582                reservation,
583                build_visited_indices_map(self.join_type),
584                self.num_partitions,
585            ))
586        })?;
587
588        let streamed = self.streamed.execute(partition, Arc::clone(&context))?;
589
590        let batch_size = context.session_config().batch_size();
591
592        // TODO: Add existence joins + this is guarded at physical planner
593        if is_existence_join(self.join_type()) {
594            unreachable!()
595        } else {
596            Ok(Box::pin(ClassicPWMJStream::try_new(
597                Arc::clone(&self.schema),
598                on_streamed,
599                self.join_type,
600                self.operator,
601                streamed,
602                BufferedSide::Initial(BufferedSideInitialState { buffered_fut }),
603                PiecewiseMergeJoinStreamState::WaitBufferedSide,
604                self.sort_options,
605                metrics,
606                batch_size,
607            )))
608        }
609    }
610
611    fn metrics(&self) -> Option<MetricsSet> {
612        Some(self.metrics.clone_inner())
613    }
614}
615
616impl DisplayAs for PiecewiseMergeJoinExec {
617    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
618        let on_str = format!(
619            "({} {} {})",
620            fmt_sql(self.on.0.as_ref()),
621            self.operator,
622            fmt_sql(self.on.1.as_ref())
623        );
624
625        match t {
626            DisplayFormatType::Default | DisplayFormatType::Verbose => {
627                write!(
628                    f,
629                    "PiecewiseMergeJoin: operator={:?}, join_type={:?}, on={}",
630                    self.operator, self.join_type, on_str
631                )
632            }
633
634            DisplayFormatType::TreeRender => {
635                writeln!(f, "operator={:?}", self.operator)?;
636                if self.join_type != JoinType::Inner {
637                    writeln!(f, "join_type={:?}", self.join_type)?;
638                }
639                writeln!(f, "on={on_str}")
640            }
641        }
642    }
643}
644
645async fn build_buffered_data(
646    buffered: SendableRecordBatchStream,
647    on_buffered: PhysicalExprRef,
648    metrics: BuildProbeJoinMetrics,
649    reservation: MemoryReservation,
650    build_map: bool,
651    remaining_partitions: usize,
652) -> Result<BufferedSideData> {
653    let schema = buffered.schema();
654
655    // Combine batches and record number of rows
656    let initial = (Vec::new(), 0, metrics, reservation);
657    let (batches, num_rows, metrics, reservation) = buffered
658        .try_fold(initial, |mut acc, batch| async {
659            let batch_size = get_record_batch_memory_size(&batch);
660            acc.3.try_grow(batch_size)?;
661            acc.2.build_mem_used.add(batch_size);
662            acc.2.build_input_batches.add(1);
663            acc.2.build_input_rows.add(batch.num_rows());
664            // Update row count
665            acc.1 += batch.num_rows();
666            // Push batch to output
667            acc.0.push(batch);
668            Ok(acc)
669        })
670        .await?;
671
672    let single_batch = concat_batches(&schema, batches.iter())?;
673
674    // Evaluate physical expression on the buffered side.
675    let buffered_values = on_buffered
676        .evaluate(&single_batch)?
677        .into_array(single_batch.num_rows())?;
678
679    // We add the single batch size + the memory of the join keys
680    // size of the size estimation
681    let size_estimation = get_record_batch_memory_size(&single_batch)
682        + buffered_values.get_array_memory_size();
683    reservation.try_grow(size_estimation)?;
684    metrics.build_mem_used.add(size_estimation);
685
686    // Created visited indices bitmap only if the join type requires it
687    let visited_indices_bitmap = if build_map {
688        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
689        reservation.try_grow(bitmap_size)?;
690        metrics.build_mem_used.add(bitmap_size);
691
692        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
693        bitmap_buffer.append_n(num_rows, false);
694        bitmap_buffer
695    } else {
696        BooleanBufferBuilder::new(0)
697    };
698
699    let buffered_data = BufferedSideData::new(
700        single_batch,
701        buffered_values,
702        Mutex::new(visited_indices_bitmap),
703        remaining_partitions,
704        reservation,
705    );
706
707    Ok(buffered_data)
708}
709
710pub(super) struct BufferedSideData {
711    pub(super) batch: RecordBatch,
712    values: ArrayRef,
713    pub(super) visited_indices_bitmap: SharedBitmapBuilder,
714    pub(super) remaining_partitions: AtomicUsize,
715    _reservation: MemoryReservation,
716}
717
718impl BufferedSideData {
719    pub(super) fn new(
720        batch: RecordBatch,
721        values: ArrayRef,
722        visited_indices_bitmap: SharedBitmapBuilder,
723        remaining_partitions: usize,
724        reservation: MemoryReservation,
725    ) -> Self {
726        Self {
727            batch,
728            values,
729            visited_indices_bitmap,
730            remaining_partitions: AtomicUsize::new(remaining_partitions),
731            _reservation: reservation,
732        }
733    }
734
735    pub(super) fn batch(&self) -> &RecordBatch {
736        &self.batch
737    }
738
739    pub(super) fn values(&self) -> &ArrayRef {
740        &self.values
741    }
742}
743
744pub(super) enum BufferedSide {
745    /// Indicates that build-side not collected yet
746    Initial(BufferedSideInitialState),
747    /// Indicates that build-side data has been collected
748    Ready(BufferedSideReadyState),
749}
750
751impl BufferedSide {
752    // Takes a mutable state of the buffered row batches
753    pub(super) fn try_as_initial_mut(&mut self) -> Result<&mut BufferedSideInitialState> {
754        match self {
755            BufferedSide::Initial(state) => Ok(state),
756            _ => internal_err!("Expected build side in initial state"),
757        }
758    }
759
760    pub(super) fn try_as_ready(&self) -> Result<&BufferedSideReadyState> {
761        match self {
762            BufferedSide::Ready(state) => Ok(state),
763            _ => {
764                internal_err!("Expected build side in ready state")
765            }
766        }
767    }
768
769    /// Tries to extract BuildSideReadyState from BuildSide enum.
770    /// Returns an error if state is not Ready.
771    pub(super) fn try_as_ready_mut(&mut self) -> Result<&mut BufferedSideReadyState> {
772        match self {
773            BufferedSide::Ready(state) => Ok(state),
774            _ => internal_err!("Expected build side in ready state"),
775        }
776    }
777}
778
779pub(super) struct BufferedSideInitialState {
780    pub(crate) buffered_fut: OnceFut<BufferedSideData>,
781}
782
783pub(super) struct BufferedSideReadyState {
784    /// Collected build-side data
785    pub(super) buffered_data: Arc<BufferedSideData>,
786}