Skip to main content

datafusion_physical_plan/joins/
nested_loop_join.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`NestedLoopJoinExec`]: joins without equijoin (equality predicates).
19
20use std::any::Any;
21use std::fmt::Formatter;
22use std::ops::{BitOr, ControlFlow};
23use std::sync::Arc;
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::task::Poll;
26
27use super::utils::{
28    asymmetric_join_output_partitioning, need_produce_result_in_final,
29    reorder_output_after_swap, swap_join_projection,
30};
31use crate::common::can_project;
32use crate::execution_plan::{EmissionType, boundedness_from_children};
33use crate::joins::SharedBitmapBuilder;
34use crate::joins::utils::{
35    BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut,
36    build_join_schema, check_join_is_valid, estimate_join_statistics,
37    need_produce_right_in_final,
38};
39use crate::metrics::{
40    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricsSet, RatioMetrics,
41};
42use crate::projection::{
43    EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
44    try_pushdown_through_join,
45};
46use crate::{
47    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
48    PlanProperties, RecordBatchStream, SendableRecordBatchStream,
49};
50
51use arrow::array::{
52    Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions, UInt32Array,
53    UInt64Array, new_null_array,
54};
55use arrow::buffer::BooleanBuffer;
56use arrow::compute::{
57    BatchCoalescer, concat_batches, filter, filter_record_batch, not, take,
58};
59use arrow::datatypes::{Schema, SchemaRef};
60use arrow::record_batch::RecordBatch;
61use arrow_schema::DataType;
62use datafusion_common::cast::as_boolean_array;
63use datafusion_common::{
64    JoinSide, Result, ScalarValue, Statistics, arrow_err, assert_eq_or_internal_err,
65    internal_datafusion_err, internal_err, project_schema, unwrap_or_internal_err,
66};
67use datafusion_execution::TaskContext;
68use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
69use datafusion_expr::JoinType;
70use datafusion_physical_expr::equivalence::{
71    ProjectionMapping, join_equivalence_properties,
72};
73
74use futures::{Stream, StreamExt, TryStreamExt};
75use log::debug;
76use parking_lot::Mutex;
77
#[expect(rustdoc::private_intra_doc_links)]
/// NestedLoopJoinExec is a build-probe join operator designed for joins that
/// do not have equijoin keys in their `ON` clause.
///
/// # Execution Flow
///
/// ```text
///                                                Incoming right batch
///                Left Side Buffered Batches
///                       ┌───────────┐              ┌───────────────┐
///                       │ ┌───────┐ │              │               │
///                       │ │       │ │              │               │
///  Current Left Row ───▶│ ├───────├─┤──────────┐   │               │
///                       │ │       │ │          │   └───────────────┘
///                       │ │       │ │          │           │
///                       │ │       │ │          │           │
///                       │ └───────┘ │          │           │
///                       │ ┌───────┐ │          │           │
///                       │ │       │ │          │     ┌─────┘
///                       │ │       │ │          │     │
///                       │ │       │ │          │     │
///                       │ │       │ │          │     │
///                       │ │       │ │          │     │
///                       │ └───────┘ │          ▼     ▼
///                       │   ......  │  ┌──────────────────────┐
///                       │           │  │X (Cartesian Product) │
///                       │           │  └──────────┬───────────┘
///                       └───────────┘             │
///                                                 │
///                                                 ▼
///                                      ┌───────┬───────────────┐
///                                      │       │               │
///                                      │       │               │
///                                      │       │               │
///                                      └───────┴───────────────┘
///                                        Intermediate Batch
///                                  (For join predicate evaluation)
/// ```
///
/// The execution follows a two-phase design:
///
/// ## 1. Buffering Left Input
/// - The operator eagerly buffers all left-side input batches into memory,
///   until a memory limit is reached.
///   Currently, an out-of-memory error will be thrown if all the left-side input batches
///   cannot fit into memory at once.
///   In the future, it's possible to make this case finish execution. (see
///   'Memory-limited Execution' section)
/// - The rationale for buffering the left side is that scanning the right side
///   can be expensive (e.g., decoding Parquet files), so buffering more left
///   rows reduces the number of right-side scan passes required.
///
/// ## 2. Probing Right Input
/// - Right-side input is streamed batch by batch.
/// - For each right-side batch:
///   - It evaluates the join filter against the full buffered left input.
///     This results in a Cartesian product between the right batch and each
///     left row -- with the join predicate/filter applied -- for each inner
///     loop iteration.
///   - Matched results are accumulated into an output buffer. (see more in
///     `Output Buffering Strategy` section)
/// - This process continues until all right-side input is consumed.
///
/// # Producing unmatched build-side data
/// - For special join types like left/full joins, it's required to also output
///   unmatched pairs. During execution, bitmaps are kept for both left and right
///   sides of the input; they'll be handled by dedicated states in `NLJStream`.
/// - The final output of the left side unmatched rows is handled by a single
///   partition for simplicity, since it only counts a small portion of the
///   execution time. (e.g. if probe side has 10k rows, the final output of
///   unmatched build side only roughly counts for 1/10k of the total time)
///
/// # Output Buffering Strategy
/// The operator uses an intermediate output buffer to accumulate results. Once
/// the output threshold is reached (currently set to the same value as
/// `batch_size` in the configuration), the results will be eagerly output.
///
/// # Extra Notes
/// - The operator always considers the **left** side as the build (buffered) side.
///   Therefore, the physical optimizer should assign the smaller input to the left.
/// - The design tries to minimize the intermediate data size to approximately
///   1 batch, for better cache locality and memory efficiency.
///
/// # TODO: Memory-limited Execution
/// If the memory budget is exceeded during left-side buffering, fallback
/// strategies such as streaming left batches and re-scanning the right side
/// may be implemented in the future.
///
/// Tracking issue: <https://github.com/apache/datafusion/issues/15760>
///
/// # Clone / Shared State
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
/// loading of the left side with the processing in each output stream.
/// Therefore it can not be [`Clone`]
#[derive(Debug)]
pub struct NestedLoopJoinExec {
    /// left side
    pub(crate) left: Arc<dyn ExecutionPlan>,
    /// right side
    pub(crate) right: Arc<dyn ExecutionPlan>,
    /// Filters which are applied while finding matching rows
    pub(crate) filter: Option<JoinFilter>,
    /// How the join is performed
    pub(crate) join_type: JoinType,
    /// The full concatenated schema of left and right children should be distinct from
    /// the output schema of the operator
    join_schema: SchemaRef,
    /// Future that consumes left input and buffers it in memory
    ///
    /// This structure is *shared* across all output streams.
    ///
    /// Each output stream waits on the `OnceAsync` to signal the completion of
    /// the build(left) side data, and buffer them all for later joining.
    build_side_data: OnceAsync<JoinLeftData>,
    /// Information of index and left / right placement of columns
    column_indices: Vec<ColumnIndex>,
    /// Projection to apply to the output of the join
    projection: Option<Vec<usize>>,

    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}
202
impl NestedLoopJoinExec {
    /// Try to create a new [`NestedLoopJoinExec`]
    ///
    /// Validates that the two inputs can be joined (no overlapping on-columns,
    /// since there are none), builds the concatenated join schema, and
    /// pre-computes the cached plan properties.
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let left_schema = left.schema();
        let right_schema = right.schema();
        check_join_is_valid(&left_schema, &right_schema, &[])?;
        let (join_schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, join_type);
        let join_schema = Arc::new(join_schema);
        let cache = Self::compute_properties(
            &left,
            &right,
            &join_schema,
            *join_type,
            projection.as_ref(),
        )?;

        Ok(NestedLoopJoinExec {
            left,
            right,
            filter,
            join_type: *join_type,
            join_schema,
            build_side_data: Default::default(),
            column_indices,
            projection,
            metrics: Default::default(),
            cache,
        })
    }

    /// left side
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// right side
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// Filters applied before join output
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// How the join is performed
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// Projection applied to the join output, if any (indices into the
    /// concatenated join schema)
    pub fn projection(&self) -> Option<&Vec<usize>> {
        self.projection.as_ref()
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
        join_type: JoinType,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        // Calculate equivalence properties:
        let mut eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            Arc::clone(schema),
            &Self::maintains_input_order(join_type),
            None,
            // No on columns in nested loop join
            &[],
        )?;

        let mut output_partitioning =
            asymmetric_join_output_partitioning(left, right, &join_type)?;

        // Determine how results are emitted: an unbounded build side forces
        // `Final` (the build phase never completes incrementally); otherwise
        // it depends on the join type and the probe side's behavior.
        let emission_type = if left.boundedness().is_unbounded() {
            EmissionType::Final
        } else if right.pipeline_behavior() == EmissionType::Incremental {
            match join_type {
                // If we only need to generate matched rows from the probe side,
                // we can emit rows incrementally.
                JoinType::Inner
                | JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::Right
                | JoinType::RightAnti
                | JoinType::RightMark => EmissionType::Incremental,
                // If we need to generate unmatched rows from the *build side*,
                // we need to emit them at the end.
                JoinType::Left
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::Full => EmissionType::Both,
            }
        } else {
            right.pipeline_behavior()
        };

        if let Some(projection) = projection {
            // construct a map from the input expressions to the output expression of the Projection
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness_from_children([left, right]),
        ))
    }

    /// This join implementation does not preserve the input order of either side.
    fn maintains_input_order(_join_type: JoinType) -> Vec<bool> {
        vec![false, false]
    }

    /// Returns `true` if the operator applies an output projection
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }

    /// Returns a copy of this join with `projection` applied on top of any
    /// existing projection (indices are composed through the current one)
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        // check if the projection is valid
        can_project(&self.schema(), projection.as_ref())?;
        let projection = match projection {
            Some(projection) => match &self.projection {
                // Compose: the new indices select from the already-projected
                // output, so map them through the existing projection.
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };
        Self::try_new(
            Arc::clone(&self.left),
            Arc::clone(&self.right),
            self.filter.clone(),
            &self.join_type,
            projection,
        )
    }

    /// Returns a new `ExecutionPlan` that runs NestedLoopsJoins with the left
    /// and right inputs swapped.
    ///
    /// # Notes:
    ///
    /// This function should be called BEFORE inserting any repartitioning
    /// operators on the join's children. Check [`super::HashJoinExec::swap_inputs`]
    /// for more details.
    pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
        let left = self.left();
        let right = self.right();
        let new_join = NestedLoopJoinExec::try_new(
            Arc::clone(right),
            Arc::clone(left),
            self.filter().map(JoinFilter::swap),
            &self.join_type().swap(),
            swap_join_projection(
                left.schema().fields().len(),
                right.schema().fields().len(),
                self.projection.as_ref(),
                self.join_type(),
            ),
        )?;

        // For Semi/Anti joins, swap result will produce same output schema,
        // no need to wrap them into additional projection
        let plan: Arc<dyn ExecutionPlan> = if matches!(
            self.join_type(),
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ) || self.projection.is_some()
        {
            Arc::new(new_join)
        } else {
            reorder_output_after_swap(
                Arc::new(new_join),
                &self.left().schema(),
                &self.right().schema(),
            )?
        };

        Ok(plan)
    }
}
403
404impl DisplayAs for NestedLoopJoinExec {
405    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
406        match t {
407            DisplayFormatType::Default | DisplayFormatType::Verbose => {
408                let display_filter = self.filter.as_ref().map_or_else(
409                    || "".to_string(),
410                    |f| format!(", filter={}", f.expression()),
411                );
412                let display_projections = if self.contains_projection() {
413                    format!(
414                        ", projection=[{}]",
415                        self.projection
416                            .as_ref()
417                            .unwrap()
418                            .iter()
419                            .map(|index| format!(
420                                "{}@{}",
421                                self.join_schema.fields().get(*index).unwrap().name(),
422                                index
423                            ))
424                            .collect::<Vec<_>>()
425                            .join(", ")
426                    )
427                } else {
428                    "".to_string()
429                };
430                write!(
431                    f,
432                    "NestedLoopJoinExec: join_type={:?}{}{}",
433                    self.join_type, display_filter, display_projections
434                )
435            }
436            DisplayFormatType::TreeRender => {
437                if *self.join_type() != JoinType::Inner {
438                    writeln!(f, "join_type={:?}", self.join_type)
439                } else {
440                    Ok(())
441                }
442            }
443        }
444    }
445}
446
impl ExecutionPlan for NestedLoopJoinExec {
    fn name(&self) -> &'static str {
        "NestedLoopJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    // The build (left) side must be a single partition so that every probe
    // partition observes the complete build-side data; the probe (right) side
    // may be partitioned arbitrarily.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![
            Distribution::SinglePartition,
            Distribution::UnspecifiedDistribution,
        ]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        Self::maintains_input_order(self.join_type)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(NestedLoopJoinExec::try_new(
            Arc::clone(&children[0]),
            Arc::clone(&children[1]),
            self.filter.clone(),
            &self.join_type,
            self.projection.clone(),
        )?))
    }

    /// Starts execution of output partition `partition`.
    ///
    /// The first caller kicks off the (shared) build-side collection via
    /// `OnceAsync::try_once`; every partition's stream then awaits the same
    /// future and probes the buffered left data with its own right-side
    /// partition.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq_or_internal_err!(
            self.left.output_partitioning().partition_count(),
            1,
            "Invalid NestedLoopJoinExec, the output partition count of the left child must be 1,\
                 consider using CoalescePartitionsExec or the EnforceDistribution rule"
        );

        let metrics = NestedLoopJoinMetrics::new(&self.metrics, partition);

        // Initialization reservation for load of inner table
        let load_reservation =
            MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]"))
                .register(context.memory_pool());

        // Only the first partition to execute actually creates the
        // build-side collection future; later partitions share its result.
        let build_side_data = self.build_side_data.try_once(|| {
            let stream = self.left.execute(0, Arc::clone(&context))?;

            Ok(collect_left_input(
                stream,
                metrics.join_metrics.clone(),
                load_reservation,
                need_produce_result_in_final(self.join_type),
                self.right().output_partitioning().partition_count(),
            ))
        })?;

        let batch_size = context.session_config().batch_size();

        let probe_side_data = self.right.execute(partition, context)?;

        // update column indices to reflect the projection
        let column_indices_after_projection = match &self.projection {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };

        Ok(Box::pin(NestedLoopJoinStream::new(
            self.schema(),
            self.filter.clone(),
            self.join_type,
            probe_side_data,
            build_side_data,
            column_indices_after_projection,
            metrics,
            batch_size,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        // NestedLoopJoinExec is designed for joins without equijoin keys in the
        // ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join
        // predicates are stored in `self.filter`, but `estimate_join_statistics`
        // currently doesn't support selectivity estimation for such arbitrary
        // filter expressions. We pass an empty join column list, which means
        // the cardinality estimation cannot use column statistics and returns
        // unknown row counts.
        let join_columns = Vec::new();

        // Left side is always a single partition (Distribution::SinglePartition),
        // so we always request overall stats with `None`. Right side can have
        // multiple partitions, so we forward the partition parameter to get
        // partition-specific statistics when requested.
        let left_stats = self.left.partition_statistics(None)?;
        let right_stats = match partition {
            Some(partition) => self.right.partition_statistics(Some(partition))?,
            None => self.right.partition_statistics(None)?,
        };

        let stats = estimate_join_statistics(
            left_stats,
            right_stats,
            &join_columns,
            &self.join_type,
            &self.join_schema,
        )?;

        Ok(stats.project(self.projection.as_ref()))
    }

    /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the
    /// pushdown and returns a new [`NestedLoopJoinExec`] as the top plan which has projections
    /// as its children. Otherwise, returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // TODO: currently if there is projection in NestedLoopJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
        if self.contains_projection() {
            return Ok(None);
        }

        let schema = self.schema();
        if let Some(JoinData {
            projected_left_child,
            projected_right_child,
            join_filter,
            ..
        }) = try_pushdown_through_join(
            projection,
            self.left(),
            self.right(),
            &[],
            &schema,
            self.filter(),
        )? {
            Ok(Some(Arc::new(NestedLoopJoinExec::try_new(
                Arc::new(projected_left_child),
                Arc::new(projected_right_child),
                join_filter,
                self.join_type(),
                // Returned early if projection is not None
                None,
            )?)))
        } else {
            // Full pushdown not possible; try embedding the projection into
            // this operator instead.
            try_embed_projection(projection, self)
        }
    }
}
622
impl EmbeddedProjection for NestedLoopJoinExec {
    /// Delegates to the inherent [`NestedLoopJoinExec::with_projection`].
    /// Inherent methods take precedence over trait methods during method
    /// resolution, so this call does not recurse.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
628
/// Left (build-side) data
pub(crate) struct JoinLeftData {
    /// Build-side data collected to a single batch
    batch: RecordBatch,
    /// Shared bitmap builder for visited left indices
    bitmap: SharedBitmapBuilder,
    /// Counter of running probe-threads, potentially able to update `bitmap`
    probe_threads_counter: AtomicUsize,
    /// Memory reservation tracking the buffered batch and the visited bitmap.
    /// It is never read after construction — the reservation is released when
    /// `JoinLeftData` is dropped — hence the `dead_code` expectation.
    #[expect(dead_code)]
    reservation: MemoryReservation,
}
643
644impl JoinLeftData {
645    pub(crate) fn new(
646        batch: RecordBatch,
647        bitmap: SharedBitmapBuilder,
648        probe_threads_counter: AtomicUsize,
649        reservation: MemoryReservation,
650    ) -> Self {
651        Self {
652            batch,
653            bitmap,
654            probe_threads_counter,
655            reservation,
656        }
657    }
658
659    pub(crate) fn batch(&self) -> &RecordBatch {
660        &self.batch
661    }
662
663    pub(crate) fn bitmap(&self) -> &SharedBitmapBuilder {
664        &self.bitmap
665    }
666
667    /// Decrements counter of running threads, and returns `true`
668    /// if caller is the last running thread
669    pub(crate) fn report_probe_completed(&self) -> bool {
670        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
671    }
672}
673
/// Asynchronously collect input into a single batch, and creates `JoinLeftData` from it
///
/// * `stream` - the build-side (left) input to drain completely
/// * `join_metrics` - build-side metrics updated as batches arrive
/// * `reservation` - memory reservation grown for each buffered batch and,
///   when needed, for the visited-rows bitmap; returns an error when the
///   memory pool cannot satisfy the growth
/// * `with_visited_left_side` - when `true` (join types that must emit
///   unmatched build-side rows), allocate an all-`false` bitmap with one bit
///   per buffered left row
/// * `probe_threads_count` - number of output partitions that will probe this
///   data; used by `report_probe_completed` to detect the last finisher
async fn collect_left_input(
    stream: SendableRecordBatchStream,
    join_metrics: BuildProbeJoinMetrics,
    reservation: MemoryReservation,
    with_visited_left_side: bool,
    probe_threads_count: usize,
) -> Result<JoinLeftData> {
    let schema = stream.schema();

    // Load all batches and count the rows
    // (metrics and the reservation are threaded through the fold so each
    // batch's memory is reserved before it is buffered)
    let (batches, metrics, mut reservation) = stream
        .try_fold(
            (Vec::new(), join_metrics, reservation),
            |(mut batches, metrics, mut reservation), batch| async {
                let batch_size = batch.get_array_memory_size();
                // Reserve memory for incoming batch
                reservation.try_grow(batch_size)?;
                // Update metrics
                metrics.build_mem_used.add(batch_size);
                metrics.build_input_batches.add(1);
                metrics.build_input_rows.add(batch.num_rows());
                // Push batch to output
                batches.push(batch);
                Ok((batches, metrics, reservation))
            },
        )
        .await?;

    let merged_batch = concat_batches(&schema, &batches)?;

    // Reserve memory for visited_left_side bitmap if required by join type
    let visited_left_side = if with_visited_left_side {
        let n_rows = merged_batch.num_rows();
        // one bit per row, rounded up to whole bytes
        let buffer_size = n_rows.div_ceil(8);
        reservation.try_grow(buffer_size)?;
        metrics.build_mem_used.add(buffer_size);

        let mut buffer = BooleanBufferBuilder::new(n_rows);
        buffer.append_n(n_rows, false);
        buffer
    } else {
        BooleanBufferBuilder::new(0)
    };

    Ok(JoinLeftData::new(
        merged_batch,
        Mutex::new(visited_left_side),
        AtomicUsize::new(probe_threads_count),
        reservation,
    ))
}
726
/// States for join processing. See `poll_next()` comment for more details about
/// state transitions.
#[derive(Debug, Clone, Copy)]
enum NLJState {
    /// Awaiting/buffering all build-side (left) input batches in memory
    BufferingLeft,
    /// Pulling the next probe-side (right) batch from the input stream
    FetchingRight,
    /// Joining the current right batch against the buffered left rows
    ProbeRight,
    /// Emitting unmatched rows from the current right batch
    /// (join types tracked via `current_right_batch_matched`)
    EmitRightUnmatched,
    /// Emitting unmatched buffered left rows (tracked via the shared bitmap)
    EmitLeftUnmatched,
    /// All output produced; the stream is finished
    Done,
}
pub(crate) struct NestedLoopJoinStream {
    // ========================================================================
    // PROPERTIES:
    // Operator's properties that remain constant
    //
    // Note: The implementation uses the terms left/build-side table and
    // right/probe-side table interchangeably. Treating the left side as the
    // build side is a convention in DataFusion: the planner always tries to
    // swap the smaller table to the left side.
    // ========================================================================
    /// Output schema
    pub(crate) output_schema: Arc<Schema>,
    /// join filter
    pub(crate) join_filter: Option<JoinFilter>,
    /// type of the join
    pub(crate) join_type: JoinType,
    /// the probe-side(right) table data of the nested loop join
    pub(crate) right_data: SendableRecordBatchStream,
    /// the build-side table data of the nested loop join
    pub(crate) left_data: OnceFut<JoinLeftData>,
    /// Projection to construct the output schema from the left and right tables.
    /// Example:
    /// - output_schema: ['a', 'c']
    /// - left_schema: ['a', 'b']
    /// - right_schema: ['c']
    ///
    /// The column indices would be [(left, 0), (right, 0)] -- taking the left
    /// 0th column and right 0th column can construct the output schema.
    ///
    /// Note there are other columns ('b' in the example) still kept after
    /// projection pushdown; this is because they might be used to evaluate
    /// the join filter (e.g., `JOIN ON (b+c)>0`).
    pub(crate) column_indices: Vec<ColumnIndex>,
    /// Join execution metrics
    pub(crate) metrics: NestedLoopJoinMetrics,

    /// `batch_size` from configuration; also used as the output buffer's
    /// emission threshold
    batch_size: usize,

    /// See comments in [`need_produce_right_in_final`] for more detail
    should_track_unmatched_right: bool,

    // ========================================================================
    // STATE FLAGS/BUFFERS:
    // Fields that hold intermediate data/flags during execution
    // ========================================================================
    /// State Tracking
    state: NLJState,
    /// Output buffer holds the join result to output. It will emit eagerly when
    /// the threshold is reached.
    output_buffer: Box<BatchCoalescer>,
    /// See comments in [`NLJState::Done`] for its purpose
    handled_empty_output: bool,

    // Buffer(left) side
    // -----------------
    /// The current buffered left data to join
    buffered_left_data: Option<Arc<JoinLeftData>>,
    /// Index into the left buffered batch. Used in `ProbeRight` state
    left_probe_idx: usize,
    /// Index into the left buffered batch. Used in `EmitLeftUnmatched` state
    left_emit_idx: usize,
    /// Should we go back to `BufferingLeft` state again after `EmitLeftUnmatched`
    /// state is over.
    left_exhausted: bool,
    /// If we can buffer all left data in one pass
    /// TODO(now): this is for the (unimplemented) memory-limited execution
    #[expect(dead_code)]
    left_buffered_in_one_pass: bool,

    // Probe(right) side
    // -----------------
    /// The current probe batch to process
    current_right_batch: Option<RecordBatch>,
    // For right join, keep track of matched rows in `current_right_batch`
    // Constructed when fetching each new incoming right batch in `FetchingRight` state.
    current_right_batch_matched: Option<BooleanArray>,
}
816
/// Metrics collected by one partition of the nested loop join.
pub(crate) struct NestedLoopJoinMetrics {
    /// Common build/probe join metrics (build time, join time, input
    /// rows/batches, baseline output metrics, etc.)
    pub(crate) join_metrics: BuildProbeJoinMetrics,
    /// Selectivity of the join: output_rows / (left_rows * right_rows)
    pub(crate) selectivity: RatioMetrics,
}
823
824impl NestedLoopJoinMetrics {
825    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
826        Self {
827            join_metrics: BuildProbeJoinMetrics::new(partition, metrics),
828            selectivity: MetricBuilder::new(metrics)
829                .with_type(MetricType::SUMMARY)
830                .ratio_metrics("selectivity", partition),
831        }
832    }
833}
834
impl Stream for NestedLoopJoinStream {
    type Item = Result<RecordBatch>;

    /// See the comments [`NestedLoopJoinExec`] for high-level design ideas.
    ///
    /// # Implementation
    ///
    /// This function is the entry point of NLJ operator's state machine
    /// transitions. The rough state transition graph is as follows, for more
    /// details see the comment in each state's matching arm.
    ///
    /// ============================
    /// State transition graph:
    /// ============================
    ///
    /// (start) --> BufferingLeft
    /// ----------------------------
    /// BufferingLeft → FetchingRight
    ///
    /// FetchingRight → ProbeRight (if right batch available)
    /// FetchingRight → EmitLeftUnmatched (if right exhausted)
    ///
    /// ProbeRight → ProbeRight (next left row or after yielding output)
    /// ProbeRight → EmitRightUnmatched (for special join types like right join)
    /// ProbeRight → FetchingRight (done with the current right batch)
    ///
    /// EmitRightUnmatched → FetchingRight
    ///
    /// EmitLeftUnmatched → EmitLeftUnmatched (only process 1 chunk for each
    /// iteration)
    /// EmitLeftUnmatched → Done (if finished)
    /// ----------------------------
    /// Done → (end)
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            match self.state {
                // # NLJState transitions
                // --> FetchingRight
                // This state will prepare the left side batches, next state
                // `FetchingRight` is responsible for preparing a single probe
                // side batch, before start joining.
                NLJState::BufferingLeft => {
                    debug!("[NLJState] Entering: {:?}", self.state);
                    // inside `collect_left_input` (the routine to buffer build
                    // -side batches), related metrics except build time will be
                    // updated.
                    // The timer stops on drop, at the end of this match arm
                    let build_metric = self.metrics.join_metrics.build_time.clone();
                    let _build_timer = build_metric.timer();

                    match self.handle_buffering_left(cx) {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => return poll,
                    }
                }

                // # NLJState transitions:
                // 1. --> ProbeRight
                //    Start processing the join for the newly fetched right
                //    batch.
                // 2. --> EmitLeftUnmatched: When the right side input is exhausted, (maybe) emit
                //    unmatched left side rows.
                //
                // After fetching a new batch from the right side, it will
                // process all rows from the buffered left data:
                // ```text
                // for batch in right_side:
                //     for row in left_buffer:
                //         join(batch, row)
                // ```
                // Note: the implementation does this step incrementally,
                // instead of materializing all intermediate Cartesian products
                // at once in memory.
                //
                // So after the right side input is exhausted, the join phase
                // for the current buffered left data is finished. We can go to
                // the next `EmitLeftUnmatched` phase to check if there is any
                // special handling (e.g., in cases like left join).
                NLJState::FetchingRight => {
                    debug!("[NLJState] Entering: {:?}", self.state);
                    // The timer stops on drop, at the end of this match arm
                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();

                    match self.handle_fetching_right(cx) {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => return poll,
                    }
                }

                // NLJState transitions:
                // 1. --> ProbeRight(1)
                //    If we have already buffered enough output to yield, it
                //    will first give back control to the parent state machine,
                //    then resume at the same place.
                // 2. --> ProbeRight(2)
                //    After probing one right batch, and evaluating the
                //    join filter on (left-row x right-batch), it will advance
                //    to the next left row, then re-enter the current state and
                //    continue joining.
                // 3. --> FetchRight
                //    After it has done with the current right batch (to join
                //    with all rows in the left buffer), it will go to
                //    FetchRight state to check what to do next.
                NLJState::ProbeRight => {
                    debug!("[NLJState] Entering: {:?}", self.state);

                    // The timer stops on drop, at the end of this match arm
                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();

                    match self.handle_probe_right() {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => {
                            return self.metrics.join_metrics.baseline.record_poll(poll);
                        }
                    }
                }

                // In the `current_right_batch_matched` bitmap, all trues mean
                // it has been output by the join. In this state we have to
                // output unmatched rows for current right batch (with null
                // padding for left relation)
                // Precondition: we have checked the join type so that it's
                // possible to output right unmatched (e.g. it's right join)
                NLJState::EmitRightUnmatched => {
                    debug!("[NLJState] Entering: {:?}", self.state);

                    // The timer stops on drop, at the end of this match arm
                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();

                    match self.handle_emit_right_unmatched() {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => {
                            return self.metrics.join_metrics.baseline.record_poll(poll);
                        }
                    }
                }

                // NLJState transitions:
                // 1. --> EmitLeftUnmatched(1)
                //    If we have already buffered enough output to yield, it
                //    will first give back control to the parent state machine,
                //    then resume at the same place.
                // 2. --> EmitLeftUnmatched(2)
                //    After processing some unmatched rows, it will re-enter
                //    the same state, to check if there are any more final
                //    results to output.
                // 3. --> Done
                //    It has processed all data, go to the final state and ready
                //    to exit.
                //
                // TODO: For memory-limited case, go back to `BufferingLeft`
                // state again.
                NLJState::EmitLeftUnmatched => {
                    debug!("[NLJState] Entering: {:?}", self.state);

                    // The timer stops on drop, at the end of this match arm
                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();

                    match self.handle_emit_left_unmatched() {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => {
                            return self.metrics.join_metrics.baseline.record_poll(poll);
                        }
                    }
                }

                // The final state and the exit point
                NLJState::Done => {
                    debug!("[NLJState] Entering: {:?}", self.state);

                    // The timer stops on drop, at the end of this match arm
                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();
                    // count this in the join timer because there might be some
                    // final result batches to output in this state

                    let poll = self.handle_done();
                    return self.metrics.join_metrics.baseline.record_poll(poll);
                }
            }
        }
    }
}
1025
impl RecordBatchStream for NestedLoopJoinStream {
    /// Schema of the batches this stream yields (the join's output schema)
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.output_schema)
    }
}
1031
1032impl NestedLoopJoinStream {
1033    #[expect(clippy::too_many_arguments)]
1034    pub(crate) fn new(
1035        schema: Arc<Schema>,
1036        filter: Option<JoinFilter>,
1037        join_type: JoinType,
1038        right_data: SendableRecordBatchStream,
1039        left_data: OnceFut<JoinLeftData>,
1040        column_indices: Vec<ColumnIndex>,
1041        metrics: NestedLoopJoinMetrics,
1042        batch_size: usize,
1043    ) -> Self {
1044        Self {
1045            output_schema: Arc::clone(&schema),
1046            join_filter: filter,
1047            join_type,
1048            right_data,
1049            column_indices,
1050            left_data,
1051            metrics,
1052            buffered_left_data: None,
1053            output_buffer: Box::new(BatchCoalescer::new(schema, batch_size)),
1054            batch_size,
1055            current_right_batch: None,
1056            current_right_batch_matched: None,
1057            state: NLJState::BufferingLeft,
1058            left_probe_idx: 0,
1059            left_emit_idx: 0,
1060            left_exhausted: false,
1061            left_buffered_in_one_pass: true,
1062            handled_empty_output: false,
1063            should_track_unmatched_right: need_produce_right_in_final(join_type),
1064        }
1065    }
1066
1067    // ==== State handler functions ====
1068
1069    /// Handle BufferingLeft state - prepare left side batches
1070    fn handle_buffering_left(
1071        &mut self,
1072        cx: &mut std::task::Context<'_>,
1073    ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1074        match self.left_data.get_shared(cx) {
1075            Poll::Ready(Ok(left_data)) => {
1076                self.buffered_left_data = Some(left_data);
1077                // TODO: implement memory-limited case
1078                self.left_exhausted = true;
1079                self.state = NLJState::FetchingRight;
1080                // Continue to next state immediately
1081                ControlFlow::Continue(())
1082            }
1083            Poll::Ready(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1084            Poll::Pending => ControlFlow::Break(Poll::Pending),
1085        }
1086    }
1087
1088    /// Handle FetchingRight state - fetch next right batch and prepare for processing
1089    fn handle_fetching_right(
1090        &mut self,
1091        cx: &mut std::task::Context<'_>,
1092    ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1093        match self.right_data.poll_next_unpin(cx) {
1094            Poll::Ready(result) => match result {
1095                Some(Ok(right_batch)) => {
1096                    // Update metrics
1097                    let right_batch_size = right_batch.num_rows();
1098                    self.metrics.join_metrics.input_rows.add(right_batch_size);
1099                    self.metrics.join_metrics.input_batches.add(1);
1100
1101                    // Skip the empty batch
1102                    if right_batch_size == 0 {
1103                        return ControlFlow::Continue(());
1104                    }
1105
1106                    self.current_right_batch = Some(right_batch);
1107
1108                    // Prepare right bitmap
1109                    if self.should_track_unmatched_right {
1110                        let zeroed_buf = BooleanBuffer::new_unset(right_batch_size);
1111                        self.current_right_batch_matched =
1112                            Some(BooleanArray::new(zeroed_buf, None));
1113                    }
1114
1115                    self.left_probe_idx = 0;
1116                    self.state = NLJState::ProbeRight;
1117                    ControlFlow::Continue(())
1118                }
1119                Some(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1120                None => {
1121                    // Right stream exhausted
1122                    self.state = NLJState::EmitLeftUnmatched;
1123                    ControlFlow::Continue(())
1124                }
1125            },
1126            Poll::Pending => ControlFlow::Break(Poll::Pending),
1127        }
1128    }
1129
1130    /// Handle ProbeRight state - process current probe batch
1131    fn handle_probe_right(&mut self) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1132        // Return any completed batches first
1133        if let Some(poll) = self.maybe_flush_ready_batch() {
1134            return ControlFlow::Break(poll);
1135        }
1136
1137        // Process current probe state
1138        match self.process_probe_batch() {
1139            // State unchanged (ProbeRight)
1140            // Continue probing until we have done joining the
1141            // current right batch with all buffered left rows.
1142            Ok(true) => ControlFlow::Continue(()),
1143            // To next FetchRightState
1144            // We have finished joining
1145            // (cur_right_batch x buffered_left_batches)
1146            Ok(false) => {
1147                // Left exhausted, transition to FetchingRight
1148                self.left_probe_idx = 0;
1149
1150                // Selectivity Metric: Update total possibilities for the batch (left_rows * right_rows)
1151                // If memory-limited execution is implemented, this logic must be updated accordingly.
1152                if let (Ok(left_data), Some(right_batch)) =
1153                    (self.get_left_data(), self.current_right_batch.as_ref())
1154                {
1155                    let left_rows = left_data.batch().num_rows();
1156                    let right_rows = right_batch.num_rows();
1157                    self.metrics.selectivity.add_total(left_rows * right_rows);
1158                }
1159
1160                if self.should_track_unmatched_right {
1161                    debug_assert!(
1162                        self.current_right_batch_matched.is_some(),
1163                        "If it's required to track matched rows in the right input, the right bitmap must be present"
1164                    );
1165                    self.state = NLJState::EmitRightUnmatched;
1166                } else {
1167                    self.current_right_batch = None;
1168                    self.state = NLJState::FetchingRight;
1169                }
1170                ControlFlow::Continue(())
1171            }
1172            Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1173        }
1174    }
1175
1176    /// Handle EmitRightUnmatched state - emit unmatched right rows
1177    fn handle_emit_right_unmatched(
1178        &mut self,
1179    ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1180        // Return any completed batches first
1181        if let Some(poll) = self.maybe_flush_ready_batch() {
1182            return ControlFlow::Break(poll);
1183        }
1184
1185        debug_assert!(
1186            self.current_right_batch_matched.is_some()
1187                && self.current_right_batch.is_some(),
1188            "This state is yielding output for unmatched rows in the current right batch, so both the right batch and the bitmap must be present"
1189        );
1190        // Construct the result batch for unmatched right rows using a utility function
1191        match self.process_right_unmatched() {
1192            Ok(Some(batch)) => {
1193                match self.output_buffer.push_batch(batch) {
1194                    Ok(()) => {
1195                        // Processed all in one pass
1196                        // cleared inside `process_right_unmatched`
1197                        debug_assert!(self.current_right_batch.is_none());
1198                        self.state = NLJState::FetchingRight;
1199                        ControlFlow::Continue(())
1200                    }
1201                    Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))),
1202                }
1203            }
1204            Ok(None) => {
1205                // Processed all in one pass
1206                // cleared inside `process_right_unmatched`
1207                debug_assert!(self.current_right_batch.is_none());
1208                self.state = NLJState::FetchingRight;
1209                ControlFlow::Continue(())
1210            }
1211            Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1212        }
1213    }
1214
1215    /// Handle EmitLeftUnmatched state - emit unmatched left rows
1216    fn handle_emit_left_unmatched(
1217        &mut self,
1218    ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1219        // Return any completed batches first
1220        if let Some(poll) = self.maybe_flush_ready_batch() {
1221            return ControlFlow::Break(poll);
1222        }
1223
1224        // Process current unmatched state
1225        match self.process_left_unmatched() {
1226            // State unchanged (EmitLeftUnmatched)
1227            // Continue processing until we have processed all unmatched rows
1228            Ok(true) => ControlFlow::Continue(()),
1229            // To Done state
1230            // We have finished processing all unmatched rows
1231            Ok(false) => match self.output_buffer.finish_buffered_batch() {
1232                Ok(()) => {
1233                    self.state = NLJState::Done;
1234                    ControlFlow::Continue(())
1235                }
1236                Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))),
1237            },
1238            Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1239        }
1240    }
1241
1242    /// Handle Done state - final state processing
1243    fn handle_done(&mut self) -> Poll<Option<Result<RecordBatch>>> {
1244        // Return any remaining completed batches before final termination
1245        if let Some(poll) = self.maybe_flush_ready_batch() {
1246            return poll;
1247        }
1248
1249        // HACK for the doc test in https://github.com/apache/datafusion/blob/main/datafusion/core/src/dataframe/mod.rs#L1265
1250        // If this operator directly return `Poll::Ready(None)`
1251        // for empty result, the final result will become an empty
1252        // batch with empty schema, however the expected result
1253        // should be with the expected schema for this operator
1254        if !self.handled_empty_output {
1255            let zero_count = Count::new();
1256            if *self.metrics.join_metrics.baseline.output_rows() == zero_count {
1257                let empty_batch = RecordBatch::new_empty(Arc::clone(&self.output_schema));
1258                self.handled_empty_output = true;
1259                return Poll::Ready(Some(Ok(empty_batch)));
1260            }
1261        }
1262
1263        Poll::Ready(None)
1264    }
1265
1266    // ==== Core logic handling for each state ====
1267
    /// Joins (a slice of) the buffered left rows against the current right
    /// batch and pushes any result rows into `output_buffer`.
    ///
    /// Returns bool to indicate should it continue probing
    /// true -> continue in the same ProbeRight state
    /// false -> It has done with the (buffered_left x cur_right_batch), go to
    /// the next state (EmitRightUnmatched or FetchingRight, chosen by the caller)
    ///
    /// # Errors
    /// Returns an error if the left data or current right batch is missing,
    /// or if joining/pushing a result batch fails.
    fn process_probe_batch(&mut self) -> Result<bool> {
        let left_data = Arc::clone(self.get_left_data()?);
        let right_batch = self
            .current_right_batch
            .as_ref()
            .ok_or_else(|| internal_datafusion_err!("Right batch should be available"))?
            .clone();

        // All buffered left rows have been probed with this right batch:
        // stop probing, the caller will go to the next state
        if self.left_probe_idx >= left_data.batch().num_rows() {
            return Ok(false);
        }

        // ========
        // Join (l_row x right_batch)
        // and push the result into output_buffer
        // ========

        // Special case:
        // When the right batch is very small, join with multiple left rows at once,
        //
        // The regular implementation is not efficient if the plan's right child is
        // very small (e.g. 1 row total), because inside the inner loop of NLJ, it's
        // handling one input right batch at once, if it's not large enough, the
        // overheads like filter evaluation can't be amortized through vectorization.
        debug_assert_ne!(
            right_batch.num_rows(),
            0,
            "When fetching the right batch, empty batches will be skipped"
        );

        // How many copies of the right batch fit into one target output batch;
        // the `> 10` threshold decides when batching left rows is worthwhile
        let l_row_cnt_ratio = self.batch_size / right_batch.num_rows();
        if l_row_cnt_ratio > 10 {
            // Calculate max left rows to handle at once. This operator tries to handle
            // up to `datafusion.execution.batch_size` rows at once in the intermediate
            // batch.
            let l_row_count = std::cmp::min(
                l_row_cnt_ratio,
                left_data.batch().num_rows() - self.left_probe_idx,
            );

            debug_assert!(
                l_row_count != 0,
                "This function should only be entered when there are remaining left rows to process"
            );
            let joined_batch = self.process_left_range_join(
                &left_data,
                &right_batch,
                self.left_probe_idx,
                l_row_count,
            )?;

            if let Some(batch) = joined_batch {
                self.output_buffer.push_batch(batch)?;
            }

            // Advance the left cursor past the whole range just joined
            self.left_probe_idx += l_row_count;

            return Ok(true);
        }

        // Regular path: join a single left row with the whole right batch
        let l_idx = self.left_probe_idx;
        let joined_batch =
            self.process_single_left_row_join(&left_data, &right_batch, l_idx)?;

        if let Some(batch) = joined_batch {
            self.output_buffer.push_batch(batch)?;
        }

        // ==== Prepare for the next iteration ====

        // Advance left cursor
        self.left_probe_idx += 1;

        // Return true to continue probing
        Ok(true)
    }
1349
1350    /// Process [l_start_index, l_start_index + l_count) JOIN right_batch
1351    /// Returns a RecordBatch containing the join results (None if empty)
1352    ///
1353    /// Side Effect: If the join type requires, left or right side matched bitmap
1354    /// will be set for matched indices.
1355    fn process_left_range_join(
1356        &mut self,
1357        left_data: &JoinLeftData,
1358        right_batch: &RecordBatch,
1359        l_start_index: usize,
1360        l_row_count: usize,
1361    ) -> Result<Option<RecordBatch>> {
1362        // Construct the Cartesian product between the specified range of left rows
1363        // and the entire right_batch. First, it calculates the index vectors, then
1364        // materializes the intermediate batch, and finally applies the join filter
1365        // to it.
1366        // -----------------------------------------------------------
1367        let right_rows = right_batch.num_rows();
1368        let total_rows = l_row_count * right_rows;
1369
1370        // Build index arrays for cartesian product: left_range X right_batch
1371        let left_indices: UInt32Array =
1372            UInt32Array::from_iter_values((0..l_row_count).flat_map(|i| {
1373                std::iter::repeat_n((l_start_index + i) as u32, right_rows)
1374            }));
1375        let right_indices: UInt32Array = UInt32Array::from_iter_values(
1376            (0..l_row_count).flat_map(|_| 0..right_rows as u32),
1377        );
1378
1379        debug_assert!(
1380            left_indices.len() == right_indices.len()
1381                && right_indices.len() == total_rows,
1382            "The length or cartesian product should be (left_size * right_size)",
1383        );
1384
1385        // Evaluate the join filter (if any) over an intermediate batch built
1386        // using the filter's own schema/column indices.
1387        let bitmap_combined = if let Some(filter) = &self.join_filter {
1388            // Build the intermediate batch for filter evaluation
1389            let intermediate_batch = if filter.schema.fields().is_empty() {
1390                // Constant predicate (e.g., TRUE/FALSE). Use an empty schema with row_count
1391                create_record_batch_with_empty_schema(
1392                    Arc::new((*filter.schema).clone()),
1393                    total_rows,
1394                )?
1395            } else {
1396                let mut filter_columns: Vec<Arc<dyn Array>> =
1397                    Vec::with_capacity(filter.column_indices().len());
1398                for column_index in filter.column_indices() {
1399                    let array = if column_index.side == JoinSide::Left {
1400                        let col = left_data.batch().column(column_index.index);
1401                        take(col.as_ref(), &left_indices, None)?
1402                    } else {
1403                        let col = right_batch.column(column_index.index);
1404                        take(col.as_ref(), &right_indices, None)?
1405                    };
1406                    filter_columns.push(array);
1407                }
1408
1409                RecordBatch::try_new(Arc::new((*filter.schema).clone()), filter_columns)?
1410            };
1411
1412            let filter_result = filter
1413                .expression()
1414                .evaluate(&intermediate_batch)?
1415                .into_array(intermediate_batch.num_rows())?;
1416            let filter_arr = as_boolean_array(&filter_result)?;
1417
1418            // Combine with null bitmap to get a unified mask
1419            boolean_mask_from_filter(filter_arr)
1420        } else {
1421            // No filter: all pairs match
1422            BooleanArray::from(vec![true; total_rows])
1423        };
1424
1425        // Update the global left or right bitmap for matched indices
1426        // -----------------------------------------------------------
1427
1428        // None means we don't have to update left bitmap for this join type
1429        let mut left_bitmap = if need_produce_result_in_final(self.join_type) {
1430            Some(left_data.bitmap().lock())
1431        } else {
1432            None
1433        };
1434
1435        // 'local' meaning: we want to collect 'is_matched' flag for the current
1436        // right batch, after it has joining all of the left buffer, here it's only
1437        // the partial result for joining given left range
1438        let mut local_right_bitmap = if self.should_track_unmatched_right {
1439            let mut current_right_batch_bitmap = BooleanBufferBuilder::new(right_rows);
1440            // Ensure builder has logical length so set_bit is in-bounds
1441            current_right_batch_bitmap.append_n(right_rows, false);
1442            Some(current_right_batch_bitmap)
1443        } else {
1444            None
1445        };
1446
1447        // Set the matched bit for left and right side bitmap
1448        for (i, is_matched) in bitmap_combined.iter().enumerate() {
1449            let is_matched = is_matched.ok_or_else(|| {
1450                internal_datafusion_err!("Must be Some after the previous combining step")
1451            })?;
1452
1453            let l_index = l_start_index + i / right_rows;
1454            let r_index = i % right_rows;
1455
1456            if let Some(bitmap) = left_bitmap.as_mut()
1457                && is_matched
1458            {
1459                // Map local index back to absolute left index within the batch
1460                bitmap.set_bit(l_index, true);
1461            }
1462
1463            if let Some(bitmap) = local_right_bitmap.as_mut()
1464                && is_matched
1465            {
1466                bitmap.set_bit(r_index, true);
1467            }
1468        }
1469
1470        // Apply the local right bitmap to the global bitmap
1471        if self.should_track_unmatched_right {
1472            // Remember to put it back after update
1473            let global_right_bitmap =
1474                std::mem::take(&mut self.current_right_batch_matched).ok_or_else(
1475                    || internal_datafusion_err!("right batch's bitmap should be present"),
1476                )?;
1477            let (buf, nulls) = global_right_bitmap.into_parts();
1478            debug_assert!(nulls.is_none());
1479
1480            let current_right_bitmap = local_right_bitmap
1481                .ok_or_else(|| {
1482                    internal_datafusion_err!(
1483                        "Should be Some if the current join type requires right bitmap"
1484                    )
1485                })?
1486                .finish();
1487            let updated_global_right_bitmap = buf.bitor(&current_right_bitmap);
1488
1489            self.current_right_batch_matched =
1490                Some(BooleanArray::new(updated_global_right_bitmap, None));
1491        }
1492
1493        // For the following join types: only bitmaps are updated; do not emit rows now
1494        if matches!(
1495            self.join_type,
1496            JoinType::LeftAnti
1497                | JoinType::LeftSemi
1498                | JoinType::LeftMark
1499                | JoinType::RightAnti
1500                | JoinType::RightMark
1501                | JoinType::RightSemi
1502        ) {
1503            return Ok(None);
1504        }
1505
1506        // Build the projected output batch (using output schema/column_indices),
1507        // then apply the bitmap filter to it.
1508        if self.output_schema.fields().is_empty() {
1509            // Empty projection: only row count matters
1510            let row_count = bitmap_combined.true_count();
1511            return Ok(Some(create_record_batch_with_empty_schema(
1512                Arc::clone(&self.output_schema),
1513                row_count,
1514            )?));
1515        }
1516
1517        let mut out_columns: Vec<Arc<dyn Array>> =
1518            Vec::with_capacity(self.output_schema.fields().len());
1519        for column_index in &self.column_indices {
1520            let array = if column_index.side == JoinSide::Left {
1521                let col = left_data.batch().column(column_index.index);
1522                take(col.as_ref(), &left_indices, None)?
1523            } else {
1524                let col = right_batch.column(column_index.index);
1525                take(col.as_ref(), &right_indices, None)?
1526            };
1527            out_columns.push(array);
1528        }
1529        let pre_filtered =
1530            RecordBatch::try_new(Arc::clone(&self.output_schema), out_columns)?;
1531        let filtered = filter_record_batch(&pre_filtered, &bitmap_combined)?;
1532        Ok(Some(filtered))
1533    }
1534
1535    /// Process a single left row join with the current right batch.
1536    /// Returns a RecordBatch containing the join results (None if empty)
1537    ///
1538    /// Side Effect: If the join type requires, left or right side matched bitmap
1539    /// will be set for matched indices.
1540    fn process_single_left_row_join(
1541        &mut self,
1542        left_data: &JoinLeftData,
1543        right_batch: &RecordBatch,
1544        l_index: usize,
1545    ) -> Result<Option<RecordBatch>> {
1546        let right_row_count = right_batch.num_rows();
1547        if right_row_count == 0 {
1548            return Ok(None);
1549        }
1550
1551        let cur_right_bitmap = if let Some(filter) = &self.join_filter {
1552            apply_filter_to_row_join_batch(
1553                left_data.batch(),
1554                l_index,
1555                right_batch,
1556                filter,
1557            )?
1558        } else {
1559            BooleanArray::from(vec![true; right_row_count])
1560        };
1561
1562        self.update_matched_bitmap(l_index, &cur_right_bitmap)?;
1563
1564        // For the following join types: here we only have to set the left/right
1565        // bitmap, and no need to output result
1566        if matches!(
1567            self.join_type,
1568            JoinType::LeftAnti
1569                | JoinType::LeftSemi
1570                | JoinType::LeftMark
1571                | JoinType::RightAnti
1572                | JoinType::RightMark
1573                | JoinType::RightSemi
1574        ) {
1575            return Ok(None);
1576        }
1577
1578        if cur_right_bitmap.true_count() == 0 {
1579            // If none of the pairs has passed the join predicate/filter
1580            Ok(None)
1581        } else {
1582            // Use the optimized approach similar to build_intermediate_batch_for_single_left_row
1583            let join_batch = build_row_join_batch(
1584                &self.output_schema,
1585                left_data.batch(),
1586                l_index,
1587                right_batch,
1588                Some(cur_right_bitmap),
1589                &self.column_indices,
1590                JoinSide::Left,
1591            )?;
1592            Ok(join_batch)
1593        }
1594    }
1595
1596    /// Returns bool to indicate should it continue processing unmatched rows
1597    /// true -> continue in the same EmitLeftUnmatched state
1598    /// false -> next state (Done)
1599    fn process_left_unmatched(&mut self) -> Result<bool> {
1600        let left_data = self.get_left_data()?;
1601        let left_batch = left_data.batch();
1602
1603        // ========
1604        // Check early return conditions
1605        // ========
1606
1607        // Early return if join type can't have unmatched rows
1608        let join_type_no_produce_left = !need_produce_result_in_final(self.join_type);
1609        // Early return if another thread is already processing unmatched rows
1610        let handled_by_other_partition =
1611            self.left_emit_idx == 0 && !left_data.report_probe_completed();
1612        // Stop processing unmatched rows, the caller will go to the next state
1613        let finished = self.left_emit_idx >= left_batch.num_rows();
1614
1615        if join_type_no_produce_left || handled_by_other_partition || finished {
1616            return Ok(false);
1617        }
1618
1619        // ========
1620        // Process unmatched rows and push the result into output_buffer
1621        // Each time, the number to process is up to batch size
1622        // ========
1623        let start_idx = self.left_emit_idx;
1624        let end_idx = std::cmp::min(start_idx + self.batch_size, left_batch.num_rows());
1625
1626        if let Some(batch) =
1627            self.process_left_unmatched_range(left_data, start_idx, end_idx)?
1628        {
1629            self.output_buffer.push_batch(batch)?;
1630        }
1631
1632        // ==== Prepare for the next iteration ====
1633        self.left_emit_idx = end_idx;
1634
1635        // Return true to continue processing unmatched rows
1636        Ok(true)
1637    }
1638
1639    /// Process unmatched rows from the left data within the specified range.
1640    /// Returns a RecordBatch containing the unmatched rows (None if empty).
1641    ///
1642    /// # Arguments
1643    /// * `left_data` - The left side data containing the batch and bitmap
1644    /// * `start_idx` - Start index (inclusive) of the range to process
1645    /// * `end_idx` - End index (exclusive) of the range to process
1646    ///
1647    /// # Safety
1648    /// The caller is responsible for ensuring that `start_idx` and `end_idx` are
1649    /// within valid bounds of the left batch. This function does not perform
1650    /// bounds checking.
1651    fn process_left_unmatched_range(
1652        &self,
1653        left_data: &JoinLeftData,
1654        start_idx: usize,
1655        end_idx: usize,
1656    ) -> Result<Option<RecordBatch>> {
1657        if start_idx == end_idx {
1658            return Ok(None);
1659        }
1660
1661        // Slice both left batch, and bitmap to range [start_idx, end_idx)
1662        // The range is bit index (not byte)
1663        let left_batch = left_data.batch();
1664        let left_batch_sliced = left_batch.slice(start_idx, end_idx - start_idx);
1665
1666        // Can this be more efficient?
1667        let mut bitmap_sliced = BooleanBufferBuilder::new(end_idx - start_idx);
1668        bitmap_sliced.append_n(end_idx - start_idx, false);
1669        let bitmap = left_data.bitmap().lock();
1670        for i in start_idx..end_idx {
1671            assert!(
1672                i - start_idx < bitmap_sliced.capacity(),
1673                "DBG: {start_idx}, {end_idx}"
1674            );
1675            bitmap_sliced.set_bit(i - start_idx, bitmap.get_bit(i));
1676        }
1677        let bitmap_sliced = BooleanArray::new(bitmap_sliced.finish(), None);
1678
1679        let right_schema = self.right_data.schema();
1680        build_unmatched_batch(
1681            &self.output_schema,
1682            &left_batch_sliced,
1683            bitmap_sliced,
1684            &right_schema,
1685            &self.column_indices,
1686            self.join_type,
1687            JoinSide::Left,
1688        )
1689    }
1690
1691    /// Process unmatched rows from the current right batch and reset the bitmap.
1692    /// Returns a RecordBatch containing the unmatched right rows (None if empty).
1693    fn process_right_unmatched(&mut self) -> Result<Option<RecordBatch>> {
1694        // ==== Take current right batch and its bitmap ====
1695        let right_batch_bitmap: BooleanArray =
1696            std::mem::take(&mut self.current_right_batch_matched).ok_or_else(|| {
1697                internal_datafusion_err!("right bitmap should be available")
1698            })?;
1699
1700        let right_batch = self.current_right_batch.take();
1701        let cur_right_batch = unwrap_or_internal_err!(right_batch);
1702
1703        let left_data = self.get_left_data()?;
1704        let left_schema = left_data.batch().schema();
1705
1706        let res = build_unmatched_batch(
1707            &self.output_schema,
1708            &cur_right_batch,
1709            right_batch_bitmap,
1710            &left_schema,
1711            &self.column_indices,
1712            self.join_type,
1713            JoinSide::Right,
1714        );
1715
1716        // ==== Clean-up ====
1717        self.current_right_batch_matched = None;
1718
1719        res
1720    }
1721
1722    // ==== Utilities ====
1723
1724    /// Get the build-side data of the left input, errors if it's None
1725    fn get_left_data(&self) -> Result<&Arc<JoinLeftData>> {
1726        self.buffered_left_data
1727            .as_ref()
1728            .ok_or_else(|| internal_datafusion_err!("LeftData should be available"))
1729    }
1730
1731    /// Flush the `output_buffer` if there are batches ready to output
1732    /// None if no result batch ready.
1733    fn maybe_flush_ready_batch(&mut self) -> Option<Poll<Option<Result<RecordBatch>>>> {
1734        if self.output_buffer.has_completed_batch()
1735            && let Some(batch) = self.output_buffer.next_completed_batch()
1736        {
1737            // Update output rows for selectivity metric
1738            let output_rows = batch.num_rows();
1739            self.metrics.selectivity.add_part(output_rows);
1740
1741            return Some(Poll::Ready(Some(Ok(batch))));
1742        }
1743
1744        None
1745    }
1746
1747    /// After joining (l_index@left_buffer x current_right_batch), it will result
1748    /// in a bitmap (the same length as current_right_batch) as the join match
1749    /// result. Use this bitmap to update the global bitmap, for special join
1750    /// types like full joins.
1751    ///
1752    /// Example:
1753    /// After joining l_index=1 (1-indexed row in the left buffer), and the
1754    /// current right batch with 3 elements, this function will be called with
1755    /// arguments: l_index = 1, r_matched = [false, false, true]
1756    /// - If the join type is FullJoin, the 1-index in the left bitmap will be
1757    ///   set to true, and also the right bitmap will be bitwise-ORed with the
1758    ///   input r_matched bitmap.
1759    /// - For join types that don't require output unmatched rows, this
1760    ///   function can be a no-op. For inner joins, this function is a no-op; for left
1761    ///   joins, only the left bitmap may be updated.
1762    fn update_matched_bitmap(
1763        &mut self,
1764        l_index: usize,
1765        r_matched_bitmap: &BooleanArray,
1766    ) -> Result<()> {
1767        let left_data = self.get_left_data()?;
1768
1769        // number of successfully joined pairs from (l_index x cur_right_batch)
1770        let joined_len = r_matched_bitmap.true_count();
1771
1772        // 1. Maybe update the left bitmap
1773        if need_produce_result_in_final(self.join_type) && (joined_len > 0) {
1774            let mut bitmap = left_data.bitmap().lock();
1775            bitmap.set_bit(l_index, true);
1776        }
1777
1778        // 2. Maybe updateh the right bitmap
1779        if self.should_track_unmatched_right {
1780            debug_assert!(self.current_right_batch_matched.is_some());
1781            // after bit-wise or, it will be put back
1782            let right_bitmap = std::mem::take(&mut self.current_right_batch_matched)
1783                .ok_or_else(|| {
1784                    internal_datafusion_err!("right batch's bitmap should be present")
1785                })?;
1786            let (buf, nulls) = right_bitmap.into_parts();
1787            debug_assert!(nulls.is_none());
1788            let updated_right_bitmap = buf.bitor(r_matched_bitmap.values());
1789
1790            self.current_right_batch_matched =
1791                Some(BooleanArray::new(updated_right_bitmap, None));
1792        }
1793
1794        Ok(())
1795    }
1796}
1797
1798// ==== Utilities ====
1799
1800/// Apply the join filter between:
1801/// (l_index th row in left buffer) x (right batch)
1802/// Returns a bitmap, with successfully joined indices set to true
1803fn apply_filter_to_row_join_batch(
1804    left_batch: &RecordBatch,
1805    l_index: usize,
1806    right_batch: &RecordBatch,
1807    filter: &JoinFilter,
1808) -> Result<BooleanArray> {
1809    debug_assert!(left_batch.num_rows() != 0 && right_batch.num_rows() != 0);
1810
1811    let intermediate_batch = if filter.schema.fields().is_empty() {
1812        // If filter is constant (e.g. literal `true`), empty batch can be used
1813        // in the later filter step.
1814        create_record_batch_with_empty_schema(
1815            Arc::new((*filter.schema).clone()),
1816            right_batch.num_rows(),
1817        )?
1818    } else {
1819        build_row_join_batch(
1820            &filter.schema,
1821            left_batch,
1822            l_index,
1823            right_batch,
1824            None,
1825            &filter.column_indices,
1826            JoinSide::Left,
1827        )?
1828        .ok_or_else(|| internal_datafusion_err!("This function assume input batch is not empty, so the intermediate batch can't be empty too"))?
1829    };
1830
1831    let filter_result = filter
1832        .expression()
1833        .evaluate(&intermediate_batch)?
1834        .into_array(intermediate_batch.num_rows())?;
1835    let filter_arr = as_boolean_array(&filter_result)?;
1836
1837    // Convert boolean array with potential nulls into a unified mask bitmap
1838    let bitmap_combined = boolean_mask_from_filter(filter_arr);
1839
1840    Ok(bitmap_combined)
1841}
1842
1843/// Convert a boolean filter array into a unified mask bitmap.
1844///
1845/// Caution: The filter result is NOT a bitmap; it contains true/false/null values.
1846/// For example, `1 < NULL` evaluates to NULL. Therefore, we must combine (AND)
1847/// the boolean array with its null bitmap to construct a unified bitmap.
1848#[inline]
1849fn boolean_mask_from_filter(filter_arr: &BooleanArray) -> BooleanArray {
1850    let (values, nulls) = filter_arr.clone().into_parts();
1851    match nulls {
1852        Some(nulls) => BooleanArray::new(nulls.inner() & &values, None),
1853        None => BooleanArray::new(values, None),
1854    }
1855}
1856
/// This function performs the following steps:
/// 1. Apply filter to probe-side batch
/// 2. Broadcast the left row (build_side_batch\[build_side_index\]) to the
///    filtered probe-side batch
/// 3. Concat them together according to `col_indices`, and return the result
///    (None if the result is empty)
///
/// Example:
/// build_side_batch:
/// a
/// ----
/// 1
/// 2
/// 3
///
/// # 0 index element in the build_side_batch (that is `1`) will be used
/// build_side_index: 0
///
/// probe_side_batch:
/// b
/// ----
/// 10
/// 20
/// 30
/// 40
///
/// # After applying it, only index 1 and 3 elements in probe_side_batch will be
/// # kept
/// probe_side_filter:
/// false
/// true
/// false
/// true
///
///
/// # Projections to the build/probe side batch, to construct the output batch
/// col_indices:
/// [(left, 0), (right, 0)]
///
/// build_side: left
///
/// ====
/// Result batch:
/// a b
/// ----
/// 1 20
/// 1 40
fn build_row_join_batch(
    output_schema: &Schema,
    build_side_batch: &RecordBatch,
    build_side_index: usize,
    probe_side_batch: &RecordBatch,
    probe_side_filter: Option<BooleanArray>,
    // See [`NLJStream`] struct's `column_indices` field for more detail
    col_indices: &[ColumnIndex],
    // If the build side is left or right, used to interpret the side information
    // in `col_indices`
    build_side: JoinSide,
) -> Result<Option<RecordBatch>> {
    debug_assert!(build_side != JoinSide::None);

    // TODO(perf): since the output might be projection of right batch, this
    // filtering step is more efficient to be done inside the column_index loop
    let filtered_probe_batch = if let Some(filter) = probe_side_filter {
        &filter_record_batch(probe_side_batch, &filter)?
    } else {
        probe_side_batch
    };

    // All probe rows were filtered out: nothing to join
    if filtered_probe_batch.num_rows() == 0 {
        return Ok(None);
    }

    // Edge case: downstream operator does not require any columns from this NLJ,
    // so allow an empty projection.
    // Example:
    //  SELECT DISTINCT 32 AS col2
    //  FROM tab0 AS cor0
    //  LEFT OUTER JOIN tab2 AS cor1
    //  ON ( NULL ) IS NULL;
    if output_schema.fields.is_empty() {
        return Ok(Some(create_record_batch_with_empty_schema(
            Arc::new(output_schema.clone()),
            filtered_probe_batch.num_rows(),
        )?));
    }

    let mut columns: Vec<Arc<dyn Array>> =
        Vec::with_capacity(output_schema.fields().len());

    for column_index in col_indices {
        let array = if column_index.side == build_side {
            // Broadcast the single build-side row to match the filtered
            // probe-side batch length
            let original_left_array = build_side_batch.column(column_index.index);
            // Avoid using `ScalarValue::to_array_of_size()` for `List(Utf8View)` to avoid
            // deep copies for buffers inside `Utf8View` array. See below for details.
            // https://github.com/apache/datafusion/issues/18159
            //
            // In other cases, `to_array_of_size()` is faster.
            match original_left_array.data_type() {
                DataType::List(field) | DataType::LargeList(field)
                    if field.data_type() == &DataType::Utf8View =>
                {
                    // `take` with a repeated index broadcasts the row without
                    // copying the underlying Utf8View buffers
                    let indices_iter = std::iter::repeat_n(
                        build_side_index as u64,
                        filtered_probe_batch.num_rows(),
                    );
                    let indices_array = UInt64Array::from_iter_values(indices_iter);
                    take(original_left_array.as_ref(), &indices_array, None)?
                }
                _ => {
                    let scalar_value = ScalarValue::try_from_array(
                        original_left_array.as_ref(),
                        build_side_index,
                    )?;
                    scalar_value.to_array_of_size(filtered_probe_batch.num_rows())?
                }
            }
        } else {
            // Probe side was already filtered above, so the column can be
            // reused as-is (cheap `Arc` clone, no data copy)
            Arc::clone(filtered_probe_batch.column(column_index.index))
        };

        columns.push(array);
    }

    Ok(Some(RecordBatch::try_new(
        Arc::new(output_schema.clone()),
        columns,
    )?))
}
1989
1990/// Special case for `PlaceHolderRowExec`
1991/// Minimal example:  SELECT 1 WHERE EXISTS (SELECT 1);
1992//
1993/// # Return
1994/// If Some, that's the result batch
1995/// If None, it's not for this special case. Continue execution.
1996fn build_unmatched_batch_empty_schema(
1997    output_schema: &SchemaRef,
1998    batch_bitmap: &BooleanArray,
1999    // For left/right/full joins, it needs to fill nulls for another side
2000    join_type: JoinType,
2001) -> Result<Option<RecordBatch>> {
2002    let result_size = match join_type {
2003        JoinType::Left
2004        | JoinType::Right
2005        | JoinType::Full
2006        | JoinType::LeftAnti
2007        | JoinType::RightAnti => batch_bitmap.false_count(),
2008        JoinType::LeftSemi | JoinType::RightSemi => batch_bitmap.true_count(),
2009        JoinType::LeftMark | JoinType::RightMark => batch_bitmap.len(),
2010        _ => unreachable!(),
2011    };
2012
2013    if output_schema.fields().is_empty() {
2014        Ok(Some(create_record_batch_with_empty_schema(
2015            Arc::clone(output_schema),
2016            result_size,
2017        )?))
2018    } else {
2019        Ok(None)
2020    }
2021}
2022
2023/// Creates an empty RecordBatch with a specific row count.
2024/// This is useful for cases where we need a batch with the correct schema and row count
2025/// but no actual data columns (e.g., for constant filters).
2026fn create_record_batch_with_empty_schema(
2027    schema: SchemaRef,
2028    row_count: usize,
2029) -> Result<RecordBatch> {
2030    let options = RecordBatchOptions::new()
2031        .with_match_field_names(true)
2032        .with_row_count(Some(row_count));
2033
2034    RecordBatch::try_new_with_options(schema, vec![], &options).map_err(|e| {
2035        internal_datafusion_err!("Failed to create empty record batch: {}", e)
2036    })
2037}
2038
/// Build the output rows required by `join_type` for the entries of `batch`
/// that are (un)matched according to `batch_bitmap`.
///
/// # Example:
/// batch:
/// a
/// ----
/// 1
/// 2
/// 3
///
/// batch_bitmap:
/// ----
/// false
/// true
/// false
///
/// another_side_schema:
/// [(b, bool), (c, int32)]
///
/// join_type: JoinType::Right
///
/// col_indices: ...(please refer to the comment in `NLJStream::column_indices``)
///
/// batch_side: right
///
/// # Walkthrough:
///
/// This executor is performing a right join, and the currently processed right
/// batch is as above. After joining it with all buffered left rows, the joined
/// entries are marked by the `batch_bitmap`.
/// This method will keep the unmatched indices on the batch side (right), and pad
/// the left side with nulls. The result would be:
///
/// b          c           a
/// ------------------------
/// Null(bool) Null(Int32) 1
/// Null(bool) Null(Int32) 3
fn build_unmatched_batch(
    output_schema: &SchemaRef,
    batch: &RecordBatch,
    batch_bitmap: BooleanArray,
    // For left/right/full joins, it needs to fill nulls for another side
    another_side_schema: &SchemaRef,
    col_indices: &[ColumnIndex],
    join_type: JoinType,
    batch_side: JoinSide,
) -> Result<Option<RecordBatch>> {
    // Should not call it for inner joins (they have no unmatched output)
    debug_assert_ne!(join_type, JoinType::Inner);
    debug_assert_ne!(batch_side, JoinSide::None);

    // Handle special case (see function comment)
    if let Some(batch) =
        build_unmatched_batch_empty_schema(output_schema, &batch_bitmap, join_type)?
    {
        return Ok(Some(batch));
    }

    match join_type {
        // Outer joins: emit unmatched batch-side rows, null-padding the other side
        JoinType::Full | JoinType::Right | JoinType::Left => {
            if join_type == JoinType::Right {
                debug_assert_eq!(batch_side, JoinSide::Right);
            }
            if join_type == JoinType::Left {
                debug_assert_eq!(batch_side, JoinSide::Left);
            }

            // 1. Filter the batch with *flipped* bitmap (keep the UNmatched rows)
            // 2. Fill the other side with nulls
            let flipped_bitmap = not(&batch_bitmap)?;

            // Create a one-row all-null batch with the other side's schema.
            // NOTE: the `left_*` names below denote the side being null-padded
            // (i.e. `another_side_schema`), which is the right side when
            // `batch_side` is Left.
            let left_null_columns: Vec<Arc<dyn Array>> = another_side_schema
                .fields()
                .iter()
                .map(|field| new_null_array(field.data_type(), 1))
                .collect();

            // Hack: If the left schema is not nullable, the full join result
            // might contain null, this is only a temporary batch to construct
            // such full join result.
            let nullable_left_schema = Arc::new(Schema::new(
                another_side_schema
                    .fields()
                    .iter()
                    .map(|field| (**field).clone().with_nullable(true))
                    .collect::<Vec<_>>(),
            ));
            let left_null_batch = if nullable_left_schema.fields.is_empty() {
                // Left input can be an empty relation, in this case left relation
                // won't be used to construct the result batch (i.e. not in `col_indices`)
                create_record_batch_with_empty_schema(nullable_left_schema, 0)?
            } else {
                RecordBatch::try_new(nullable_left_schema, left_null_columns)?
            };

            debug_assert_ne!(batch_side, JoinSide::None);
            let opposite_side = batch_side.negate();

            // Broadcast the single null row over the unmatched batch rows
            build_row_join_batch(
                output_schema,
                &left_null_batch,
                0,
                batch,
                Some(flipped_bitmap),
                col_indices,
                opposite_side,
            )
        }
        // Semi/anti joins: output only batch-side columns, filtered by the
        // (possibly flipped) bitmap
        JoinType::RightSemi
        | JoinType::RightAnti
        | JoinType::LeftSemi
        | JoinType::LeftAnti => {
            if matches!(join_type, JoinType::RightSemi | JoinType::RightAnti) {
                debug_assert_eq!(batch_side, JoinSide::Right);
            }
            if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
                debug_assert_eq!(batch_side, JoinSide::Left);
            }

            // Semi keeps the matched rows, anti keeps the unmatched ones
            let bitmap = if matches!(join_type, JoinType::LeftSemi | JoinType::RightSemi)
            {
                batch_bitmap.clone()
            } else {
                not(&batch_bitmap)?
            };

            if bitmap.true_count() == 0 {
                return Ok(None);
            }

            let mut columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(output_schema.fields().len());

            for column_index in col_indices {
                // Semi/anti output only ever references the batch side
                debug_assert!(column_index.side == batch_side);

                let col = batch.column(column_index.index);
                let filtered_col = filter(col, &bitmap)?;

                columns.push(filtered_col);
            }

            Ok(Some(RecordBatch::try_new(
                Arc::clone(output_schema),
                columns,
            )?))
        }
        // Mark joins: output all batch-side rows plus one boolean "mark"
        // column (the matched bitmap itself, at the `JoinSide::None` slot)
        JoinType::RightMark | JoinType::LeftMark => {
            if join_type == JoinType::RightMark {
                debug_assert_eq!(batch_side, JoinSide::Right);
            }
            if join_type == JoinType::LeftMark {
                debug_assert_eq!(batch_side, JoinSide::Left);
            }

            let mut columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(output_schema.fields().len());

            // Hack to deal with the borrow checker: the bitmap is moved into
            // the mark column exactly once, via `Option::take`
            let mut right_batch_bitmap_opt = Some(batch_bitmap);

            for column_index in col_indices {
                if column_index.side == batch_side {
                    let col = batch.column(column_index.index);

                    columns.push(Arc::clone(col));
                } else if column_index.side == JoinSide::None {
                    let right_batch_bitmap = std::mem::take(&mut right_batch_bitmap_opt);
                    match right_batch_bitmap {
                        Some(right_batch_bitmap) => {
                            columns.push(Arc::new(right_batch_bitmap))
                        }
                        None => unreachable!("Should only be one mark column"),
                    }
                } else {
                    return internal_err!(
                        "Not possible to have this join side for RightMark join"
                    );
                }
            }

            Ok(Some(RecordBatch::try_new(
                Arc::clone(output_schema),
                columns,
            )?))
        }
        // NOTE(review): only `Inner` can fall through to this arm (all other
        // variants are matched above); the message text mentioning only
        // right-side joins looks stale — confirm before relying on it.
        _ => internal_err!(
            "If batch is at right side, this function must be handling Full/Right/RightSemi/RightAnti/RightMark joins"
        ),
    }
}
2229
2230#[cfg(test)]
2231pub(crate) mod tests {
2232    use super::*;
2233    use crate::test::{TestMemoryExec, assert_join_metrics};
2234    use crate::{
2235        common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
2236    };
2237
2238    use arrow::compute::SortOptions;
2239    use arrow::datatypes::{DataType, Field};
2240    use datafusion_common::test_util::batches_to_sort_string;
2241    use datafusion_common::{ScalarValue, assert_contains};
2242    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
2243    use datafusion_expr::Operator;
2244    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
2245    use datafusion_physical_expr::{Partitioning, PhysicalExpr};
2246    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
2247
2248    use insta::allow_duplicates;
2249    use insta::assert_snapshot;
2250    use rstest::rstest;
2251
2252    fn build_table(
2253        a: (&str, &Vec<i32>),
2254        b: (&str, &Vec<i32>),
2255        c: (&str, &Vec<i32>),
2256        batch_size: Option<usize>,
2257        sorted_column_names: Vec<&str>,
2258    ) -> Arc<dyn ExecutionPlan> {
2259        let batch = build_table_i32(a, b, c);
2260        let schema = batch.schema();
2261
2262        let batches = if let Some(batch_size) = batch_size {
2263            let num_batches = batch.num_rows().div_ceil(batch_size);
2264            (0..num_batches)
2265                .map(|i| {
2266                    let start = i * batch_size;
2267                    let remaining_rows = batch.num_rows() - start;
2268                    batch.slice(start, batch_size.min(remaining_rows))
2269                })
2270                .collect::<Vec<_>>()
2271        } else {
2272            vec![batch]
2273        };
2274
2275        let mut sort_info = vec![];
2276        for name in sorted_column_names {
2277            let index = schema.index_of(name).unwrap();
2278            let sort_expr = PhysicalSortExpr::new(
2279                Arc::new(Column::new(name, index)),
2280                SortOptions::new(false, false),
2281            );
2282            sort_info.push(sort_expr);
2283        }
2284        let mut source = TestMemoryExec::try_new(&[batches], schema, None).unwrap();
2285        if let Some(ordering) = LexOrdering::new(sort_info) {
2286            source = source.try_with_sort_information(vec![ordering]).unwrap();
2287        }
2288
2289        let source = Arc::new(source);
2290        Arc::new(TestMemoryExec::update_cache(&source))
2291    }
2292
2293    fn build_left_table() -> Arc<dyn ExecutionPlan> {
2294        build_table(
2295            ("a1", &vec![5, 9, 11]),
2296            ("b1", &vec![5, 8, 8]),
2297            ("c1", &vec![50, 90, 110]),
2298            None,
2299            Vec::new(),
2300        )
2301    }
2302
2303    fn build_right_table() -> Arc<dyn ExecutionPlan> {
2304        build_table(
2305            ("a2", &vec![12, 2, 10]),
2306            ("b2", &vec![10, 2, 10]),
2307            ("c2", &vec![40, 80, 100]),
2308            None,
2309            Vec::new(),
2310        )
2311    }
2312
2313    fn prepare_join_filter() -> JoinFilter {
2314        let column_indices = vec![
2315            ColumnIndex {
2316                index: 1,
2317                side: JoinSide::Left,
2318            },
2319            ColumnIndex {
2320                index: 1,
2321                side: JoinSide::Right,
2322            },
2323        ];
2324        let intermediate_schema = Schema::new(vec![
2325            Field::new("x", DataType::Int32, true),
2326            Field::new("x", DataType::Int32, true),
2327        ]);
2328        // left.b1!=8
2329        let left_filter = Arc::new(BinaryExpr::new(
2330            Arc::new(Column::new("x", 0)),
2331            Operator::NotEq,
2332            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2333        )) as Arc<dyn PhysicalExpr>;
2334        // right.b2!=10
2335        let right_filter = Arc::new(BinaryExpr::new(
2336            Arc::new(Column::new("x", 1)),
2337            Operator::NotEq,
2338            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2339        )) as Arc<dyn PhysicalExpr>;
2340        // filter = left.b1!=8 and right.b2!=10
2341        // after filter:
2342        // left table:
2343        // ("a1", &vec![5]),
2344        // ("b1", &vec![5]),
2345        // ("c1", &vec![50]),
2346        // right table:
2347        // ("a2", &vec![12, 2]),
2348        // ("b2", &vec![10, 2]),
2349        // ("c2", &vec![40, 80]),
2350        let filter_expression =
2351            Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter))
2352                as Arc<dyn PhysicalExpr>;
2353
2354        JoinFilter::new(
2355            filter_expression,
2356            column_indices,
2357            Arc::new(intermediate_schema),
2358        )
2359    }
2360
2361    pub(crate) async fn multi_partitioned_join_collect(
2362        left: Arc<dyn ExecutionPlan>,
2363        right: Arc<dyn ExecutionPlan>,
2364        join_type: &JoinType,
2365        join_filter: Option<JoinFilter>,
2366        context: Arc<TaskContext>,
2367    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2368        let partition_count = 4;
2369
2370        // Redistributing right input
2371        let right = Arc::new(RepartitionExec::try_new(
2372            right,
2373            Partitioning::RoundRobinBatch(partition_count),
2374        )?) as Arc<dyn ExecutionPlan>;
2375
2376        // Use the required distribution for nested loop join to test partition data
2377        let nested_loop_join =
2378            NestedLoopJoinExec::try_new(left, right, join_filter, join_type, None)?;
2379        let columns = columns(&nested_loop_join.schema());
2380        let mut batches = vec![];
2381        for i in 0..partition_count {
2382            let stream = nested_loop_join.execute(i, Arc::clone(&context))?;
2383            let more_batches = common::collect(stream).await?;
2384            batches.extend(
2385                more_batches
2386                    .into_iter()
2387                    .inspect(|b| {
2388                        assert!(b.num_rows() <= context.session_config().batch_size())
2389                    })
2390                    .filter(|b| b.num_rows() > 0)
2391                    .collect::<Vec<_>>(),
2392            );
2393        }
2394
2395        let metrics = nested_loop_join.metrics().unwrap();
2396
2397        Ok((columns, batches, metrics))
2398    }
2399
    /// Creates a `TaskContext` whose session config uses the given
    /// `batch_size`, letting tests exercise the join's output-batch splitting.
    fn new_task_ctx(batch_size: usize) -> Arc<TaskContext> {
        let base = TaskContext::default();
        // Limit the max size of intermediate batches used in the nested loop
        // join to `batch_size` (tests pass 1, 2, and 16).
        let cfg = base.session_config().clone().with_batch_size(batch_size);
        Arc::new(base.with_session_config(cfg))
    }
2406
2407    #[rstest]
2408    #[tokio::test]
2409    async fn join_inner_with_filter(#[values(1, 2, 16)] batch_size: usize) -> Result<()> {
2410        let task_ctx = new_task_ctx(batch_size);
2411        dbg!(&batch_size);
2412        let left = build_left_table();
2413        let right = build_right_table();
2414        let filter = prepare_join_filter();
2415        let (columns, batches, metrics) = multi_partitioned_join_collect(
2416            left,
2417            right,
2418            &JoinType::Inner,
2419            Some(filter),
2420            task_ctx,
2421        )
2422        .await?;
2423
2424        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2425        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2426        +----+----+----+----+----+----+
2427        | a1 | b1 | c1 | a2 | b2 | c2 |
2428        +----+----+----+----+----+----+
2429        | 5  | 5  | 50 | 2  | 2  | 80 |
2430        +----+----+----+----+----+----+
2431        "));
2432
2433        assert_join_metrics!(metrics, 1);
2434
2435        Ok(())
2436    }
2437
2438    #[rstest]
2439    #[tokio::test]
2440    async fn join_left_with_filter(#[values(1, 2, 16)] batch_size: usize) -> Result<()> {
2441        let task_ctx = new_task_ctx(batch_size);
2442        let left = build_left_table();
2443        let right = build_right_table();
2444
2445        let filter = prepare_join_filter();
2446        let (columns, batches, metrics) = multi_partitioned_join_collect(
2447            left,
2448            right,
2449            &JoinType::Left,
2450            Some(filter),
2451            task_ctx,
2452        )
2453        .await?;
2454        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2455        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2456        +----+----+-----+----+----+----+
2457        | a1 | b1 | c1  | a2 | b2 | c2 |
2458        +----+----+-----+----+----+----+
2459        | 11 | 8  | 110 |    |    |    |
2460        | 5  | 5  | 50  | 2  | 2  | 80 |
2461        | 9  | 8  | 90  |    |    |    |
2462        +----+----+-----+----+----+----+
2463        "));
2464
2465        assert_join_metrics!(metrics, 3);
2466
2467        Ok(())
2468    }
2469
2470    #[rstest]
2471    #[tokio::test]
2472    async fn join_right_with_filter(#[values(1, 2, 16)] batch_size: usize) -> Result<()> {
2473        let task_ctx = new_task_ctx(batch_size);
2474        let left = build_left_table();
2475        let right = build_right_table();
2476
2477        let filter = prepare_join_filter();
2478        let (columns, batches, metrics) = multi_partitioned_join_collect(
2479            left,
2480            right,
2481            &JoinType::Right,
2482            Some(filter),
2483            task_ctx,
2484        )
2485        .await?;
2486        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2487        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2488        +----+----+----+----+----+-----+
2489        | a1 | b1 | c1 | a2 | b2 | c2  |
2490        +----+----+----+----+----+-----+
2491        |    |    |    | 10 | 10 | 100 |
2492        |    |    |    | 12 | 10 | 40  |
2493        | 5  | 5  | 50 | 2  | 2  | 80  |
2494        +----+----+----+----+----+-----+
2495        "));
2496
2497        assert_join_metrics!(metrics, 3);
2498
2499        Ok(())
2500    }
2501
2502    #[rstest]
2503    #[tokio::test]
2504    async fn join_full_with_filter(#[values(1, 2, 16)] batch_size: usize) -> Result<()> {
2505        let task_ctx = new_task_ctx(batch_size);
2506        let left = build_left_table();
2507        let right = build_right_table();
2508
2509        let filter = prepare_join_filter();
2510        let (columns, batches, metrics) = multi_partitioned_join_collect(
2511            left,
2512            right,
2513            &JoinType::Full,
2514            Some(filter),
2515            task_ctx,
2516        )
2517        .await?;
2518        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2519        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2520        +----+----+-----+----+----+-----+
2521        | a1 | b1 | c1  | a2 | b2 | c2  |
2522        +----+----+-----+----+----+-----+
2523        |    |    |     | 10 | 10 | 100 |
2524        |    |    |     | 12 | 10 | 40  |
2525        | 11 | 8  | 110 |    |    |     |
2526        | 5  | 5  | 50  | 2  | 2  | 80  |
2527        | 9  | 8  | 90  |    |    |     |
2528        +----+----+-----+----+----+-----+
2529        "));
2530
2531        assert_join_metrics!(metrics, 5);
2532
2533        Ok(())
2534    }
2535
2536    #[rstest]
2537    #[tokio::test]
2538    async fn join_left_semi_with_filter(
2539        #[values(1, 2, 16)] batch_size: usize,
2540    ) -> Result<()> {
2541        let task_ctx = new_task_ctx(batch_size);
2542        let left = build_left_table();
2543        let right = build_right_table();
2544
2545        let filter = prepare_join_filter();
2546        let (columns, batches, metrics) = multi_partitioned_join_collect(
2547            left,
2548            right,
2549            &JoinType::LeftSemi,
2550            Some(filter),
2551            task_ctx,
2552        )
2553        .await?;
2554        assert_eq!(columns, vec!["a1", "b1", "c1"]);
2555        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2556        +----+----+----+
2557        | a1 | b1 | c1 |
2558        +----+----+----+
2559        | 5  | 5  | 50 |
2560        +----+----+----+
2561        "));
2562
2563        assert_join_metrics!(metrics, 1);
2564
2565        Ok(())
2566    }
2567
2568    #[rstest]
2569    #[tokio::test]
2570    async fn join_left_anti_with_filter(
2571        #[values(1, 2, 16)] batch_size: usize,
2572    ) -> Result<()> {
2573        let task_ctx = new_task_ctx(batch_size);
2574        let left = build_left_table();
2575        let right = build_right_table();
2576
2577        let filter = prepare_join_filter();
2578        let (columns, batches, metrics) = multi_partitioned_join_collect(
2579            left,
2580            right,
2581            &JoinType::LeftAnti,
2582            Some(filter),
2583            task_ctx,
2584        )
2585        .await?;
2586        assert_eq!(columns, vec!["a1", "b1", "c1"]);
2587        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2588        +----+----+-----+
2589        | a1 | b1 | c1  |
2590        +----+----+-----+
2591        | 11 | 8  | 110 |
2592        | 9  | 8  | 90  |
2593        +----+----+-----+
2594        "));
2595
2596        assert_join_metrics!(metrics, 2);
2597
2598        Ok(())
2599    }
2600
2601    #[tokio::test]
2602    async fn join_has_correct_stats() -> Result<()> {
2603        let left = build_left_table();
2604        let right = build_right_table();
2605        let nested_loop_join = NestedLoopJoinExec::try_new(
2606            left,
2607            right,
2608            None,
2609            &JoinType::Left,
2610            Some(vec![1, 2]),
2611        )?;
2612        let stats = nested_loop_join.partition_statistics(None)?;
2613        assert_eq!(
2614            nested_loop_join.schema().fields().len(),
2615            stats.column_statistics.len(),
2616        );
2617        assert_eq!(2, stats.column_statistics.len());
2618        Ok(())
2619    }
2620
2621    #[rstest]
2622    #[tokio::test]
2623    async fn join_right_semi_with_filter(
2624        #[values(1, 2, 16)] batch_size: usize,
2625    ) -> Result<()> {
2626        let task_ctx = new_task_ctx(batch_size);
2627        let left = build_left_table();
2628        let right = build_right_table();
2629
2630        let filter = prepare_join_filter();
2631        let (columns, batches, metrics) = multi_partitioned_join_collect(
2632            left,
2633            right,
2634            &JoinType::RightSemi,
2635            Some(filter),
2636            task_ctx,
2637        )
2638        .await?;
2639        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2640        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2641        +----+----+----+
2642        | a2 | b2 | c2 |
2643        +----+----+----+
2644        | 2  | 2  | 80 |
2645        +----+----+----+
2646        "));
2647
2648        assert_join_metrics!(metrics, 1);
2649
2650        Ok(())
2651    }
2652
2653    #[rstest]
2654    #[tokio::test]
2655    async fn join_right_anti_with_filter(
2656        #[values(1, 2, 16)] batch_size: usize,
2657    ) -> Result<()> {
2658        let task_ctx = new_task_ctx(batch_size);
2659        let left = build_left_table();
2660        let right = build_right_table();
2661
2662        let filter = prepare_join_filter();
2663        let (columns, batches, metrics) = multi_partitioned_join_collect(
2664            left,
2665            right,
2666            &JoinType::RightAnti,
2667            Some(filter),
2668            task_ctx,
2669        )
2670        .await?;
2671        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2672        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2673        +----+----+-----+
2674        | a2 | b2 | c2  |
2675        +----+----+-----+
2676        | 10 | 10 | 100 |
2677        | 12 | 10 | 40  |
2678        +----+----+-----+
2679        "));
2680
2681        assert_join_metrics!(metrics, 2);
2682
2683        Ok(())
2684    }
2685
2686    #[rstest]
2687    #[tokio::test]
2688    async fn join_left_mark_with_filter(
2689        #[values(1, 2, 16)] batch_size: usize,
2690    ) -> Result<()> {
2691        let task_ctx = new_task_ctx(batch_size);
2692        let left = build_left_table();
2693        let right = build_right_table();
2694
2695        let filter = prepare_join_filter();
2696        let (columns, batches, metrics) = multi_partitioned_join_collect(
2697            left,
2698            right,
2699            &JoinType::LeftMark,
2700            Some(filter),
2701            task_ctx,
2702        )
2703        .await?;
2704        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
2705        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2706        +----+----+-----+-------+
2707        | a1 | b1 | c1  | mark  |
2708        +----+----+-----+-------+
2709        | 11 | 8  | 110 | false |
2710        | 5  | 5  | 50  | true  |
2711        | 9  | 8  | 90  | false |
2712        +----+----+-----+-------+
2713        "));
2714
2715        assert_join_metrics!(metrics, 3);
2716
2717        Ok(())
2718    }
2719
2720    #[rstest]
2721    #[tokio::test]
2722    async fn join_right_mark_with_filter(
2723        #[values(1, 2, 16)] batch_size: usize,
2724    ) -> Result<()> {
2725        let task_ctx = new_task_ctx(batch_size);
2726        let left = build_left_table();
2727        let right = build_right_table();
2728
2729        let filter = prepare_join_filter();
2730        let (columns, batches, metrics) = multi_partitioned_join_collect(
2731            left,
2732            right,
2733            &JoinType::RightMark,
2734            Some(filter),
2735            task_ctx,
2736        )
2737        .await?;
2738        assert_eq!(columns, vec!["a2", "b2", "c2", "mark"]);
2739
2740        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
2741        +----+----+-----+-------+
2742        | a2 | b2 | c2  | mark  |
2743        +----+----+-----+-------+
2744        | 10 | 10 | 100 | false |
2745        | 12 | 10 | 40  | false |
2746        | 2  | 2  | 80  | true  |
2747        +----+----+-----+-------+
2748        "));
2749
2750        assert_join_metrics!(metrics, 3);
2751
2752        Ok(())
2753    }
2754
2755    #[tokio::test]
2756    async fn test_overallocation() -> Result<()> {
2757        let left = build_table(
2758            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
2759            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
2760            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
2761            None,
2762            Vec::new(),
2763        );
2764        let right = build_table(
2765            ("a2", &vec![10, 11]),
2766            ("b2", &vec![12, 13]),
2767            ("c2", &vec![14, 15]),
2768            None,
2769            Vec::new(),
2770        );
2771        let filter = prepare_join_filter();
2772
2773        let join_types = vec![
2774            JoinType::Inner,
2775            JoinType::Left,
2776            JoinType::Right,
2777            JoinType::Full,
2778            JoinType::LeftSemi,
2779            JoinType::LeftAnti,
2780            JoinType::LeftMark,
2781            JoinType::RightSemi,
2782            JoinType::RightAnti,
2783            JoinType::RightMark,
2784        ];
2785
2786        for join_type in join_types {
2787            let runtime = RuntimeEnvBuilder::new()
2788                .with_memory_limit(100, 1.0)
2789                .build_arc()?;
2790            let task_ctx = TaskContext::default().with_runtime(runtime);
2791            let task_ctx = Arc::new(task_ctx);
2792
2793            let err = multi_partitioned_join_collect(
2794                Arc::clone(&left),
2795                Arc::clone(&right),
2796                &join_type,
2797                Some(filter.clone()),
2798                task_ctx,
2799            )
2800            .await
2801            .unwrap_err();
2802
2803            assert_contains!(
2804                err.to_string(),
2805                "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[0] with top memory consumers (across reservations) as:\n  NestedLoopJoinLoad[0]"
2806            );
2807        }
2808
2809        Ok(())
2810    }
2811
2812    /// Returns the column names on the schema
2813    fn columns(schema: &Schema) -> Vec<String> {
2814        schema.fields().iter().map(|f| f.name().clone()).collect()
2815    }
2816}