datafusion_physical_plan/joins/piecewise_merge_join/exec.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::Array;
19use arrow::{
20 array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
21 compute::concat_batches,
22 util::bit_util,
23};
24use arrow_schema::{SchemaRef, SortOptions};
25use datafusion_common::not_impl_err;
26use datafusion_common::{internal_err, JoinSide, Result};
27use datafusion_execution::{
28 memory_pool::{MemoryConsumer, MemoryReservation},
29 SendableRecordBatchStream,
30};
31use datafusion_expr::{JoinType, Operator};
32use datafusion_physical_expr::equivalence::join_equivalence_properties;
33use datafusion_physical_expr::{
34 Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, PhysicalExprRef,
35 PhysicalSortExpr,
36};
37use datafusion_physical_expr_common::physical_expr::fmt_sql;
38use futures::TryStreamExt;
39use parking_lot::Mutex;
40use std::fmt::Formatter;
41use std::sync::atomic::AtomicUsize;
42use std::sync::Arc;
43
44use crate::execution_plan::{boundedness_from_children, EmissionType};
45
46use crate::joins::piecewise_merge_join::classic_join::{
47 ClassicPWMJStream, PiecewiseMergeJoinStreamState,
48};
49use crate::joins::piecewise_merge_join::utils::{
50 build_visited_indices_map, is_existence_join, is_right_existence_join,
51};
52use crate::joins::utils::asymmetric_join_output_partitioning;
53use crate::{
54 joins::{
55 utils::{build_join_schema, BuildProbeJoinMetrics, OnceAsync, OnceFut},
56 SharedBitmapBuilder,
57 },
58 metrics::ExecutionPlanMetricsSet,
59 spill::get_record_batch_memory_size,
60 ExecutionPlan, PlanProperties,
61};
62use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
63
64/// `PiecewiseMergeJoinExec` is a join execution plan that only evaluates single range filter and show much
65/// better performance for these workloads than `NestedLoopJoin`
66///
67/// The physical planner will choose to evaluate this join when there is only one comparison filter. This
68/// is a binary expression which contains [`Operator::Lt`], [`Operator::LtEq`], [`Operator::Gt`], and
69/// [`Operator::GtEq`].:
70/// Examples:
71/// - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
72///
73/// # Execution Plan Inputs
74/// For `PiecewiseMergeJoin` we label all right inputs as the `streamed' side and the left outputs as the
75/// 'buffered' side.
76///
77/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and is able to sort streamed record
78/// batches during processing. Sorted input must specifically be ascending/descending based on the operator.
79///
80/// # Algorithms
81/// Classic joins are processed differently compared to existence joins.
82///
83/// ## Classic Joins (Inner, Full, Left, Right)
84/// For classic joins we buffer the build side and stream the probe side (the "probe" side).
85/// Both sides are sorted so that we can iterate from index 0 to the end on each side. This ordering ensures
86/// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining
87/// probe rows from the match position onward, without rescanning earlier probe rows.
88///
89/// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators
90/// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance
91/// monotonically as we stream new batches from the stream side.
92///
93/// The streamed side may arrive unsorted, so this operator sorts each incoming batch in memory before
94/// processing. The buffered side is required to be globally sorted; the plan declares this requirement
95/// in `requires_input_order`, which allows the optimizer to automatically insert a `SortExec` on that side if needed.
96/// By the time this operator runs, the buffered side is guaranteed to be in the proper order.
97///
98/// The pseudocode for the algorithm looks like this:
99///
100/// ```text
101/// for stream_row in stream_batch:
102/// for buffer_row in buffer_batch:
103/// if compare(stream_row, probe_row):
104/// output stream_row X buffer_batch[buffer_row:]
105/// else:
106/// continue
107/// ```
108///
109/// The algorithm uses the streamed side (larger) to drive the loop. This is due to every row on the stream side iterating
110/// the buffered side to find every first match. By doing this, each match can output more result so that output
111/// handling can be better vectorized for performance.
112///
113/// Here is an example:
114///
115/// We perform a `JoinType::Left` with these two batches and the operator being `Operator::Lt`(<). For each
116/// row on the streamed side we move a pointer on the buffered until it matches the condition. Once we reach
117/// the row which matches (in this case with row 1 on streamed will have its first match on row 2 on
118/// buffered; 100 < 200 is true), we can emit all rows after that match. We can emit the rows like this because
119/// if the batch is sorted in ascending order, every subsequent row will also satisfy the condition as they will
120/// all be larger values.
121///
122/// ```text
123/// SQL statement:
124/// SELECT *
125/// FROM (VALUES (100), (200), (500)) AS streamed(a)
126/// LEFT JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
127/// ON streamed.a < buffered.b;
128///
129/// Processing Row 1:
130///
131/// Sorted Buffered Side Sorted Streamed Side
132/// ┌──────────────────┐ ┌──────────────────┐
133/// 1 │ 100 │ 1 │ 100 │
134/// ├──────────────────┤ ├──────────────────┤
135/// 2 │ 200 │ ─┐ 2 │ 200 │
136/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤
137/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │
138/// ├──────────────────┤ │ as matches when the operator is └──────────────────┘
139/// 4 │ 300 │ │ `Operator::Lt` (<) Emitting all
140/// ├──────────────────┤ │ rows after the first match (row
141/// 5 │ 400 │ ─┘ 2 buffered side; 100 < 200)
142/// └──────────────────┘
143///
144/// Processing Row 2:
145/// By sorting the streamed side we know
146///
147/// Sorted Buffered Side Sorted Streamed Side
148/// ┌──────────────────┐ ┌──────────────────┐
149/// 1 │ 100 │ 1 │ 100 │
150/// ├──────────────────┤ ├──────────────────┤
151/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │
152/// ├──────────────────┤ streamed side row 2. ├──────────────────┤
153/// 3 │ 200 │ 3 │ 500 │
154/// ├──────────────────┤ └──────────────────┘
155/// 4 │ 300 │
156/// ├──────────────────┤
157/// 5 │ 400 │
158/// └──────────────────┘
159/// ```
160///
161/// ## Existence Joins (Semi, Anti, Mark)
162/// Existence joins are made magnitudes of times faster with a `PiecewiseMergeJoin` as we only need to find
163/// the min/max value of the streamed side to be able to emit all matches on the buffered side. By putting
164/// the side we need to mark onto the sorted buffer side, we can emit all these matches at once.
165///
166/// For less than operations (`<`) both inputs are to be sorted in descending order and vice versa for greater
167/// than (`>`) operations. `SortExec` is used to enforce sorting on the buffered side and streamed side does not
168/// need to be sorted due to only needing to find the min/max.
169///
170/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked side is on the buffered side.
171///
172/// The pseudocode for the algorithm looks like this:
173///
174/// ```text
175/// // Using the example of a less than `<` operation
176/// let max = max_batch(streamed_batch)
177///
178/// for buffer_row in buffer_batch:
179/// if buffer_row < max:
180/// output buffer_batch[buffer_row:]
181/// ```
182///
183/// Only need to find the min/max value and iterate through the buffered side once.
184///
185/// Here is an example:
186/// We perform a `JoinType::LeftSemi` with these two batches and the operator being `Operator::Lt`(<). Because
187/// the operator is `Operator::Lt` we can find the minimum value in the streamed side; in this case it is 200.
188/// We can then advance a pointer from the start of the buffer side until we find the first value that satisfies
189/// the predicate. All rows after that first matched value satisfy the condition 200 < x so we can mark all of
190/// those rows as matched.
191///
192/// ```text
193/// SQL statement:
194/// SELECT *
195/// FROM (VALUES (500), (200), (300)) AS streamed(a)
196/// LEFT SEMI JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
197/// ON streamed.a < buffered.b;
198///
199/// Sorted Buffered Side Unsorted Streamed Side
200/// ┌──────────────────┐ ┌──────────────────┐
201/// 1 │ 100 │ 1 │ 500 │
202/// ├──────────────────┤ ├──────────────────┤
203/// 2 │ 200 │ 2 │ 200 │
204/// ├──────────────────┤ ├──────────────────┤
205/// 3 │ 200 │ 3 │ 300 │
206/// ├──────────────────┤ └──────────────────┘
207/// 4 │ 300 │ ─┐
208/// ├──────────────────┤ | We emit matches for row 4 - 5
209/// 5 │ 400 │ ─┘ on the buffered side.
210/// └──────────────────┘
211/// min value: 200
212/// ```
213///
214/// For both types of joins, the buffered side must be sorted ascending for `Operator::Lt` (<) or
215/// `Operator::LtEq` (<=) and descending for `Operator::Gt` (>) or `Operator::GtEq` (>=).
216///
217/// # Partitioning Logic
218/// Piecewise Merge Join requires one buffered side partition + round robin partitioned stream side. A counter
219/// is used in the buffered side to coordinate when all streamed partitions are finished execution. This allows
220/// for processing the rest of the unmatched rows for Left and Full joins. The last partition that finishes
221/// execution will be responsible for outputting the unmatched rows.
222///
223/// # Performance Explanation (cost)
224/// Piecewise Merge Join is used over Nested Loop Join due to its superior performance. Here is the breakdown:
225///
226/// R: Buffered Side
227/// S: Streamed Side
228///
229/// ## Piecewise Merge Join (PWMJ)
230///
231/// # Classic Join:
232/// Requires sorting the probe side and, for each probe row, scanning the buffered side until the first match
233/// is found.
234/// Complexity: `O(sort(S) + num_of_batches(|S|) * scan(R))`.
235///
236/// # Mark Join:
237/// Sorts the probe side, then computes the min/max range of the probe keys and scans the buffered side only
238/// within that range.
239/// Complexity: `O(|S| + scan(R[range]))`.
240///
241/// ## Nested Loop Join
242/// Compares every row from `S` with every row from `R`.
243/// Complexity: `O(|S| * |R|)`.
244///
245/// ## Nested Loop Join
246/// Always going to be probe (O(S) * O(R)).
247///
248/// # Further Reference Material
249/// DuckDB blog on Range Joins: [Range Joins in DuckDB](https://duckdb.org/2022/05/27/iejoin.html)
250#[derive(Debug)]
251pub struct PiecewiseMergeJoinExec {
252 /// Left buffered execution plan
253 pub buffered: Arc<dyn ExecutionPlan>,
254 /// Right streamed execution plan
255 pub streamed: Arc<dyn ExecutionPlan>,
256 /// The two expressions being compared
257 pub on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
258 /// Comparison operator in the range predicate
259 pub operator: Operator,
260 /// How the join is performed
261 pub join_type: JoinType,
262 /// The schema once the join is applied
263 schema: SchemaRef,
264 /// Buffered data
265 buffered_fut: OnceAsync<BufferedSideData>,
266 /// Execution metrics
267 metrics: ExecutionPlanMetricsSet,
268
269 /// Sort expressions - See above for more details [`PiecewiseMergeJoinExec`]
270 ///
271 /// The left sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
272 left_child_plan_required_order: LexOrdering,
273 /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
274 /// Unsorted for mark joins
275 #[allow(unused)]
276 right_batch_required_orders: LexOrdering,
277
278 /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans.
279 sort_options: SortOptions,
280 /// Cache holding plan properties like equivalences, output partitioning etc.
281 cache: PlanProperties,
282 /// Number of partitions to process
283 num_partitions: usize,
284}
285
286impl PiecewiseMergeJoinExec {
287 pub fn try_new(
288 buffered: Arc<dyn ExecutionPlan>,
289 streamed: Arc<dyn ExecutionPlan>,
290 on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
291 operator: Operator,
292 join_type: JoinType,
293 num_partitions: usize,
294 ) -> Result<Self> {
295 // TODO: Implement existence joins for PiecewiseMergeJoin
296 if is_existence_join(join_type) {
297 return not_impl_err!(
298 "Existence Joins are currently not supported for PiecewiseMergeJoin"
299 );
300 }
301
302 // Take the operator and enforce a sort order on the streamed + buffered side based on
303 // the operator type.
304 let sort_options = match operator {
305 Operator::Lt | Operator::LtEq => {
306 // For left existence joins the inputs will be swapped so the sort
307 // options are switched
308 if is_right_existence_join(join_type) {
309 SortOptions::new(false, true)
310 } else {
311 SortOptions::new(true, true)
312 }
313 }
314 Operator::Gt | Operator::GtEq => {
315 if is_right_existence_join(join_type) {
316 SortOptions::new(true, true)
317 } else {
318 SortOptions::new(false, true)
319 }
320 }
321 _ => {
322 return internal_err!(
323 "Cannot contain non-range operator in PiecewiseMergeJoinExec"
324 )
325 }
326 };
327
328 // Give the same `sort_option for comparison later`
329 let left_child_plan_required_order =
330 vec![PhysicalSortExpr::new(Arc::clone(&on.0), sort_options)];
331 let right_batch_required_orders =
332 vec![PhysicalSortExpr::new(Arc::clone(&on.1), sort_options)];
333
334 let Some(left_child_plan_required_order) =
335 LexOrdering::new(left_child_plan_required_order)
336 else {
337 return internal_err!(
338 "PiecewiseMergeJoinExec requires valid sort expressions for its left side"
339 );
340 };
341 let Some(right_batch_required_orders) =
342 LexOrdering::new(right_batch_required_orders)
343 else {
344 return internal_err!(
345 "PiecewiseMergeJoinExec requires valid sort expressions for its right side"
346 );
347 };
348
349 let buffered_schema = buffered.schema();
350 let streamed_schema = streamed.schema();
351
352 // Create output schema for the join
353 let schema =
354 Arc::new(build_join_schema(&buffered_schema, &streamed_schema, &join_type).0);
355 let cache = Self::compute_properties(
356 &buffered,
357 &streamed,
358 Arc::clone(&schema),
359 join_type,
360 &on,
361 )?;
362
363 Ok(Self {
364 streamed,
365 buffered,
366 on,
367 operator,
368 join_type,
369 schema,
370 buffered_fut: Default::default(),
371 metrics: ExecutionPlanMetricsSet::new(),
372 left_child_plan_required_order,
373 right_batch_required_orders,
374 sort_options,
375 cache,
376 num_partitions,
377 })
378 }
379
380 /// Reference to buffered side execution plan
381 pub fn buffered(&self) -> &Arc<dyn ExecutionPlan> {
382 &self.buffered
383 }
384
385 /// Reference to streamed side execution plan
386 pub fn streamed(&self) -> &Arc<dyn ExecutionPlan> {
387 &self.streamed
388 }
389
390 /// Join type
391 pub fn join_type(&self) -> JoinType {
392 self.join_type
393 }
394
395 /// Reference to sort options
396 pub fn sort_options(&self) -> &SortOptions {
397 &self.sort_options
398 }
399
400 /// Get probe side (streamed side) for the PiecewiseMergeJoin
401 /// In current implementation, probe side is determined according to join type.
402 pub fn probe_side(join_type: &JoinType) -> JoinSide {
403 match join_type {
404 JoinType::Right
405 | JoinType::Inner
406 | JoinType::Full
407 | JoinType::RightSemi
408 | JoinType::RightAnti
409 | JoinType::RightMark => JoinSide::Right,
410 JoinType::Left
411 | JoinType::LeftAnti
412 | JoinType::LeftSemi
413 | JoinType::LeftMark => JoinSide::Left,
414 }
415 }
416
417 pub fn compute_properties(
418 buffered: &Arc<dyn ExecutionPlan>,
419 streamed: &Arc<dyn ExecutionPlan>,
420 schema: SchemaRef,
421 join_type: JoinType,
422 join_on: &(PhysicalExprRef, PhysicalExprRef),
423 ) -> Result<PlanProperties> {
424 let eq_properties = join_equivalence_properties(
425 buffered.equivalence_properties().clone(),
426 streamed.equivalence_properties().clone(),
427 &join_type,
428 schema,
429 &Self::maintains_input_order(join_type),
430 Some(Self::probe_side(&join_type)),
431 std::slice::from_ref(join_on),
432 )?;
433
434 let output_partitioning =
435 asymmetric_join_output_partitioning(buffered, streamed, &join_type)?;
436
437 Ok(PlanProperties::new(
438 eq_properties,
439 output_partitioning,
440 EmissionType::Incremental,
441 boundedness_from_children([buffered, streamed]),
442 ))
443 }
444
445 // TODO: Add input order. Now they're all `false` indicating it will not maintain the input order.
446 // However, for certain join types the order is maintained. This can be updated in the future after
447 // more testing.
448 fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
449 match join_type {
450 // The existence side is expected to come in sorted
451 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
452 vec![false, false]
453 }
454 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
455 vec![false, false]
456 }
457 // Left, Right, Full, Inner Join is not guaranteed to maintain
458 // input order as the streamed side will be sorted during
459 // execution for `PiecewiseMergeJoin`
460 _ => vec![false, false],
461 }
462 }
463
464 // TODO
465 pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
466 todo!()
467 }
468}
469
470impl ExecutionPlan for PiecewiseMergeJoinExec {
471 fn name(&self) -> &str {
472 "PiecewiseMergeJoinExec"
473 }
474
475 fn as_any(&self) -> &dyn std::any::Any {
476 self
477 }
478
479 fn properties(&self) -> &PlanProperties {
480 &self.cache
481 }
482
483 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
484 vec![&self.buffered, &self.streamed]
485 }
486
487 fn required_input_distribution(&self) -> Vec<Distribution> {
488 vec![
489 Distribution::SinglePartition,
490 Distribution::UnspecifiedDistribution,
491 ]
492 }
493
494 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
495 // Existence joins don't need to be sorted on one side.
496 if is_right_existence_join(self.join_type) {
497 unimplemented!()
498 } else {
499 // Sort the right side in memory, so we do not need to enforce any sorting
500 vec![
501 Some(OrderingRequirements::from(
502 self.left_child_plan_required_order.clone(),
503 )),
504 None,
505 ]
506 }
507 }
508
509 fn with_new_children(
510 self: Arc<Self>,
511 children: Vec<Arc<dyn ExecutionPlan>>,
512 ) -> Result<Arc<dyn ExecutionPlan>> {
513 match &children[..] {
514 [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
515 Arc::clone(left),
516 Arc::clone(right),
517 self.on.clone(),
518 self.operator,
519 self.join_type,
520 self.num_partitions,
521 )?)),
522 _ => internal_err!(
523 "PiecewiseMergeJoin should have 2 children, found {}",
524 children.len()
525 ),
526 }
527 }
528
529 fn execute(
530 &self,
531 partition: usize,
532 context: Arc<datafusion_execution::TaskContext>,
533 ) -> Result<SendableRecordBatchStream> {
534 let on_buffered = Arc::clone(&self.on.0);
535 let on_streamed = Arc::clone(&self.on.1);
536
537 let metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
538 let buffered_fut = self.buffered_fut.try_once(|| {
539 let reservation = MemoryConsumer::new("PiecewiseMergeJoinInput")
540 .register(context.memory_pool());
541
542 let buffered_stream = self.buffered.execute(0, Arc::clone(&context))?;
543 Ok(build_buffered_data(
544 buffered_stream,
545 Arc::clone(&on_buffered),
546 metrics.clone(),
547 reservation,
548 build_visited_indices_map(self.join_type),
549 self.num_partitions,
550 ))
551 })?;
552
553 let streamed = self.streamed.execute(partition, Arc::clone(&context))?;
554
555 let batch_size = context.session_config().batch_size();
556
557 // TODO: Add existence joins + this is guarded at physical planner
558 if is_existence_join(self.join_type()) {
559 unreachable!()
560 } else {
561 Ok(Box::pin(ClassicPWMJStream::try_new(
562 Arc::clone(&self.schema),
563 on_streamed,
564 self.join_type,
565 self.operator,
566 streamed,
567 BufferedSide::Initial(BufferedSideInitialState { buffered_fut }),
568 PiecewiseMergeJoinStreamState::WaitBufferedSide,
569 self.sort_options,
570 metrics,
571 batch_size,
572 )))
573 }
574 }
575}
576
577impl DisplayAs for PiecewiseMergeJoinExec {
578 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
579 let on_str = format!(
580 "({} {} {})",
581 fmt_sql(self.on.0.as_ref()),
582 self.operator,
583 fmt_sql(self.on.1.as_ref())
584 );
585
586 match t {
587 DisplayFormatType::Default | DisplayFormatType::Verbose => {
588 write!(
589 f,
590 "PiecewiseMergeJoin: operator={:?}, join_type={:?}, on={}",
591 self.operator, self.join_type, on_str
592 )
593 }
594
595 DisplayFormatType::TreeRender => {
596 writeln!(f, "operator={:?}", self.operator)?;
597 if self.join_type != JoinType::Inner {
598 writeln!(f, "join_type={:?}", self.join_type)?;
599 }
600 writeln!(f, "on={on_str}")
601 }
602 }
603 }
604}
605
606async fn build_buffered_data(
607 buffered: SendableRecordBatchStream,
608 on_buffered: PhysicalExprRef,
609 metrics: BuildProbeJoinMetrics,
610 reservation: MemoryReservation,
611 build_map: bool,
612 remaining_partitions: usize,
613) -> Result<BufferedSideData> {
614 let schema = buffered.schema();
615
616 // Combine batches and record number of rows
617 let initial = (Vec::new(), 0, metrics, reservation);
618 let (batches, num_rows, metrics, mut reservation) = buffered
619 .try_fold(initial, |mut acc, batch| async {
620 let batch_size = get_record_batch_memory_size(&batch);
621 acc.3.try_grow(batch_size)?;
622 acc.2.build_mem_used.add(batch_size);
623 acc.2.build_input_batches.add(1);
624 acc.2.build_input_rows.add(batch.num_rows());
625 // Update row count
626 acc.1 += batch.num_rows();
627 // Push batch to output
628 acc.0.push(batch);
629 Ok(acc)
630 })
631 .await?;
632
633 let single_batch = concat_batches(&schema, batches.iter())?;
634
635 // Evaluate physical expression on the buffered side.
636 let buffered_values = on_buffered
637 .evaluate(&single_batch)?
638 .into_array(single_batch.num_rows())?;
639
640 // We add the single batch size + the memory of the join keys
641 // size of the size estimation
642 let size_estimation = get_record_batch_memory_size(&single_batch)
643 + buffered_values.get_array_memory_size();
644 reservation.try_grow(size_estimation)?;
645 metrics.build_mem_used.add(size_estimation);
646
647 // Created visited indices bitmap only if the join type requires it
648 let visited_indices_bitmap = if build_map {
649 let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
650 reservation.try_grow(bitmap_size)?;
651 metrics.build_mem_used.add(bitmap_size);
652
653 let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
654 bitmap_buffer.append_n(num_rows, false);
655 bitmap_buffer
656 } else {
657 BooleanBufferBuilder::new(0)
658 };
659
660 let buffered_data = BufferedSideData::new(
661 single_batch,
662 buffered_values,
663 Mutex::new(visited_indices_bitmap),
664 remaining_partitions,
665 reservation,
666 );
667
668 Ok(buffered_data)
669}
670
671pub(super) struct BufferedSideData {
672 pub(super) batch: RecordBatch,
673 values: ArrayRef,
674 pub(super) visited_indices_bitmap: SharedBitmapBuilder,
675 pub(super) remaining_partitions: AtomicUsize,
676 _reservation: MemoryReservation,
677}
678
679impl BufferedSideData {
680 pub(super) fn new(
681 batch: RecordBatch,
682 values: ArrayRef,
683 visited_indices_bitmap: SharedBitmapBuilder,
684 remaining_partitions: usize,
685 reservation: MemoryReservation,
686 ) -> Self {
687 Self {
688 batch,
689 values,
690 visited_indices_bitmap,
691 remaining_partitions: AtomicUsize::new(remaining_partitions),
692 _reservation: reservation,
693 }
694 }
695
696 pub(super) fn batch(&self) -> &RecordBatch {
697 &self.batch
698 }
699
700 pub(super) fn values(&self) -> &ArrayRef {
701 &self.values
702 }
703}
704
705pub(super) enum BufferedSide {
706 /// Indicates that build-side not collected yet
707 Initial(BufferedSideInitialState),
708 /// Indicates that build-side data has been collected
709 Ready(BufferedSideReadyState),
710}
711
712impl BufferedSide {
713 // Takes a mutable state of the buffered row batches
714 pub(super) fn try_as_initial_mut(&mut self) -> Result<&mut BufferedSideInitialState> {
715 match self {
716 BufferedSide::Initial(state) => Ok(state),
717 _ => internal_err!("Expected build side in initial state"),
718 }
719 }
720
721 pub(super) fn try_as_ready(&self) -> Result<&BufferedSideReadyState> {
722 match self {
723 BufferedSide::Ready(state) => Ok(state),
724 _ => {
725 internal_err!("Expected build side in ready state")
726 }
727 }
728 }
729
730 /// Tries to extract BuildSideReadyState from BuildSide enum.
731 /// Returns an error if state is not Ready.
732 pub(super) fn try_as_ready_mut(&mut self) -> Result<&mut BufferedSideReadyState> {
733 match self {
734 BufferedSide::Ready(state) => Ok(state),
735 _ => internal_err!("Expected build side in ready state"),
736 }
737 }
738}
739
740pub(super) struct BufferedSideInitialState {
741 pub(crate) buffered_fut: OnceFut<BufferedSideData>,
742}
743
744pub(super) struct BufferedSideReadyState {
745 /// Collected build-side data
746 pub(super) buffered_data: Arc<BufferedSideData>,
747}