datafusion_physical_plan/joins/piecewise_merge_join/exec.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::Array;
19use arrow::{
20 array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
21 compute::concat_batches,
22 util::bit_util,
23};
24use arrow_schema::{SchemaRef, SortOptions};
25use datafusion_common::not_impl_err;
26use datafusion_common::{JoinSide, Result, internal_err};
27use datafusion_execution::{
28 SendableRecordBatchStream,
29 memory_pool::{MemoryConsumer, MemoryReservation},
30};
31use datafusion_expr::{JoinType, Operator};
32use datafusion_physical_expr::equivalence::join_equivalence_properties;
33use datafusion_physical_expr::{
34 Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, PhysicalExprRef,
35 PhysicalSortExpr,
36};
37use datafusion_physical_expr_common::physical_expr::fmt_sql;
38use futures::TryStreamExt;
39use parking_lot::Mutex;
40use std::fmt::Formatter;
41use std::sync::Arc;
42use std::sync::atomic::AtomicUsize;
43
44use crate::execution_plan::{EmissionType, boundedness_from_children};
45
46use crate::joins::piecewise_merge_join::classic_join::{
47 ClassicPWMJStream, PiecewiseMergeJoinStreamState,
48};
49use crate::joins::piecewise_merge_join::utils::{
50 build_visited_indices_map, is_existence_join, is_right_existence_join,
51};
52use crate::joins::utils::asymmetric_join_output_partitioning;
53use crate::metrics::MetricsSet;
54use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
55use crate::{
56 ExecutionPlan, PlanProperties,
57 joins::{
58 SharedBitmapBuilder,
59 utils::{BuildProbeJoinMetrics, OnceAsync, OnceFut, build_join_schema},
60 },
61 metrics::ExecutionPlanMetricsSet,
62 spill::get_record_batch_memory_size,
63};
64
/// `PiecewiseMergeJoinExec` is a join execution plan that evaluates only a single range filter and shows
/// much better performance for these workloads than `NestedLoopJoin`.
67///
/// The physical planner will choose to evaluate this join when there is only one comparison filter. This
/// is a binary expression whose operator is one of [`Operator::Lt`], [`Operator::LtEq`], [`Operator::Gt`],
/// or [`Operator::GtEq`].
///
/// Examples:
72/// - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
73///
74/// # Execution Plan Inputs
/// For `PiecewiseMergeJoin` we label the right input as the 'streamed' side and the left input as the
/// 'buffered' side.
77///
78/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and is able to sort streamed record
79/// batches during processing. Sorted input must specifically be ascending/descending based on the operator.
80///
81/// # Algorithms
82/// Classic joins are processed differently compared to existence joins.
83///
84/// ## Classic Joins (Inner, Full, Left, Right)
/// For classic joins we buffer the build side (the "buffered" side) and stream the probe side (the
/// "streamed" side). Both sides are sorted so that we can iterate from index 0 to the end on each side.
/// This ordering ensures that when we find the first matching pair of rows, we can emit the current stream
/// row joined with all remaining buffered rows from the match position onward, without rescanning earlier
/// buffered rows.
89///
90/// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators
91/// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance
92/// monotonically as we stream new batches from the stream side.
93///
94/// The streamed side may arrive unsorted, so this operator sorts each incoming batch in memory before
95/// processing. The buffered side is required to be globally sorted; the plan declares this requirement
96/// in `requires_input_order`, which allows the optimizer to automatically insert a `SortExec` on that side if needed.
97/// By the time this operator runs, the buffered side is guaranteed to be in the proper order.
98///
99/// The pseudocode for the algorithm looks like this:
100///
101/// ```text
/// for stream_row in stream_batch:
///     for buffer_row in buffer_batch:
///         if compare(stream_row, buffer_row):
///             output stream_row X buffer_batch[buffer_row:]
///         else:
///             continue
108/// ```
109///
110/// The algorithm uses the streamed side (larger) to drive the loop. This is due to every row on the stream side iterating
111/// the buffered side to find every first match. By doing this, each match can output more result so that output
112/// handling can be better vectorized for performance.
113///
114/// Here is an example:
115///
116/// We perform a `JoinType::Left` with these two batches and the operator being `Operator::Lt`(<). For each
117/// row on the streamed side we move a pointer on the buffered until it matches the condition. Once we reach
118/// the row which matches (in this case with row 1 on streamed will have its first match on row 2 on
119/// buffered; 100 < 200 is true), we can emit all rows after that match. We can emit the rows like this because
120/// if the batch is sorted in ascending order, every subsequent row will also satisfy the condition as they will
121/// all be larger values.
122///
123/// ```text
124/// SQL statement:
125/// SELECT *
126/// FROM (VALUES (100), (200), (500)) AS streamed(a)
127/// LEFT JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
128/// ON streamed.a < buffered.b;
129///
130/// Processing Row 1:
131///
132/// Sorted Buffered Side Sorted Streamed Side
133/// ┌──────────────────┐ ┌──────────────────┐
134/// 1 │ 100 │ 1 │ 100 │
135/// ├──────────────────┤ ├──────────────────┤
136/// 2 │ 200 │ ─┐ 2 │ 200 │
137/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤
138/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │
139/// ├──────────────────┤ │ as matches when the operator is └──────────────────┘
140/// 4 │ 300 │ │ `Operator::Lt` (<) Emitting all
141/// ├──────────────────┤ │ rows after the first match (row
142/// 5 │ 400 │ ─┘ 2 buffered side; 100 < 200)
143/// └──────────────────┘
144///
145/// Processing Row 2:
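/// By sorting the streamed side we know that the first match for row 2 cannot come before the first match
/// found for row 1, so probing resumes from that position instead of from the start.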
147///
148/// Sorted Buffered Side Sorted Streamed Side
149/// ┌──────────────────┐ ┌──────────────────┐
150/// 1 │ 100 │ 1 │ 100 │
151/// ├──────────────────┤ ├──────────────────┤
152/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │
153/// ├──────────────────┤ streamed side row 2. ├──────────────────┤
154/// 3 │ 200 │ 3 │ 500 │
155/// ├──────────────────┤ └──────────────────┘
156/// 4 │ 300 │
157/// ├──────────────────┤
158/// 5 │ 400 │
159/// └──────────────────┘
160/// ```
161///
162/// ## Existence Joins (Semi, Anti, Mark)
/// Existence joins are made orders of magnitude faster with a `PiecewiseMergeJoin` as we only need to find
164/// the min/max value of the streamed side to be able to emit all matches on the buffered side. By putting
165/// the side we need to mark onto the sorted buffer side, we can emit all these matches at once.
166///
167/// For less than operations (`<`) both inputs are to be sorted in descending order and vice versa for greater
168/// than (`>`) operations. `SortExec` is used to enforce sorting on the buffered side and streamed side does not
169/// need to be sorted due to only needing to find the min/max.
170///
171/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked side is on the buffered side.
172///
173/// The pseudocode for the algorithm looks like this:
174///
175/// ```text
176/// // Using the example of a less than `<` operation
177/// let max = max_batch(streamed_batch)
178///
179/// for buffer_row in buffer_batch:
180/// if buffer_row < max:
181/// output buffer_batch[buffer_row:]
182/// ```
183///
184/// Only need to find the min/max value and iterate through the buffered side once.
185///
186/// Here is an example:
187/// We perform a `JoinType::LeftSemi` with these two batches and the operator being `Operator::Lt`(<). Because
188/// the operator is `Operator::Lt` we can find the minimum value in the streamed side; in this case it is 200.
189/// We can then advance a pointer from the start of the buffer side until we find the first value that satisfies
190/// the predicate. All rows after that first matched value satisfy the condition 200 < x so we can mark all of
191/// those rows as matched.
192///
193/// ```text
194/// SQL statement:
195/// SELECT *
196/// FROM (VALUES (500), (200), (300)) AS streamed(a)
197/// LEFT SEMI JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
198/// ON streamed.a < buffered.b;
199///
200/// Sorted Buffered Side Unsorted Streamed Side
201/// ┌──────────────────┐ ┌──────────────────┐
202/// 1 │ 100 │ 1 │ 500 │
203/// ├──────────────────┤ ├──────────────────┤
204/// 2 │ 200 │ 2 │ 200 │
205/// ├──────────────────┤ ├──────────────────┤
206/// 3 │ 200 │ 3 │ 300 │
207/// ├──────────────────┤ └──────────────────┘
208/// 4 │ 300 │ ─┐
209/// ├──────────────────┤ | We emit matches for row 4 - 5
210/// 5 │ 400 │ ─┘ on the buffered side.
211/// └──────────────────┘
212/// min value: 200
213/// ```
214///
/// As implemented in `try_new`, the buffered side is sorted descending for `Operator::Lt` (<) or
/// `Operator::LtEq` (<=) and ascending for `Operator::Gt` (>) or `Operator::GtEq` (>=); the directions are
/// flipped when `is_right_existence_join` holds for the join type.
217///
218/// # Partitioning Logic
219/// Piecewise Merge Join requires one buffered side partition + round robin partitioned stream side. A counter
/// is used in the buffered side to coordinate when all streamed partitions have finished execution. This allows
221/// for processing the rest of the unmatched rows for Left and Full joins. The last partition that finishes
222/// execution will be responsible for outputting the unmatched rows.
223///
224/// # Performance Explanation (cost)
225/// Piecewise Merge Join is used over Nested Loop Join due to its superior performance. Here is the breakdown:
226///
227/// R: Buffered Side
228/// S: Streamed Side
229///
230/// ## Piecewise Merge Join (PWMJ)
231///
/// ### Classic Join
/// Requires sorting the probe side and, for each probe row, scanning the buffered side until the first match
/// is found.
/// Complexity: `O(sort(S) + num_of_batches(|S|) * scan(R))`.
///
/// ### Mark Join
/// Scans the probe side once to compute the min/max of the probe keys (no sort is needed), then scans the
/// buffered side only within that range.
240/// Complexity: `O(|S| + scan(R[range]))`.
241///
/// ## Nested Loop Join
/// Compares every row from `S` with every row from `R`, so the whole buffered side is always probed for
/// every streamed row.
/// Complexity: `O(|S| * |R|)`.
///
248///
249/// # Further Reference Material
250/// DuckDB blog on Range Joins: [Range Joins in DuckDB](https://duckdb.org/2022/05/27/iejoin.html)
#[derive(Debug)]
pub struct PiecewiseMergeJoinExec {
    /// Left buffered execution plan. `execute` runs its partition 0 and collects the whole
    /// output into a single in-memory batch.
    pub buffered: Arc<dyn ExecutionPlan>,
    /// Right streamed execution plan. `execute` runs the partition it is asked for.
    pub streamed: Arc<dyn ExecutionPlan>,
    /// The two expressions being compared: `on.0` is evaluated against the buffered side,
    /// `on.1` is handed to the join stream for the streamed side.
    pub on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
    /// Comparison operator in the range predicate. `try_new` only accepts `Lt`, `LtEq`,
    /// `Gt` and `GtEq`.
    pub operator: Operator,
    /// How the join is performed. Existence joins are currently rejected by `try_new`.
    pub join_type: JoinType,
    /// The schema once the join is applied
    schema: SchemaRef,
    /// Buffered data; `execute` obtains it through `try_once`
    buffered_fut: OnceAsync<BufferedSideData>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,

    /// Sort expressions - See above for more details [`PiecewiseMergeJoinExec`]
    ///
    /// The left sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations.
    /// This is the ordering reported for the first child by `required_input_ordering`.
    left_child_plan_required_order: LexOrdering,
    /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
    /// Unsorted for mark joins
    ///
    /// Built in `try_new` but not read anywhere yet, hence the `dead_code` expectation.
    #[expect(dead_code)]
    right_batch_required_orders: LexOrdering,

    /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans.
    sort_options: SortOptions,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
    /// Number of partitions to process. Passed to `build_buffered_data` as the initial value
    /// of the remaining-partitions counter.
    num_partitions: usize,
}
286
287impl PiecewiseMergeJoinExec {
288 pub fn try_new(
289 buffered: Arc<dyn ExecutionPlan>,
290 streamed: Arc<dyn ExecutionPlan>,
291 on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
292 operator: Operator,
293 join_type: JoinType,
294 num_partitions: usize,
295 ) -> Result<Self> {
296 // TODO: Implement existence joins for PiecewiseMergeJoin
297 if is_existence_join(join_type) {
298 return not_impl_err!(
299 "Existence Joins are currently not supported for PiecewiseMergeJoin"
300 );
301 }
302
303 // Take the operator and enforce a sort order on the streamed + buffered side based on
304 // the operator type.
305 let sort_options = match operator {
306 Operator::Lt | Operator::LtEq => {
307 // For left existence joins the inputs will be swapped so the sort
308 // options are switched
309 if is_right_existence_join(join_type) {
310 SortOptions::new(false, true)
311 } else {
312 SortOptions::new(true, true)
313 }
314 }
315 Operator::Gt | Operator::GtEq => {
316 if is_right_existence_join(join_type) {
317 SortOptions::new(true, true)
318 } else {
319 SortOptions::new(false, true)
320 }
321 }
322 _ => {
323 return internal_err!(
324 "Cannot contain non-range operator in PiecewiseMergeJoinExec"
325 );
326 }
327 };
328
329 // Give the same `sort_option for comparison later`
330 let left_child_plan_required_order =
331 vec![PhysicalSortExpr::new(Arc::clone(&on.0), sort_options)];
332 let right_batch_required_orders =
333 vec![PhysicalSortExpr::new(Arc::clone(&on.1), sort_options)];
334
335 let Some(left_child_plan_required_order) =
336 LexOrdering::new(left_child_plan_required_order)
337 else {
338 return internal_err!(
339 "PiecewiseMergeJoinExec requires valid sort expressions for its left side"
340 );
341 };
342 let Some(right_batch_required_orders) =
343 LexOrdering::new(right_batch_required_orders)
344 else {
345 return internal_err!(
346 "PiecewiseMergeJoinExec requires valid sort expressions for its right side"
347 );
348 };
349
350 let buffered_schema = buffered.schema();
351 let streamed_schema = streamed.schema();
352
353 // Create output schema for the join
354 let schema =
355 Arc::new(build_join_schema(&buffered_schema, &streamed_schema, &join_type).0);
356 let cache = Self::compute_properties(
357 &buffered,
358 &streamed,
359 Arc::clone(&schema),
360 join_type,
361 &on,
362 )?;
363
364 Ok(Self {
365 streamed,
366 buffered,
367 on,
368 operator,
369 join_type,
370 schema,
371 buffered_fut: Default::default(),
372 metrics: ExecutionPlanMetricsSet::new(),
373 left_child_plan_required_order,
374 right_batch_required_orders,
375 sort_options,
376 cache,
377 num_partitions,
378 })
379 }
380
381 /// Reference to buffered side execution plan
382 pub fn buffered(&self) -> &Arc<dyn ExecutionPlan> {
383 &self.buffered
384 }
385
386 /// Reference to streamed side execution plan
387 pub fn streamed(&self) -> &Arc<dyn ExecutionPlan> {
388 &self.streamed
389 }
390
391 /// Join type
392 pub fn join_type(&self) -> JoinType {
393 self.join_type
394 }
395
396 /// Reference to sort options
397 pub fn sort_options(&self) -> &SortOptions {
398 &self.sort_options
399 }
400
401 /// Get probe side (streamed side) for the PiecewiseMergeJoin
402 /// In current implementation, probe side is determined according to join type.
403 pub fn probe_side(join_type: &JoinType) -> JoinSide {
404 match join_type {
405 JoinType::Right
406 | JoinType::Inner
407 | JoinType::Full
408 | JoinType::RightSemi
409 | JoinType::RightAnti
410 | JoinType::RightMark => JoinSide::Right,
411 JoinType::Left
412 | JoinType::LeftAnti
413 | JoinType::LeftSemi
414 | JoinType::LeftMark => JoinSide::Left,
415 }
416 }
417
418 pub fn compute_properties(
419 buffered: &Arc<dyn ExecutionPlan>,
420 streamed: &Arc<dyn ExecutionPlan>,
421 schema: SchemaRef,
422 join_type: JoinType,
423 join_on: &(PhysicalExprRef, PhysicalExprRef),
424 ) -> Result<PlanProperties> {
425 let eq_properties = join_equivalence_properties(
426 buffered.equivalence_properties().clone(),
427 streamed.equivalence_properties().clone(),
428 &join_type,
429 schema,
430 &Self::maintains_input_order(join_type),
431 Some(Self::probe_side(&join_type)),
432 std::slice::from_ref(join_on),
433 )?;
434
435 let output_partitioning =
436 asymmetric_join_output_partitioning(buffered, streamed, &join_type)?;
437
438 Ok(PlanProperties::new(
439 eq_properties,
440 output_partitioning,
441 EmissionType::Incremental,
442 boundedness_from_children([buffered, streamed]),
443 ))
444 }
445
446 // TODO: Add input order. Now they're all `false` indicating it will not maintain the input order.
447 // However, for certain join types the order is maintained. This can be updated in the future after
448 // more testing.
449 fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
450 match join_type {
451 // The existence side is expected to come in sorted
452 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
453 vec![false, false]
454 }
455 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
456 vec![false, false]
457 }
458 // Left, Right, Full, Inner Join is not guaranteed to maintain
459 // input order as the streamed side will be sorted during
460 // execution for `PiecewiseMergeJoin`
461 _ => vec![false, false],
462 }
463 }
464
465 // TODO
466 pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
467 todo!()
468 }
469}
470
471impl ExecutionPlan for PiecewiseMergeJoinExec {
472 fn name(&self) -> &str {
473 "PiecewiseMergeJoinExec"
474 }
475
476 fn as_any(&self) -> &dyn std::any::Any {
477 self
478 }
479
480 fn properties(&self) -> &PlanProperties {
481 &self.cache
482 }
483
484 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
485 vec![&self.buffered, &self.streamed]
486 }
487
488 fn required_input_distribution(&self) -> Vec<Distribution> {
489 vec![
490 Distribution::SinglePartition,
491 Distribution::UnspecifiedDistribution,
492 ]
493 }
494
495 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
496 // Existence joins don't need to be sorted on one side.
497 if is_right_existence_join(self.join_type) {
498 unimplemented!()
499 } else {
500 // Sort the right side in memory, so we do not need to enforce any sorting
501 vec![
502 Some(OrderingRequirements::from(
503 self.left_child_plan_required_order.clone(),
504 )),
505 None,
506 ]
507 }
508 }
509
510 fn with_new_children(
511 self: Arc<Self>,
512 children: Vec<Arc<dyn ExecutionPlan>>,
513 ) -> Result<Arc<dyn ExecutionPlan>> {
514 match &children[..] {
515 [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
516 Arc::clone(left),
517 Arc::clone(right),
518 self.on.clone(),
519 self.operator,
520 self.join_type,
521 self.num_partitions,
522 )?)),
523 _ => internal_err!(
524 "PiecewiseMergeJoin should have 2 children, found {}",
525 children.len()
526 ),
527 }
528 }
529
530 fn execute(
531 &self,
532 partition: usize,
533 context: Arc<datafusion_execution::TaskContext>,
534 ) -> Result<SendableRecordBatchStream> {
535 let on_buffered = Arc::clone(&self.on.0);
536 let on_streamed = Arc::clone(&self.on.1);
537
538 let metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
539 let buffered_fut = self.buffered_fut.try_once(|| {
540 let reservation = MemoryConsumer::new("PiecewiseMergeJoinInput")
541 .register(context.memory_pool());
542
543 let buffered_stream = self.buffered.execute(0, Arc::clone(&context))?;
544 Ok(build_buffered_data(
545 buffered_stream,
546 Arc::clone(&on_buffered),
547 metrics.clone(),
548 reservation,
549 build_visited_indices_map(self.join_type),
550 self.num_partitions,
551 ))
552 })?;
553
554 let streamed = self.streamed.execute(partition, Arc::clone(&context))?;
555
556 let batch_size = context.session_config().batch_size();
557
558 // TODO: Add existence joins + this is guarded at physical planner
559 if is_existence_join(self.join_type()) {
560 unreachable!()
561 } else {
562 Ok(Box::pin(ClassicPWMJStream::try_new(
563 Arc::clone(&self.schema),
564 on_streamed,
565 self.join_type,
566 self.operator,
567 streamed,
568 BufferedSide::Initial(BufferedSideInitialState { buffered_fut }),
569 PiecewiseMergeJoinStreamState::WaitBufferedSide,
570 self.sort_options,
571 metrics,
572 batch_size,
573 )))
574 }
575 }
576
577 fn metrics(&self) -> Option<MetricsSet> {
578 Some(self.metrics.clone_inner())
579 }
580}
581
582impl DisplayAs for PiecewiseMergeJoinExec {
583 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
584 let on_str = format!(
585 "({} {} {})",
586 fmt_sql(self.on.0.as_ref()),
587 self.operator,
588 fmt_sql(self.on.1.as_ref())
589 );
590
591 match t {
592 DisplayFormatType::Default | DisplayFormatType::Verbose => {
593 write!(
594 f,
595 "PiecewiseMergeJoin: operator={:?}, join_type={:?}, on={}",
596 self.operator, self.join_type, on_str
597 )
598 }
599
600 DisplayFormatType::TreeRender => {
601 writeln!(f, "operator={:?}", self.operator)?;
602 if self.join_type != JoinType::Inner {
603 writeln!(f, "join_type={:?}", self.join_type)?;
604 }
605 writeln!(f, "on={on_str}")
606 }
607 }
608 }
609}
610
611async fn build_buffered_data(
612 buffered: SendableRecordBatchStream,
613 on_buffered: PhysicalExprRef,
614 metrics: BuildProbeJoinMetrics,
615 reservation: MemoryReservation,
616 build_map: bool,
617 remaining_partitions: usize,
618) -> Result<BufferedSideData> {
619 let schema = buffered.schema();
620
621 // Combine batches and record number of rows
622 let initial = (Vec::new(), 0, metrics, reservation);
623 let (batches, num_rows, metrics, mut reservation) = buffered
624 .try_fold(initial, |mut acc, batch| async {
625 let batch_size = get_record_batch_memory_size(&batch);
626 acc.3.try_grow(batch_size)?;
627 acc.2.build_mem_used.add(batch_size);
628 acc.2.build_input_batches.add(1);
629 acc.2.build_input_rows.add(batch.num_rows());
630 // Update row count
631 acc.1 += batch.num_rows();
632 // Push batch to output
633 acc.0.push(batch);
634 Ok(acc)
635 })
636 .await?;
637
638 let single_batch = concat_batches(&schema, batches.iter())?;
639
640 // Evaluate physical expression on the buffered side.
641 let buffered_values = on_buffered
642 .evaluate(&single_batch)?
643 .into_array(single_batch.num_rows())?;
644
645 // We add the single batch size + the memory of the join keys
646 // size of the size estimation
647 let size_estimation = get_record_batch_memory_size(&single_batch)
648 + buffered_values.get_array_memory_size();
649 reservation.try_grow(size_estimation)?;
650 metrics.build_mem_used.add(size_estimation);
651
652 // Created visited indices bitmap only if the join type requires it
653 let visited_indices_bitmap = if build_map {
654 let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
655 reservation.try_grow(bitmap_size)?;
656 metrics.build_mem_used.add(bitmap_size);
657
658 let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
659 bitmap_buffer.append_n(num_rows, false);
660 bitmap_buffer
661 } else {
662 BooleanBufferBuilder::new(0)
663 };
664
665 let buffered_data = BufferedSideData::new(
666 single_batch,
667 buffered_values,
668 Mutex::new(visited_indices_bitmap),
669 remaining_partitions,
670 reservation,
671 );
672
673 Ok(buffered_data)
674}
675
/// Fully collected buffered (left) side of the join, as produced by `build_buffered_data`.
pub(super) struct BufferedSideData {
    /// All buffered input batches concatenated into a single batch
    pub(super) batch: RecordBatch,
    /// Result of evaluating the buffered-side join expression against `batch`
    values: ArrayRef,
    /// Bitmap with one bit per buffered row when the join type requires it, empty otherwise
    pub(super) visited_indices_bitmap: SharedBitmapBuilder,
    /// Counter initialised with the number of partitions to process
    pub(super) remaining_partitions: AtomicUsize,
    /// Memory reservation held for as long as this data is alive
    _reservation: MemoryReservation,
}
683
684impl BufferedSideData {
685 pub(super) fn new(
686 batch: RecordBatch,
687 values: ArrayRef,
688 visited_indices_bitmap: SharedBitmapBuilder,
689 remaining_partitions: usize,
690 reservation: MemoryReservation,
691 ) -> Self {
692 Self {
693 batch,
694 values,
695 visited_indices_bitmap,
696 remaining_partitions: AtomicUsize::new(remaining_partitions),
697 _reservation: reservation,
698 }
699 }
700
701 pub(super) fn batch(&self) -> &RecordBatch {
702 &self.batch
703 }
704
705 pub(super) fn values(&self) -> &ArrayRef {
706 &self.values
707 }
708}
709
/// State of the buffered side as seen by a join stream: either still waiting on the
/// shared future or holding the collected data.
pub(super) enum BufferedSide {
    /// Indicates that the buffered side has not been collected yet
    Initial(BufferedSideInitialState),
    /// Indicates that the buffered-side data has been collected
    Ready(BufferedSideReadyState),
}
716
717impl BufferedSide {
718 // Takes a mutable state of the buffered row batches
719 pub(super) fn try_as_initial_mut(&mut self) -> Result<&mut BufferedSideInitialState> {
720 match self {
721 BufferedSide::Initial(state) => Ok(state),
722 _ => internal_err!("Expected build side in initial state"),
723 }
724 }
725
726 pub(super) fn try_as_ready(&self) -> Result<&BufferedSideReadyState> {
727 match self {
728 BufferedSide::Ready(state) => Ok(state),
729 _ => {
730 internal_err!("Expected build side in ready state")
731 }
732 }
733 }
734
735 /// Tries to extract BuildSideReadyState from BuildSide enum.
736 /// Returns an error if state is not Ready.
737 pub(super) fn try_as_ready_mut(&mut self) -> Result<&mut BufferedSideReadyState> {
738 match self {
739 BufferedSide::Ready(state) => Ok(state),
740 _ => internal_err!("Expected build side in ready state"),
741 }
742 }
743}
744
/// Payload of [`BufferedSide::Initial`]: the buffered side is still being collected.
pub(super) struct BufferedSideInitialState {
    /// Handle to the buffered-side data obtained from `OnceAsync::try_once` in `execute`
    pub(crate) buffered_fut: OnceFut<BufferedSideData>,
}
748
/// Payload of [`BufferedSide::Ready`]: the buffered side has been collected.
pub(super) struct BufferedSideReadyState {
    /// Collected buffered-side data, held behind an `Arc`
    pub(super) buffered_data: Arc<BufferedSideData>,
}