datafusion_physical_plan/joins/piecewise_merge_join/exec.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::Array;
19use arrow::{
20 array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
21 compute::concat_batches,
22 util::bit_util,
23};
24use arrow_schema::{SchemaRef, SortOptions};
25use datafusion_common::not_impl_err;
26use datafusion_common::{JoinSide, Result, internal_err};
27use datafusion_execution::{
28 SendableRecordBatchStream,
29 memory_pool::{MemoryConsumer, MemoryReservation},
30};
31use datafusion_expr::{JoinType, Operator};
32use datafusion_physical_expr::equivalence::join_equivalence_properties;
33use datafusion_physical_expr::{
34 Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, PhysicalExprRef,
35 PhysicalSortExpr,
36};
37use datafusion_physical_expr_common::physical_expr::fmt_sql;
38use futures::TryStreamExt;
39use parking_lot::Mutex;
40use std::fmt::Formatter;
41use std::sync::Arc;
42use std::sync::atomic::AtomicUsize;
43
44use crate::execution_plan::{EmissionType, boundedness_from_children};
45
46use crate::joins::piecewise_merge_join::classic_join::{
47 ClassicPWMJStream, PiecewiseMergeJoinStreamState,
48};
49use crate::joins::piecewise_merge_join::utils::{
50 build_visited_indices_map, is_existence_join, is_right_existence_join,
51};
52use crate::joins::utils::asymmetric_join_output_partitioning;
53use crate::metrics::MetricsSet;
54use crate::{
55 DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties,
56};
57use crate::{
58 ExecutionPlan, PlanProperties,
59 joins::{
60 SharedBitmapBuilder,
61 utils::{BuildProbeJoinMetrics, OnceAsync, OnceFut, build_join_schema},
62 },
63 metrics::ExecutionPlanMetricsSet,
64 spill::get_record_batch_memory_size,
65};
66
/// `PiecewiseMergeJoinExec` is a join execution plan that evaluates a single range filter and shows much
/// better performance for these workloads than `NestedLoopJoin`.
69///
/// The physical planner will choose to evaluate this join when there is only one comparison filter. This
/// is a binary expression whose operator is one of [`Operator::Lt`], [`Operator::LtEq`], [`Operator::Gt`], or
/// [`Operator::GtEq`].
///
73/// Examples:
74/// - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
75///
76/// # Execution Plan Inputs
/// For `PiecewiseMergeJoin` we label the right input as the 'streamed' side and the left input as the
/// 'buffered' side.
79///
80/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and is able to sort streamed record
81/// batches during processing. Sorted input must specifically be ascending/descending based on the operator.
82///
83/// # Algorithms
84/// Classic joins are processed differently compared to existence joins.
85///
86/// ## Classic Joins (Inner, Full, Left, Right)
/// For classic joins we buffer the build side (the "buffered" side) and stream the probe side (the "streamed"
/// side). Both sides are sorted so that we can iterate from index 0 to the end on each side. This ordering ensures
/// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining
/// buffered rows from the match position onward, without rescanning earlier buffered rows.
91///
92/// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators
93/// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance
94/// monotonically as we stream new batches from the stream side.
95///
96/// The streamed side may arrive unsorted, so this operator sorts each incoming batch in memory before
97/// processing. The buffered side is required to be globally sorted; the plan declares this requirement
98/// in `requires_input_order`, which allows the optimizer to automatically insert a `SortExec` on that side if needed.
99/// By the time this operator runs, the buffered side is guaranteed to be in the proper order.
100///
101/// The pseudocode for the algorithm looks like this:
102///
103/// ```text
/// for stream_row in stream_batch:
///     for buffer_row in buffer_batch:
///         if compare(stream_row, buffer_row):
///             output stream_row X buffer_batch[buffer_row:]
///         else:
///             continue
110/// ```
111///
112/// The algorithm uses the streamed side (larger) to drive the loop. This is due to every row on the stream side iterating
113/// the buffered side to find every first match. By doing this, each match can output more result so that output
114/// handling can be better vectorized for performance.
115///
116/// Here is an example:
117///
/// We perform a `JoinType::Left` with these two batches and the operator being `Operator::Lt`(<). For each
/// row on the streamed side we move a pointer on the buffered side until it reaches a row that satisfies the
/// condition. Once we reach that row (in this case row 1 on the streamed side has its first match on row 2
/// on the buffered side; 100 < 200 is true), we can emit that row and every row after it. We can emit the rows
/// like this because the batch is sorted in ascending order, so every subsequent row also satisfies the
/// condition as they all hold larger values.
124///
125/// ```text
126/// SQL statement:
127/// SELECT *
128/// FROM (VALUES (100), (200), (500)) AS streamed(a)
129/// LEFT JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
130/// ON streamed.a < buffered.b;
131///
132/// Processing Row 1:
133///
134/// Sorted Buffered Side Sorted Streamed Side
135/// ┌──────────────────┐ ┌──────────────────┐
136/// 1 │ 100 │ 1 │ 100 │
137/// ├──────────────────┤ ├──────────────────┤
138/// 2 │ 200 │ ─┐ 2 │ 200 │
139/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤
140/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │
141/// ├──────────────────┤ │ as matches when the operator is └──────────────────┘
142/// 4 │ 300 │ │ `Operator::Lt` (<) Emitting all
143/// ├──────────────────┤ │ rows after the first match (row
144/// 5 │ 400 │ ─┘ 2 buffered side; 100 < 200)
145/// └──────────────────┘
146///
147/// Processing Row 2:
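/// By sorting the streamed side we know that the first match for row 2 cannot come before the first match
/// for row 1, so the pointer on the buffered side never has to move backwards.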
149///
150/// Sorted Buffered Side Sorted Streamed Side
151/// ┌──────────────────┐ ┌──────────────────┐
152/// 1 │ 100 │ 1 │ 100 │
153/// ├──────────────────┤ ├──────────────────┤
154/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │
155/// ├──────────────────┤ streamed side row 2. ├──────────────────┤
156/// 3 │ 200 │ 3 │ 500 │
157/// ├──────────────────┤ └──────────────────┘
158/// 4 │ 300 │
159/// ├──────────────────┤
160/// 5 │ 400 │
161/// └──────────────────┘
162/// ```
163///
164/// ## Existence Joins (Semi, Anti, Mark)
/// Existence joins can be orders of magnitude faster with a `PiecewiseMergeJoin` as we only need to find
/// the min/max value of the streamed side to be able to emit all matches on the buffered side. By putting
/// the side we need to mark onto the sorted buffered side, we can emit all these matches at once.
168///
169/// For less than operations (`<`) both inputs are to be sorted in descending order and vice versa for greater
170/// than (`>`) operations. `SortExec` is used to enforce sorting on the buffered side and streamed side does not
171/// need to be sorted due to only needing to find the min/max.
172///
173/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked side is on the buffered side.
174///
175/// The pseudocode for the algorithm looks like this:
176///
177/// ```text
178/// // Using the example of a less than `<` operation
179/// let max = max_batch(streamed_batch)
180///
181/// for buffer_row in buffer_batch:
182/// if buffer_row < max:
183/// output buffer_batch[buffer_row:]
184/// ```
185///
186/// Only need to find the min/max value and iterate through the buffered side once.
187///
188/// Here is an example:
189/// We perform a `JoinType::LeftSemi` with these two batches and the operator being `Operator::Lt`(<). Because
190/// the operator is `Operator::Lt` we can find the minimum value in the streamed side; in this case it is 200.
191/// We can then advance a pointer from the start of the buffer side until we find the first value that satisfies
192/// the predicate. All rows after that first matched value satisfy the condition 200 < x so we can mark all of
193/// those rows as matched.
194///
195/// ```text
196/// SQL statement:
197/// SELECT *
198/// FROM (VALUES (500), (200), (300)) AS streamed(a)
199/// LEFT SEMI JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
200/// ON streamed.a < buffered.b;
201///
202/// Sorted Buffered Side Unsorted Streamed Side
203/// ┌──────────────────┐ ┌──────────────────┐
204/// 1 │ 100 │ 1 │ 500 │
205/// ├──────────────────┤ ├──────────────────┤
206/// 2 │ 200 │ 2 │ 200 │
207/// ├──────────────────┤ ├──────────────────┤
208/// 3 │ 200 │ 3 │ 300 │
209/// ├──────────────────┤ └──────────────────┘
210/// 4 │ 300 │ ─┐
211/// ├──────────────────┤ | We emit matches for row 4 - 5
212/// 5 │ 400 │ ─┘ on the buffered side.
213/// └──────────────────┘
214/// min value: 200
215/// ```
216///
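/// The required sort direction is derived from the operator in `try_new`: for classic joins the sort options
/// are descending for `Operator::Lt` (<) or `Operator::LtEq` (<=) and ascending for `Operator::Gt` (>) or
/// `Operator::GtEq` (>=); for right existence joins the direction is reversed.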
219///
220/// # Partitioning Logic
221/// Piecewise Merge Join requires one buffered side partition + round robin partitioned stream side. A counter
222/// is used in the buffered side to coordinate when all streamed partitions are finished execution. This allows
223/// for processing the rest of the unmatched rows for Left and Full joins. The last partition that finishes
224/// execution will be responsible for outputting the unmatched rows.
225///
226/// # Performance Explanation (cost)
227/// Piecewise Merge Join is used over Nested Loop Join due to its superior performance. Here is the breakdown:
228///
229/// R: Buffered Side
230/// S: Streamed Side
231///
232/// ## Piecewise Merge Join (PWMJ)
233///
/// ### Classic Join
/// Requires sorting the probe side and, for each probe row, scanning the buffered side until the first match
/// is found.
/// Complexity: `O(sort(S) + num_of_batches(|S|) * scan(R))`.
///
/// ### Mark Join
/// Scans the probe side once to compute the min/max of the probe keys (no sort is needed for this), then
/// scans the buffered side only within that range.
/// Complexity: `O(|S| + scan(R[range]))`.
///
/// ## Nested Loop Join
/// Compares every row from `S` with every row from `R`, so the cost is always the full product of both sides.
/// Complexity: `O(|S| * |R|)`.
250///
251/// # Further Reference Material
252/// DuckDB blog on Range Joins: [Range Joins in DuckDB](https://duckdb.org/2022/05/27/iejoin.html)
#[derive(Debug)]
pub struct PiecewiseMergeJoinExec {
    /// Left (buffered) execution plan. `execute` collects it in full from
    /// partition 0 and keeps it in memory.
    pub buffered: Arc<dyn ExecutionPlan>,
    /// Right (streamed) execution plan. `execute` runs the requested partition
    /// of this plan and streams it through the join.
    pub streamed: Arc<dyn ExecutionPlan>,
    /// The two expressions being compared: `on.0` is evaluated on the buffered
    /// side and `on.1` on the streamed side.
    pub on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
    /// Comparison operator in the range predicate. `try_new` only accepts
    /// `<`, `<=`, `>` and `>=`.
    pub operator: Operator,
    /// How the join is performed
    pub join_type: JoinType,
    /// The schema once the join is applied
    schema: SchemaRef,
    /// Future producing the collected buffered side; started from `execute`
    /// through `OnceAsync::try_once`.
    buffered_fut: OnceAsync<BufferedSideData>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,

    /// Sort expressions - See above for more details [`PiecewiseMergeJoinExec`]
    ///
    /// The left sort order, built from `on.0` and `sort_options`: descending for `<`, `<=`
    /// operations + ascending for `>`, `>=` operations. This is the ordering reported by
    /// `required_input_ordering` for the buffered child.
    left_child_plan_required_order: LexOrdering,
    /// The right sort order, built from `on.1` and `sort_options`: descending for `<`, `<=`
    /// operations + ascending for `>`, `>=` operations
    /// Unsorted for mark joins
    right_batch_required_orders: LexOrdering,

    /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans.
    sort_options: SortOptions,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: Arc<PlanProperties>,
    /// Number of partitions to process. Forwarded to the buffered side data as the
    /// initial value of its `remaining_partitions` counter.
    num_partitions: usize,
}
287
288impl PiecewiseMergeJoinExec {
289 pub fn try_new(
290 buffered: Arc<dyn ExecutionPlan>,
291 streamed: Arc<dyn ExecutionPlan>,
292 on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
293 operator: Operator,
294 join_type: JoinType,
295 num_partitions: usize,
296 ) -> Result<Self> {
297 // TODO: Implement existence joins for PiecewiseMergeJoin
298 if is_existence_join(join_type) {
299 return not_impl_err!(
300 "Existence Joins are currently not supported for PiecewiseMergeJoin"
301 );
302 }
303
304 // Take the operator and enforce a sort order on the streamed + buffered side based on
305 // the operator type.
306 let sort_options = match operator {
307 Operator::Lt | Operator::LtEq => {
308 // For left existence joins the inputs will be swapped so the sort
309 // options are switched
310 if is_right_existence_join(join_type) {
311 SortOptions::new(false, true)
312 } else {
313 SortOptions::new(true, true)
314 }
315 }
316 Operator::Gt | Operator::GtEq => {
317 if is_right_existence_join(join_type) {
318 SortOptions::new(true, true)
319 } else {
320 SortOptions::new(false, true)
321 }
322 }
323 _ => {
324 return internal_err!(
325 "Cannot contain non-range operator in PiecewiseMergeJoinExec"
326 );
327 }
328 };
329
330 // Give the same `sort_option for comparison later`
331 let left_child_plan_required_order =
332 vec![PhysicalSortExpr::new(Arc::clone(&on.0), sort_options)];
333 let right_batch_required_orders =
334 vec![PhysicalSortExpr::new(Arc::clone(&on.1), sort_options)];
335
336 let Some(left_child_plan_required_order) =
337 LexOrdering::new(left_child_plan_required_order)
338 else {
339 return internal_err!(
340 "PiecewiseMergeJoinExec requires valid sort expressions for its left side"
341 );
342 };
343 let Some(right_batch_required_orders) =
344 LexOrdering::new(right_batch_required_orders)
345 else {
346 return internal_err!(
347 "PiecewiseMergeJoinExec requires valid sort expressions for its right side"
348 );
349 };
350
351 let buffered_schema = buffered.schema();
352 let streamed_schema = streamed.schema();
353
354 // Create output schema for the join
355 let schema =
356 Arc::new(build_join_schema(&buffered_schema, &streamed_schema, &join_type).0);
357 let cache = Self::compute_properties(
358 &buffered,
359 &streamed,
360 Arc::clone(&schema),
361 join_type,
362 &on,
363 )?;
364
365 Ok(Self {
366 streamed,
367 buffered,
368 on,
369 operator,
370 join_type,
371 schema,
372 buffered_fut: Default::default(),
373 metrics: ExecutionPlanMetricsSet::new(),
374 left_child_plan_required_order,
375 right_batch_required_orders,
376 sort_options,
377 cache: Arc::new(cache),
378 num_partitions,
379 })
380 }
381
    /// Reference to the buffered side (left child) execution plan
    pub fn buffered(&self) -> &Arc<dyn ExecutionPlan> {
        &self.buffered
    }
386
    /// Reference to the streamed side (right child) execution plan
    pub fn streamed(&self) -> &Arc<dyn ExecutionPlan> {
        &self.streamed
    }
391
    /// Join type of this plan, returned by value
    pub fn join_type(&self) -> JoinType {
        self.join_type
    }
396
    /// Reference to the sort options that `try_new` derived from the operator
    /// and that are applied to the join keys of both sides
    pub fn sort_options(&self) -> &SortOptions {
        &self.sort_options
    }
401
402 /// Get probe side (streamed side) for the PiecewiseMergeJoin
403 /// In current implementation, probe side is determined according to join type.
404 pub fn probe_side(join_type: &JoinType) -> JoinSide {
405 match join_type {
406 JoinType::Right
407 | JoinType::Inner
408 | JoinType::Full
409 | JoinType::RightSemi
410 | JoinType::RightAnti
411 | JoinType::RightMark => JoinSide::Right,
412 JoinType::Left
413 | JoinType::LeftAnti
414 | JoinType::LeftSemi
415 | JoinType::LeftMark => JoinSide::Left,
416 }
417 }
418
419 pub fn compute_properties(
420 buffered: &Arc<dyn ExecutionPlan>,
421 streamed: &Arc<dyn ExecutionPlan>,
422 schema: SchemaRef,
423 join_type: JoinType,
424 join_on: &(PhysicalExprRef, PhysicalExprRef),
425 ) -> Result<PlanProperties> {
426 let eq_properties = join_equivalence_properties(
427 buffered.equivalence_properties().clone(),
428 streamed.equivalence_properties().clone(),
429 &join_type,
430 schema,
431 &Self::maintains_input_order(join_type),
432 Some(Self::probe_side(&join_type)),
433 std::slice::from_ref(join_on),
434 )?;
435
436 let output_partitioning =
437 asymmetric_join_output_partitioning(buffered, streamed, &join_type)?;
438
439 Ok(PlanProperties::new(
440 eq_properties,
441 output_partitioning,
442 EmissionType::Incremental,
443 boundedness_from_children([buffered, streamed]),
444 ))
445 }
446
447 // TODO: Add input order. Now they're all `false` indicating it will not maintain the input order.
448 // However, for certain join types the order is maintained. This can be updated in the future after
449 // more testing.
450 fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
451 match join_type {
452 // The existence side is expected to come in sorted
453 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
454 vec![false, false]
455 }
456 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
457 vec![false, false]
458 }
459 // Left, Right, Full, Inner Join is not guaranteed to maintain
460 // input order as the streamed side will be sorted during
461 // execution for `PiecewiseMergeJoin`
462 _ => vec![false, false],
463 }
464 }
465
466 // TODO
467 pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
468 todo!()
469 }
470
471 fn with_new_children_and_same_properties(
472 &self,
473 mut children: Vec<Arc<dyn ExecutionPlan>>,
474 ) -> Self {
475 let buffered = children.swap_remove(0);
476 let streamed = children.swap_remove(0);
477 Self {
478 buffered,
479 streamed,
480 on: self.on.clone(),
481 operator: self.operator,
482 join_type: self.join_type,
483 schema: Arc::clone(&self.schema),
484 left_child_plan_required_order: self.left_child_plan_required_order.clone(),
485 right_batch_required_orders: self.right_batch_required_orders.clone(),
486 sort_options: self.sort_options,
487 cache: Arc::clone(&self.cache),
488 num_partitions: self.num_partitions,
489
490 // Re-set state.
491 metrics: ExecutionPlanMetricsSet::new(),
492 buffered_fut: Default::default(),
493 }
494 }
495}
496
497impl ExecutionPlan for PiecewiseMergeJoinExec {
    /// Static name of this operator.
    fn name(&self) -> &str {
        "PiecewiseMergeJoinExec"
    }
501
    /// Returns `self` as `&dyn Any` so callers can downcast to the concrete plan type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
505
    /// Returns the plan properties cached when the plan was built.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
509
    /// Returns the children in the order `[buffered, streamed]`.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.buffered, &self.streamed]
    }
513
    /// Distribution required from each child, in the order `[buffered, streamed]`.
    ///
    /// The buffered side must arrive as a single partition (`execute` only
    /// reads partition 0 of it); the streamed side has no distribution
    /// requirement.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![
            Distribution::SinglePartition,
            Distribution::UnspecifiedDistribution,
        ]
    }
520
521 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
522 // Existence joins don't need to be sorted on one side.
523 if is_right_existence_join(self.join_type) {
524 unimplemented!()
525 } else {
526 // Sort the right side in memory, so we do not need to enforce any sorting
527 vec![
528 Some(OrderingRequirements::from(
529 self.left_child_plan_required_order.clone(),
530 )),
531 None,
532 ]
533 }
534 }
535
536 fn with_new_children(
537 self: Arc<Self>,
538 children: Vec<Arc<dyn ExecutionPlan>>,
539 ) -> Result<Arc<dyn ExecutionPlan>> {
540 check_if_same_properties!(self, children);
541 match &children[..] {
542 [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
543 Arc::clone(left),
544 Arc::clone(right),
545 self.on.clone(),
546 self.operator,
547 self.join_type,
548 self.num_partitions,
549 )?)),
550 _ => internal_err!(
551 "PiecewiseMergeJoin should have 2 children, found {}",
552 children.len()
553 ),
554 }
555 }
556
557 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
558 Ok(Arc::new(self.with_new_children_and_same_properties(vec![
559 Arc::clone(&self.buffered),
560 Arc::clone(&self.streamed),
561 ])))
562 }
563
564 fn execute(
565 &self,
566 partition: usize,
567 context: Arc<datafusion_execution::TaskContext>,
568 ) -> Result<SendableRecordBatchStream> {
569 let on_buffered = Arc::clone(&self.on.0);
570 let on_streamed = Arc::clone(&self.on.1);
571
572 let metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
573 let buffered_fut = self.buffered_fut.try_once(|| {
574 let reservation = MemoryConsumer::new("PiecewiseMergeJoinInput")
575 .register(context.memory_pool());
576
577 let buffered_stream = self.buffered.execute(0, Arc::clone(&context))?;
578 Ok(build_buffered_data(
579 buffered_stream,
580 Arc::clone(&on_buffered),
581 metrics.clone(),
582 reservation,
583 build_visited_indices_map(self.join_type),
584 self.num_partitions,
585 ))
586 })?;
587
588 let streamed = self.streamed.execute(partition, Arc::clone(&context))?;
589
590 let batch_size = context.session_config().batch_size();
591
592 // TODO: Add existence joins + this is guarded at physical planner
593 if is_existence_join(self.join_type()) {
594 unreachable!()
595 } else {
596 Ok(Box::pin(ClassicPWMJStream::try_new(
597 Arc::clone(&self.schema),
598 on_streamed,
599 self.join_type,
600 self.operator,
601 streamed,
602 BufferedSide::Initial(BufferedSideInitialState { buffered_fut }),
603 PiecewiseMergeJoinStreamState::WaitBufferedSide,
604 self.sort_options,
605 metrics,
606 batch_size,
607 )))
608 }
609 }
610
    /// Returns a snapshot of the execution metrics recorded by this plan.
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
614}
615
616impl DisplayAs for PiecewiseMergeJoinExec {
617 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
618 let on_str = format!(
619 "({} {} {})",
620 fmt_sql(self.on.0.as_ref()),
621 self.operator,
622 fmt_sql(self.on.1.as_ref())
623 );
624
625 match t {
626 DisplayFormatType::Default | DisplayFormatType::Verbose => {
627 write!(
628 f,
629 "PiecewiseMergeJoin: operator={:?}, join_type={:?}, on={}",
630 self.operator, self.join_type, on_str
631 )
632 }
633
634 DisplayFormatType::TreeRender => {
635 writeln!(f, "operator={:?}", self.operator)?;
636 if self.join_type != JoinType::Inner {
637 writeln!(f, "join_type={:?}", self.join_type)?;
638 }
639 writeln!(f, "on={on_str}")
640 }
641 }
642 }
643}
644
645async fn build_buffered_data(
646 buffered: SendableRecordBatchStream,
647 on_buffered: PhysicalExprRef,
648 metrics: BuildProbeJoinMetrics,
649 reservation: MemoryReservation,
650 build_map: bool,
651 remaining_partitions: usize,
652) -> Result<BufferedSideData> {
653 let schema = buffered.schema();
654
655 // Combine batches and record number of rows
656 let initial = (Vec::new(), 0, metrics, reservation);
657 let (batches, num_rows, metrics, reservation) = buffered
658 .try_fold(initial, |mut acc, batch| async {
659 let batch_size = get_record_batch_memory_size(&batch);
660 acc.3.try_grow(batch_size)?;
661 acc.2.build_mem_used.add(batch_size);
662 acc.2.build_input_batches.add(1);
663 acc.2.build_input_rows.add(batch.num_rows());
664 // Update row count
665 acc.1 += batch.num_rows();
666 // Push batch to output
667 acc.0.push(batch);
668 Ok(acc)
669 })
670 .await?;
671
672 let single_batch = concat_batches(&schema, batches.iter())?;
673
674 // Evaluate physical expression on the buffered side.
675 let buffered_values = on_buffered
676 .evaluate(&single_batch)?
677 .into_array(single_batch.num_rows())?;
678
679 // We add the single batch size + the memory of the join keys
680 // size of the size estimation
681 let size_estimation = get_record_batch_memory_size(&single_batch)
682 + buffered_values.get_array_memory_size();
683 reservation.try_grow(size_estimation)?;
684 metrics.build_mem_used.add(size_estimation);
685
686 // Created visited indices bitmap only if the join type requires it
687 let visited_indices_bitmap = if build_map {
688 let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
689 reservation.try_grow(bitmap_size)?;
690 metrics.build_mem_used.add(bitmap_size);
691
692 let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
693 bitmap_buffer.append_n(num_rows, false);
694 bitmap_buffer
695 } else {
696 BooleanBufferBuilder::new(0)
697 };
698
699 let buffered_data = BufferedSideData::new(
700 single_batch,
701 buffered_values,
702 Mutex::new(visited_indices_bitmap),
703 remaining_partitions,
704 reservation,
705 );
706
707 Ok(buffered_data)
708}
709
/// The fully collected buffered side of the join, as produced by
/// `build_buffered_data`.
pub(super) struct BufferedSideData {
    /// All buffered input batches concatenated into a single batch
    pub(super) batch: RecordBatch,
    /// Join key expression of the buffered side evaluated over `batch`
    values: ArrayRef,
    /// Bitmap with one bit per buffered row, all initially `false`; empty when
    /// the join type does not need it. Presumably used to track which buffered
    /// rows have been matched — confirm against the stream implementation.
    pub(super) visited_indices_bitmap: SharedBitmapBuilder,
    /// Counter initialised with the `remaining_partitions` value given to `new`
    pub(super) remaining_partitions: AtomicUsize,
    /// Memory reservation taken while collecting the data; stored here so that
    /// it lives as long as the buffered data does
    _reservation: MemoryReservation,
}
717
impl BufferedSideData {
    /// Bundles the collected buffered side.
    ///
    /// `remaining_partitions` becomes the initial value of the atomic counter
    /// of the same name; `reservation` is stored so that it stays alive
    /// together with the data.
    pub(super) fn new(
        batch: RecordBatch,
        values: ArrayRef,
        visited_indices_bitmap: SharedBitmapBuilder,
        remaining_partitions: usize,
        reservation: MemoryReservation,
    ) -> Self {
        Self {
            batch,
            values,
            visited_indices_bitmap,
            remaining_partitions: AtomicUsize::new(remaining_partitions),
            _reservation: reservation,
        }
    }

    /// The concatenated buffered batch
    pub(super) fn batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// The join key values of the buffered batch
    pub(super) fn values(&self) -> &ArrayRef {
        &self.values
    }
}
743
/// State of the buffered side as seen by a join stream: either still being
/// collected or ready to use.
pub(super) enum BufferedSide {
    /// Indicates that the buffered side has not been collected yet; holds the
    /// future that will produce it
    Initial(BufferedSideInitialState),
    /// Indicates that the buffered side data has been collected
    Ready(BufferedSideReadyState),
}
750
751impl BufferedSide {
752 // Takes a mutable state of the buffered row batches
753 pub(super) fn try_as_initial_mut(&mut self) -> Result<&mut BufferedSideInitialState> {
754 match self {
755 BufferedSide::Initial(state) => Ok(state),
756 _ => internal_err!("Expected build side in initial state"),
757 }
758 }
759
760 pub(super) fn try_as_ready(&self) -> Result<&BufferedSideReadyState> {
761 match self {
762 BufferedSide::Ready(state) => Ok(state),
763 _ => {
764 internal_err!("Expected build side in ready state")
765 }
766 }
767 }
768
769 /// Tries to extract BuildSideReadyState from BuildSide enum.
770 /// Returns an error if state is not Ready.
771 pub(super) fn try_as_ready_mut(&mut self) -> Result<&mut BufferedSideReadyState> {
772 match self {
773 BufferedSide::Ready(state) => Ok(state),
774 _ => internal_err!("Expected build side in ready state"),
775 }
776 }
777}
778
/// Payload of [`BufferedSide::Initial`]: the buffered side is still pending.
pub(super) struct BufferedSideInitialState {
    /// Future that resolves to the collected buffered side data
    pub(crate) buffered_fut: OnceFut<BufferedSideData>,
}
782
/// Payload of [`BufferedSide::Ready`]: the buffered side has been collected.
pub(super) struct BufferedSideReadyState {
    /// Collected buffered-side data, held behind an `Arc`
    pub(super) buffered_data: Arc<BufferedSideData>,
}