datafusion_physical_plan/joins/hash_join/exec.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt;
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::{any::Any, vec};

use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::filter_pushdown::{
    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
    FilterPushdownPropagation,
};
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
use crate::joins::hash_join::stream::{
    BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
use crate::joins::utils::{
    asymmetric_join_output_partitioning, reorder_output_after_swap, swap_join_projection,
    update_hash, OnceAsync, OnceFut,
};
use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
use crate::projection::{
    try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
    ProjectionExec,
};
use crate::spill::get_record_batch_memory_size;
use crate::ExecutionPlanProperties;
use crate::{
    common::can_project,
    joins::utils::{
        build_join_schema, check_join_is_valid, estimate_join_statistics,
        need_produce_result_in_final, symmetric_join_output_partitioning,
        BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
    },
    metrics::{ExecutionPlanMetricsSet, MetricsSet},
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
    PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::array::{ArrayRef, BooleanBufferBuilder};
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_schema::DataType;
use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
    internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::equivalence::{
    join_equivalence_properties, ProjectionMapping,
};
use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};

use ahash::RandomState;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use futures::TryStreamExt;
use parking_lot::Mutex;

/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
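/// (If the two operators shared a seed, rows co-located in one
/// `RepartitionExec` output partition would also cluster into the same hash
/// table buckets in the join, degrading probe performance.)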
const HASH_JOIN_SEED: RandomState =
    RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);

/// Hash table and input data for the left (build) side of a join
pub(super) struct JoinLeftData {
    /// The hash table with indices into `batch`
    pub(super) hash_map: Box<dyn JoinHashMapType>,
    /// The input rows for the build side
    batch: RecordBatch,
    /// The values of the build side `on` expressions
    values: Vec<ArrayRef>,
    /// Shared bitmap builder for visited left indices
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Counter of running probe threads that may update
    /// `visited_indices_bitmap`
    probe_threads_counter: AtomicUsize,
    /// We need to keep this field to maintain accurate memory accounting, even though we don't directly use it.
    /// Without holding onto this reservation, the recorded memory usage would become inconsistent with actual usage.
    /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
    /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
    _reservation: MemoryReservation,
    /// Bounds computed from the build side for dynamic filter pushdown
    pub(super) bounds: Option<Vec<ColumnBounds>>,
}

impl JoinLeftData {
    /// Create a new `JoinLeftData` from its parts
    pub(super) fn new(
        hash_map: Box<dyn JoinHashMapType>,
        batch: RecordBatch,
        values: Vec<ArrayRef>,
        visited_indices_bitmap: SharedBitmapBuilder,
        probe_threads_counter: AtomicUsize,
        reservation: MemoryReservation,
        bounds: Option<Vec<ColumnBounds>>,
    ) -> Self {
        Self {
            hash_map,
            batch,
            values,
            visited_indices_bitmap,
            probe_threads_counter,
            _reservation: reservation,
            bounds,
        }
    }

    /// returns a reference to the hash map
    pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
        &*self.hash_map
    }

    /// returns a reference to the build side batch
    pub(super) fn batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// returns a reference to the build side expression values
    pub(super) fn values(&self) -> &[ArrayRef] {
        &self.values
    }

    /// returns a reference to the visited indices bitmap
    pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
        &self.visited_indices_bitmap
    }

    /// Decrements the counter of running threads, and returns `true`
    /// if the caller is the last running thread
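    ///
    /// A minimal sketch of the countdown idiom used here (hypothetical count
    /// of 3 probe threads; `fetch_sub` returns the *previous* value, so the
    /// thread that observes `1` is the last one):
    ///
    /// ```ignore
    /// let counter = AtomicUsize::new(3);
    /// assert_eq!(counter.fetch_sub(1, Ordering::Relaxed), 3); // not last
    /// assert_eq!(counter.fetch_sub(1, Ordering::Relaxed), 2); // not last
    /// assert_eq!(counter.fetch_sub(1, Ordering::Relaxed), 1); // last thread
    /// ```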
    pub(super) fn report_probe_completed(&self) -> bool {
        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
    }
}

#[allow(rustdoc::private_intra_doc_links)]
/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
/// join.
///
/// # Join Expressions
///
/// This implementation is optimized for evaluating equijoin predicates
/// (`<col1> = <col2>`), which are represented as a list of `Columns`
/// in [`Self::on`].
///
/// Non-equality predicates, which cannot be pushed down to the join inputs
/// (e.g. `<col1> != <col2>`), are known as "filter expressions" and are
/// evaluated after the equijoin predicates.
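///
/// For example (an illustrative query, not from this codebase): given
/// `ON a.id = b.id AND a.amount < b.amount`, the equality `a.id = b.id`
/// becomes an entry in [`Self::on`], while `a.amount < b.amount` becomes a
/// `JoinFilter` evaluated on each candidate match.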
///
/// # "Build Side" vs "Probe Side"
///
/// HashJoin takes two inputs, which are referred to as the "build" and the
/// "probe". The build side is the first child, and the probe side is the second
/// child.
///
/// The two inputs are treated differently and it is VERY important that the
/// *smaller* input is placed on the build side to minimize the work of creating
/// the hash table.
///
/// ```text
///          ┌───────────┐
///          │ HashJoin  │
///          │           │
///          └───────────┘
///              │   │
///        ┌─────┘   └─────┐
///        ▼               ▼
/// ┌────────────┐  ┌─────────────┐
/// │   Input    │  │    Input    │
/// │    [0]     │  │     [1]     │
/// └────────────┘  └─────────────┘
///
///  "build side"    "probe side"
/// ```
///
/// Execution proceeds in 2 stages:
///
/// 1. the **build phase** creates a hash table from the tuples of the build
///    side, and a single concatenated batch containing data from all fetched
///    record batches. The resulting hash table stores the hashed join-key
///    fields of each row as keys, and the indices of the corresponding rows
///    in the concatenated batch as values.
///
/// The hash join uses a LIFO data structure as its hash table. To retain the
/// original build-side input order when fetching data during the probe phase,
/// the hash table is updated by iterating over the batch sequence in reverse
/// order -- this keeps rows with smaller indices "on top" of the hash table
/// while maintaining correct indexing into the concatenated build-side batch.
///
/// Example of build phase for 3 record batches:
///
///
/// ```text
///
///  Original build-side data   Inserting build-side values into hashmap    Concatenated build-side batch
///                                                                         ┌───────────────────────────┐
///                             hashmap.insert(row-hash, row-idx + offset)  │                      idx  │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 1 │        1) update_hash for batch 3 with offset 0    │          │ Row 6 │    0   │
///   Batch 1  │       │           - hashmap.insert(Row 7, idx 1)           │ Batch 3  │       │        │
///            │ Row 2 │           - hashmap.insert(Row 6, idx 0)           │          │ Row 7 │    1   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 3 │        2) update_hash for batch 2 with offset 2    │          │ Row 3 │    2   │
///            │       │           - hashmap.insert(Row 5, idx 4)           │          │       │        │
///   Batch 2  │ Row 4 │           - hashmap.insert(Row 4, idx 3)           │ Batch 2  │ Row 4 │    3   │
///            │       │           - hashmap.insert(Row 3, idx 2)           │          │       │        │
///            │ Row 5 │                                                    │          │ Row 5 │    4   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 6 │        3) update_hash for batch 1 with offset 5    │          │ Row 1 │    5   │
///   Batch 3  │       │           - hashmap.insert(Row 2, idx 6)           │ Batch 1  │       │        │
///            │ Row 7 │           - hashmap.insert(Row 1, idx 5)           │          │ Row 2 │    6   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///                                                                         └───────────────────────────┘
/// ```
///
/// 2. the **probe phase** where the tuples of the probe side are streamed
///    through, checking for matches of the join keys in the hash table.
///
/// ```text
///                 ┌────────────────┐          ┌────────────────┐
///                 │ ┌─────────┐    │          │ ┌─────────┐    │
///                 │ │  Hash   │    │          │ │  Hash   │    │
///                 │ │  Table  │    │          │ │  Table  │    │
///                 │ │(keys are│    │          │ │(keys are│    │
///                 │ │equi join│    │          │ │equi join│    │  Stage 2: batches from
///  Stage 1: the   │ │columns) │    │          │ │columns) │    │    the probe side are
/// *entire* build  │ │         │    │          │ │         │    │  streamed through, and
///  side is read   │ └─────────┘    │          │ └─────────┘    │   checked against the
/// into the hash   │      ▲         │          │          ▲     │   contents of the hash
///     table       │       HashJoin │          │  HashJoin      │          table
///                 └──────┼─────────┘          └──────────┼─────┘
///             ─ ─ ─ ─ ─ ─                                 ─ ─ ─ ─ ─ ─ ─
///            │                                                         │
///
///            │                                                         │
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///           ...                                                       ...
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///
///        build side                                                probe side
/// ```
///
/// # Example "Optimal" Plans
///
/// The asymmetry between the inputs means that for a classic "Star Schema
/// Query", the optimal plan is a **"Right Deep Tree"**. A Star Schema Query is
/// one where there is one large "fact" table and several smaller "dimension"
/// tables, joined on `Foreign Key = Primary Key` predicates.
///
/// A "Right Deep Tree" places the large table as the probe side of the
/// lowest join:
///
/// ```text
///             ┌───────────┐
///             │ HashJoin  │
///             │           │
///             └───────────┘
///                 │   │
///         ┌───────┘   └──────────┐
///         ▼                      ▼
/// ┌───────────────┐        ┌───────────┐
/// │ small table 1 │        │ HashJoin  │
/// │  "dimension"  │        │           │
/// └───────────────┘        └───┬───┬───┘
///                   ┌──────────┘   └───────┐
///                   │                      │
///                   ▼                      ▼
///           ┌───────────────┐        ┌───────────┐
///           │ small table 2 │        │ HashJoin  │
///           │  "dimension"  │        │           │
///           └───────────────┘        └───┬───┬───┘
///                               ┌────────┘   └────────┐
///                               │                     │
///                               ▼                     ▼
///                       ┌───────────────┐     ┌───────────────┐
///                       │ small table 3 │     │  large table  │
///                       │  "dimension"  │     │    "fact"     │
///                       └───────────────┘     └───────────────┘
/// ```
///
/// # Clone / Shared State
///
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
/// loading of the left side with the processing in each output stream.
/// Therefore it cannot be [`Clone`]
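///
/// # Example: Construction
///
/// A minimal construction sketch (hedged: assumes `left` and `right` are
/// pre-built `Arc<dyn ExecutionPlan>`s sharing a join key column `id`;
/// marked `ignore` because it elides their construction):
///
/// ```ignore
/// use datafusion_physical_expr::expressions::Column;
///
/// let on: JoinOn = vec![(
///     Arc::new(Column::new_with_schema("id", &left.schema())?) as _,
///     Arc::new(Column::new_with_schema("id", &right.schema())?) as _,
/// )];
/// let join = HashJoinExec::try_new(
///     left,
///     right,
///     on,
///     None,                       // no additional filter expression
///     &JoinType::Inner,
///     None,                       // no output projection
///     PartitionMode::CollectLeft, // build side collected into one partition
///     NullEquality::NullEqualsNothing,
/// )?;
/// ```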
pub struct HashJoinExec {
    /// left (build) side which gets hashed
    pub left: Arc<dyn ExecutionPlan>,
    /// right (probe) side, which is filtered by the hash table
    pub right: Arc<dyn ExecutionPlan>,
    /// Set of equijoin columns from the relations: `(left_col, right_col)`
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Filters which are applied while finding matching rows
    pub filter: Option<JoinFilter>,
    /// How the join is performed (`OUTER`, `INNER`, etc)
    pub join_type: JoinType,
    /// The schema after join. Please be careful when using this schema:
    /// if there is a projection, it is not the same as the output schema.
    join_schema: SchemaRef,
    /// Future that consumes left input and builds the hash table
    ///
    /// For CollectLeft partition mode, this structure is *shared* across all output streams.
    ///
    /// Each output stream waits on the `OnceAsync` to signal the completion of
    /// the hash table creation.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Shares the `RandomState` for the hashing algorithm
    random_state: RandomState,
    /// Partitioning mode to use
    pub mode: PartitionMode,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// The projection indices of the columns in the output schema of the join
    pub projection: Option<Vec<usize>>,
    /// Index and left/right placement information for the output columns
    column_indices: Vec<ColumnIndex>,
    /// The equality null-handling behavior of the join algorithm.
    pub null_equality: NullEquality,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
    /// Dynamic filter for pushing down to the probe side.
    /// Set when dynamic filter pushdown is detected in `handle_child_pushdown_result`.
    /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
}

#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// Dynamic filter that we'll update with the results of the build side once that is done.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
    /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
}

impl fmt::Debug for HashJoinExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("HashJoinExec")
            .field("left", &self.left)
            .field("right", &self.right)
            .field("on", &self.on)
            .field("filter", &self.filter)
            .field("join_type", &self.join_type)
            .field("join_schema", &self.join_schema)
            .field("left_fut", &self.left_fut)
            .field("random_state", &self.random_state)
            .field("mode", &self.mode)
            .field("metrics", &self.metrics)
            .field("projection", &self.projection)
            .field("column_indices", &self.column_indices)
            .field("null_equality", &self.null_equality)
            .field("cache", &self.cache)
            // Explicitly exclude dynamic_filter to avoid runtime state differences in tests
            .finish()
    }
}

impl EmbeddedProjection for HashJoinExec {
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}

impl HashJoinExec {
    /// Tries to create a new [HashJoinExec].
    ///
    /// # Error
    /// This function errors when it is not possible to join the left and right sides on keys `on`.
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        projection: Option<Vec<usize>>,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
    ) -> Result<Self> {
        let left_schema = left.schema();
        let right_schema = right.schema();
        if on.is_empty() {
            return plan_err!("On constraints in HashJoinExec should be non-empty");
        }

        check_join_is_valid(&left_schema, &right_schema, &on)?;

        let (join_schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, join_type);

        let random_state = HASH_JOIN_SEED;

        let join_schema = Arc::new(join_schema);

        // Check that the projection is valid
        can_project(&join_schema, projection.as_ref())?;

        let cache = Self::compute_properties(
            &left,
            &right,
            Arc::clone(&join_schema),
            *join_type,
            &on,
            partition_mode,
            projection.as_ref(),
        )?;

        // The dynamic filter is initialized to None; it is set later if
        // dynamic filtering is enabled.

        Ok(HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type: *join_type,
            join_schema,
            left_fut: Default::default(),
            random_state,
            mode: partition_mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection,
            column_indices,
            null_equality,
            cache,
            dynamic_filter: None,
        })
    }

    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
        // Extract the right-side (probe side) keys from the `on` clauses.
        // The dynamic filter is built from build side (left) values and applied to the probe side (right).
        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
        // Initialize with a placeholder expression (true) that will be updated when the hash table is built
        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
    }

    /// left (build) side which gets hashed
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// right (probe) side, which is filtered by the hash table
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// Set of common columns used to join on
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    /// Filters applied before join output
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// How the join is performed
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// The schema after join. Please be careful when using this schema:
    /// if there is a projection, it is not the same as the output schema.
    pub fn join_schema(&self) -> &SchemaRef {
        &self.join_schema
    }

    /// The partitioning mode of this hash join
    pub fn partition_mode(&self) -> &PartitionMode {
        &self.mode
    }

    /// Get the null equality behavior
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    /// Calculate order preservation flags for this hash join.
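    ///
    /// Returns `[build_side, probe_side]`: the build (left) side never
    /// preserves its input order, while the probe (right) side does for join
    /// types that emit probe-side rows as they stream through.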
    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
        vec![
            false,
            matches!(
                join_type,
                JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightAnti
                    | JoinType::RightSemi
                    | JoinType::RightMark
            ),
        ]
    }

    /// Get probe side information for the hash join.
    pub fn probe_side() -> JoinSide {
        // In the current implementation, the right side is always the probe side.
        JoinSide::Right
    }

    /// Return whether the join contains a projection
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }

    /// Return new instance of [HashJoinExec] with the given projection.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        // Check that the projection is valid
        can_project(&self.schema(), projection.as_ref())?;
        let projection = match projection {
            Some(projection) => match &self.projection {
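                // Compose with any existing projection: e.g. an existing
                // projection [2, 0, 1] followed by a new projection [1]
                // yields [0].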
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };
        Self::try_new(
            Arc::clone(&self.left),
            Arc::clone(&self.right),
            self.on.clone(),
            self.filter.clone(),
            &self.join_type,
            projection,
            self.mode,
            self.null_equality,
        )
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
        join_type: JoinType,
        on: JoinOnRef,
        mode: PartitionMode,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        // Calculate equivalence properties:
        let mut eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            Arc::clone(&schema),
            &Self::maintains_input_order(join_type),
            Some(Self::probe_side()),
            on,
        )?;

        let mut output_partitioning = match mode {
            PartitionMode::CollectLeft => {
                asymmetric_join_output_partitioning(left, right, &join_type)?
            }
            PartitionMode::Auto => Partitioning::UnknownPartitioning(
                right.output_partitioning().partition_count(),
            ),
            PartitionMode::Partitioned => {
                symmetric_join_output_partitioning(left, right, &join_type)?
            }
        };

        let emission_type = if left.boundedness().is_unbounded() {
            EmissionType::Final
        } else if right.pipeline_behavior() == EmissionType::Incremental {
            match join_type {
                // If we only need to generate matched rows from the probe side,
                // we can emit rows incrementally.
                JoinType::Inner
                | JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::Right
                | JoinType::RightAnti
                | JoinType::RightMark => EmissionType::Incremental,
                // If we need to generate unmatched rows from the *build side*,
                // we need to emit them at the end.
                JoinType::Left
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::Full => EmissionType::Both,
            }
        } else {
            right.pipeline_behavior()
        };

        // If this join contains a projection, update the PlanProperties.
        if let Some(projection) = projection {
            // Construct a map from the input expressions to the output expressions of the projection
            let projection_mapping =
                ProjectionMapping::from_indices(projection, &schema)?;
            let out_schema = project_schema(&schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness_from_children([left, right]),
        ))
    }

    /// Returns a new `ExecutionPlan` that computes the same join as this one,
    /// with the left and right inputs swapped using the specified
    /// `partition_mode`.
    ///
    /// # Notes:
    ///
    /// This function is public so other downstream projects can use it to
    /// construct `HashJoinExec` with the right side as the build side.
    ///
    /// When using this interface directly, note the following:
    ///
    /// Hash join execution may require specific input partitioning (for example,
    /// the left child may have a single partition while the right child has multiple).
    ///
    /// Calling this function on join nodes whose children have already been repartitioned
    /// (e.g., after a `RepartitionExec` has been inserted) may break the partitioning
    /// requirements of the hash join. Therefore, ensure you call this function
    /// before inserting any repartitioning operators on the join's children.
    ///
    /// In DataFusion's default SQL interface, this function is used by the `JoinSelection`
    /// physical optimizer rule to determine a good join order, which is
    /// executed before the `EnforceDistribution` rule (the rule that may
    /// insert `RepartitionExec` operators).
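    ///
    /// A minimal usage sketch (hedged: assumes `join` is an existing
    /// `HashJoinExec` whose right input is the smaller one; marked `ignore`
    /// because it elides plan construction):
    ///
    /// ```ignore
    /// // Swap so the smaller (right) input becomes the build side
    /// let swapped: Arc<dyn ExecutionPlan> =
    ///     join.swap_inputs(PartitionMode::Partitioned)?;
    /// ```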
    pub fn swap_inputs(
        &self,
        partition_mode: PartitionMode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = self.left();
        let right = self.right();
        let new_join = HashJoinExec::try_new(
            Arc::clone(right),
            Arc::clone(left),
            self.on()
                .iter()
                .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
                .collect(),
            self.filter().map(JoinFilter::swap),
            &self.join_type().swap(),
            swap_join_projection(
                left.schema().fields().len(),
                right.schema().fields().len(),
                self.projection.as_ref(),
                self.join_type(),
            ),
            partition_mode,
            self.null_equality(),
        )?;
        // For anti/semi joins, or when HashJoinExec contains an embedded projection,
        // the output column order is preserved, so there is no need to add a projection again.
        if matches!(
            self.join_type(),
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ) || self.projection.is_some()
        {
            Ok(Arc::new(new_join))
        } else {
            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
        }
    }
}

impl DisplayAs for HashJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let display_filter = self.filter.as_ref().map_or_else(
                    || "".to_string(),
                    |f| format!(", filter={}", f.expression()),
                );
                let display_projections = if self.contains_projection() {
                    format!(
                        ", projection=[{}]",
                        self.projection
                            .as_ref()
                            .unwrap()
                            .iter()
                            .map(|index| format!(
                                "{}@{}",
                                self.join_schema.fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                let display_null_equality =
                    if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
                        ", NullsEqual: true"
                    } else {
                        ""
                    };
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| format!("({c1}, {c2})"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(
                    f,
                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
                    self.mode,
                    self.join_type,
                    on,
                    display_filter,
                    display_projections,
                    display_null_equality,
                )
            }
            DisplayFormatType::TreeRender => {
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| {
                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
                    })
                    .collect::<Vec<String>>()
                    .join(", ");

                if *self.join_type() != JoinType::Inner {
                    writeln!(f, "join_type={:?}", self.join_type)?;
                }

                writeln!(f, "on={on}")?;

                if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
                    writeln!(f, "NullsEqual: true")?;
                }

                if let Some(filter) = self.filter.as_ref() {
                    writeln!(f, "filter={filter}")?;
                }

                Ok(())
            }
        }
    }
}

impl ExecutionPlan for HashJoinExec {
    fn name(&self) -> &'static str {
        "HashJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
            PartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
            PartitionMode::Auto => vec![
                Distribution::UnspecifiedDistribution,
                Distribution::UnspecifiedDistribution,
            ],
        }
    }

    // For [JoinType::Inner] and [JoinType::RightSemi] hash joins, the probe phase
    // begins by hashing the join key(s) of each probe-side row, in the order the
    // rows are arranged. The hash value is used to look up corresponding entries
    // in the hash table that was constructed from the build side table during the
    // build phase.
    //
    // Because result rows are generated as soon as a match is found, the output
    // of the join tends to follow the order in which the rows were read from the
    // probe side table. Hence, the hash join preserves the order of the probe side.
    //
    // Likewise, in the case of a [JoinType::RightAnti] hash join, the unmatched
    // rows from the probe side are also kept in order. This is because the
    // **`RightAnti`** join is designed to return rows from the right (probe side)
    // table that have no match in the left (build side) table. Because the rows
    // are processed sequentially in the probe phase, and unmatched rows are
    // directly output as results, these results retain the order of the probe
    // side table.
    fn maintains_input_order(&self) -> Vec<bool> {
        Self::maintains_input_order(self.join_type)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }

    /// Creates a new HashJoinExec with different children while preserving configuration.
    ///
    /// This method is called during query optimization when the optimizer creates new
    /// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new`
    /// rather than cloning the existing one because partitioning may have changed.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec {
            left: Arc::clone(&children[0]),
            right: Arc::clone(&children[1]),
            on: self.on.clone(),
            filter: self.filter.clone(),
            join_type: self.join_type,
            join_schema: Arc::clone(&self.join_schema),
            left_fut: Arc::clone(&self.left_fut),
            random_state: self.random_state.clone(),
            mode: self.mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: self.projection.clone(),
            column_indices: self.column_indices.clone(),
            null_equality: self.null_equality,
            cache: Self::compute_properties(
                &children[0],
                &children[1],
                Arc::clone(&self.join_schema),
                self.join_type,
                &self.on,
                self.mode,
                self.projection.as_ref(),
            )?,
            // Keep the dynamic filter; the bounds accumulator will be reset
            dynamic_filter: self.dynamic_filter.clone(),
        }))
    }

    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec {
            left: Arc::clone(&self.left),
            right: Arc::clone(&self.right),
            on: self.on.clone(),
            filter: self.filter.clone(),
            join_type: self.join_type,
            join_schema: Arc::clone(&self.join_schema),
            // Reset the left_fut to allow re-execution
            left_fut: Arc::new(OnceAsync::default()),
            random_state: self.random_state.clone(),
            mode: self.mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: self.projection.clone(),
            column_indices: self.column_indices.clone(),
            null_equality: self.null_equality,
            cache: self.cache.clone(),
            // Reset dynamic filter and bounds accumulator to initial state
            dynamic_filter: None,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let on_left = self
            .on
            .iter()
            .map(|on| Arc::clone(&on.0))
            .collect::<Vec<_>>();
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();

        if self.mode == PartitionMode::Partitioned && left_partitions != right_partitions
        {
            return internal_err!(
                "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
                 consider using RepartitionExec"
            );
        }

        if self.mode == PartitionMode::CollectLeft && left_partitions != 1 {
            return internal_err!(
                "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
                 consider using CoalescePartitionsExec or the EnforceDistribution rule"
            );
        }

        let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();

        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
        let left_fut = match self.mode {
            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
                let left_stream = self.left.execute(0, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

                Ok(collect_left_input(
                    self.random_state.clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    self.right().output_partitioning().partition_count(),
                    enable_dynamic_filter_pushdown,
                ))
            })?,
            PartitionMode::Partitioned => {
                let left_stream = self.left.execute(partition, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
                        .register(context.memory_pool());

                OnceFut::new(collect_left_input(
                    self.random_state.clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    1,
                    enable_dynamic_filter_pushdown,
                ))
            }
            PartitionMode::Auto => {
                return plan_err!(
                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
                    PartitionMode::Auto
                );
            }
        };

        let batch_size = context.session_config().batch_size();

        // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
        let bounds_accumulator = enable_dynamic_filter_pushdown
            .then(|| {
                self.dynamic_filter.as_ref().map(|df| {
                    let filter = Arc::clone(&df.filter);
                    let on_right = self
                        .on
                        .iter()
                        .map(|(_, right_expr)| Arc::clone(right_expr))
                        .collect::<Vec<_>>();
                    Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
                        Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
                            self.mode,
                            self.left.as_ref(),
                            self.right.as_ref(),
                            filter,
                            on_right,
                        ))
                    })))
                })
            })
            .flatten()
            .flatten();

        // We have the batches and the hash map with their keys. We can now create a stream
        // over the right side that uses this information to issue new batches.
        let right_stream = self.right.execute(partition, context)?;

        // Update column indices to reflect the projection
        let column_indices_after_projection = match &self.projection {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };

        let on_right = self
            .on
            .iter()
            .map(|(_, right_expr)| Arc::clone(right_expr))
            .collect::<Vec<_>>();

        Ok(Box::pin(HashJoinStream::new(
            partition,
            self.schema(),
            on_right,
            self.filter.clone(),
            self.join_type,
            right_stream,
            self.random_state.clone(),
            join_metrics,
            column_indices_after_projection,
            self.null_equality,
            HashJoinStreamState::WaitBuildSide,
            BuildSide::Initial(BuildSideInitialState { left_fut }),
            batch_size,
            vec![],
            self.right.output_ordering().is_some(),
            bounds_accumulator,
            self.mode,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema()));
        }
        // TODO stats: it is not possible in general to know the output size of joins
        // There are some special cases though, for example:
        // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
        let stats = estimate_join_statistics(
            self.left.partition_statistics(None)?,
            self.right.partition_statistics(None)?,
            self.on.clone(),
            &self.join_type,
            &self.join_schema,
        )?;
        // Project statistics if there is a projection
        Ok(stats.project(self.projection.as_ref()))
    }

    /// Tries to push `projection` down through `hash_join`. If possible, performs the
    /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections
    /// as its children. Otherwise, returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // TODO: currently if there is a projection in HashJoinExec, we can't push the projection down to the left or right input. Maybe we can push down the mixed projection later.
        if self.contains_projection() {
            return Ok(None);
        }

        if let Some(JoinData {
            projected_left_child,
            projected_right_child,
            join_filter,
            join_on,
        }) = try_pushdown_through_join(
            projection,
            self.left(),
            self.right(),
            self.on(),
            self.schema(),
            self.filter(),
        )? {
            Ok(Some(Arc::new(HashJoinExec::try_new(
                Arc::new(projected_left_child),
                Arc::new(projected_right_child),
                join_on,
                join_filter,
                self.join_type(),
                // Returned early if projection is not None
                None,
                *self.partition_mode(),
                self.null_equality,
            )?)))
        } else {
            try_embed_projection(projection, self)
        }
    }

    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        // Other types of joins can support *some* filters, but restrictions are complex and error prone.
        // For now we don't support them.
        // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
        // See https://github.com/apache/datafusion/issues/16973 for tracking.
        if self.join_type != JoinType::Inner {
            return Ok(FilterDescription::all_unsupported(
                &parent_filters,
                &self.children(),
            ));
        }

        // Get basic filter descriptions for both children
        let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
            self.left(),
        )?;
        let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
            self.right(),
        )?;

        // Add dynamic filters in the Post phase if enabled
        if matches!(phase, FilterPushdownPhase::Post)
            && config.optimizer.enable_join_dynamic_filter_pushdown
        {
            // Add the actual dynamic filter to the right side (probe side)
            let dynamic_filter = Self::create_dynamic_filter(&self.on);
            right_child = right_child.with_self_filter(dynamic_filter);
        }

        Ok(FilterDescription::new()
            .with_child(left_child)
            .with_child(right_child))
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for
        // non-inner joins in `gather_filters_for_pushdown`.
        // However it's a cheap check and serves to inform future devs touching this function that they need to be really
        // careful pushing down filters through non-inner joins.
        if self.join_type != JoinType::Inner {
            // Other types of joins can support *some* filters, but restrictions are complex and error prone.
            // For now we don't support them.
            // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
            return Ok(FilterPushdownPropagation::all_unsupported(
                child_pushdown_result,
            ));
        }

        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
        // Should always be 2 self-filter sets, since we have 2 children
        assert_eq!(child_pushdown_result.self_filters.len(), 2);
        // We only push down filters to the right child, and expect 0 or 1 self filters
        let right_child_self_filters = &child_pushdown_result.self_filters[1];
        if let Some(filter) = right_child_self_filters.first() {
            // Note that we don't check the pushdown result's discriminant: even if nothing reported
            // "yes, I can fully evaluate this filter", consumers might still use it for statistics, so it's worth updating
            let predicate = Arc::clone(&filter.predicate);
            if let Ok(dynamic_filter) =
                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
            {
                // We successfully pushed down our self filter - we need to make a new node with the dynamic filter
                let new_node = Arc::new(HashJoinExec {
                    left: Arc::clone(&self.left),
                    right: Arc::clone(&self.right),
                    on: self.on.clone(),
                    filter: self.filter.clone(),
                    join_type: self.join_type,
                    join_schema: Arc::clone(&self.join_schema),
                    left_fut: Arc::clone(&self.left_fut),
                    random_state: self.random_state.clone(),
                    mode: self.mode,
                    metrics: ExecutionPlanMetricsSet::new(),
                    projection: self.projection.clone(),
                    column_indices: self.column_indices.clone(),
                    null_equality: self.null_equality,
                    cache: self.cache.clone(),
                    dynamic_filter: Some(HashJoinExecDynamicFilter {
                        filter: dynamic_filter,
                        bounds_accumulator: OnceLock::new(),
                    }),
                });
                result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
            }
        }
        Ok(result)
    }
}

/// Accumulator for collecting min/max bounds from build-side data during hash join.
///
/// This struct encapsulates the logic for progressively computing column bounds
/// (minimum and maximum values) for a specific join key expression as batches
/// are processed during the build phase of a hash join.
///
/// The bounds are used for dynamic filter pushdown optimization, where filters
/// based on the actual data ranges can be pushed down to the probe side to
/// eliminate unnecessary data early.
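///
/// A minimal usage sketch (hedged: `expr`, `schema`, and `batches` are
/// assumed to exist; marked `ignore` because it elides their construction):
///
/// ```ignore
/// let mut acc = CollectLeftAccumulator::try_new(expr, &schema)?;
/// for batch in &batches {
///     acc.update_batch(batch)?; // fold each batch into the min/max
/// }
/// let bounds: ColumnBounds = acc.evaluate()?; // final (min, max) pair
/// ```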
struct CollectLeftAccumulator {
    /// The physical expression to evaluate for each batch
    expr: Arc<dyn PhysicalExpr>,
    /// Accumulator for tracking the minimum value across all batches
    min: MinAccumulator,
    /// Accumulator for tracking the maximum value across all batches
    max: MaxAccumulator,
}

impl CollectLeftAccumulator {
    /// Creates a new accumulator for tracking bounds of a join key expression.
    ///
    /// # Arguments
    /// * `expr` - The physical expression to track bounds for
    /// * `schema` - The schema of the input data
    ///
    /// # Returns
    /// A new `CollectLeftAccumulator` instance configured for the expression's data type
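    ///
    /// For example (an illustrative case): a join key of type
    /// `Dictionary(Int32, Utf8)` is unwrapped to `Utf8` before the Min/Max
    /// accumulators are created, since they expect the underlying value type.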
1235    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
1236        /// Recursively unwraps dictionary types to get the underlying value type.
1237        fn dictionary_value_type(data_type: &DataType) -> DataType {
1238            match data_type {
1239                DataType::Dictionary(_, value_type) => {
1240                    dictionary_value_type(value_type.as_ref())
1241                }
1242                _ => data_type.clone(),
1243            }
1244        }
1245
1246        let data_type = expr
1247            .data_type(schema)
1248            // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
1249            .map(|dt| dictionary_value_type(&dt))?;
1250        Ok(Self {
1251            expr,
1252            min: MinAccumulator::try_new(&data_type)?,
1253            max: MaxAccumulator::try_new(&data_type)?,
1254        })
1255    }
1256
1257    /// Updates the accumulators with values from a new batch.
1258    ///
1259    /// Evaluates the expression on the batch and updates both min and max
1260    /// accumulators with the resulting values.
1261    ///
1262    /// # Arguments
1263    /// * `batch` - The record batch to process
1264    ///
1265    /// # Returns
1266    /// Ok(()) if the update succeeds, or an error if expression evaluation fails
1267    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
1268        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
1269        self.min.update_batch(std::slice::from_ref(&array))?;
1270        self.max.update_batch(std::slice::from_ref(&array))?;
1271        Ok(())
1272    }
1273
1274    /// Finalizes the accumulation and returns the computed bounds.
1275    ///
1276    /// Consumes self to extract the final min and max values from the accumulators.
1277    ///
1278    /// # Returns
1279    /// The `ColumnBounds` containing the minimum and maximum values observed
1280    fn evaluate(mut self) -> Result<ColumnBounds> {
1281        Ok(ColumnBounds::new(
1282            self.min.evaluate()?,
1283            self.max.evaluate()?,
1284        ))
1285    }
1286}
1287
1288/// State for collecting the build-side data during hash join
struct BuildSideState {
    /// Build-side batches collected so far
    batches: Vec<RecordBatch>,
    /// Running count of build-side rows
    num_rows: usize,
    /// Metrics for build memory usage and input batches/rows
    metrics: BuildProbeJoinMetrics,
    /// Memory reservation, grown as batches arrive
    reservation: MemoryReservation,
    /// Per-join-key min/max accumulators; `Some` only when dynamic filter
    /// bounds are being computed
    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
}
1296
1297impl BuildSideState {
1298    /// Create a new BuildSideState with optional accumulators for bounds computation
1299    fn try_new(
1300        metrics: BuildProbeJoinMetrics,
1301        reservation: MemoryReservation,
1302        on_left: Vec<Arc<dyn PhysicalExpr>>,
1303        schema: &SchemaRef,
1304        should_compute_bounds: bool,
1305    ) -> Result<Self> {
1306        Ok(Self {
1307            batches: Vec::new(),
1308            num_rows: 0,
1309            metrics,
1310            reservation,
1311            bounds_accumulators: should_compute_bounds
1312                .then(|| {
1313                    on_left
1314                        .iter()
1315                        .map(|expr| {
1316                            CollectLeftAccumulator::try_new(Arc::clone(expr), schema)
1317                        })
1318                        .collect::<Result<Vec<_>>>()
1319                })
1320                .transpose()?,
1321        })
1322    }
1323}
1324
1325/// Collects all batches from the left (build) side stream and creates a hash map for joining.
1326///
1327/// This function is responsible for:
1328/// 1. Consuming the entire left stream and collecting all batches into memory
1329/// 2. Building a hash map from the join key columns for efficient probe operations
1330/// 3. Computing bounds for dynamic filter pushdown (if enabled)
1331/// 4. Preparing visited indices bitmap for certain join types
1332///
1333/// # Parameters
1334/// * `random_state` - Random state for consistent hashing across partitions
1335/// * `left_stream` - Stream of record batches from the build side
1336/// * `on_left` - Physical expressions for the left side join keys
1337/// * `metrics` - Metrics collector for tracking memory usage and row counts
1338/// * `reservation` - Memory reservation tracker for the hash table and data
1339/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
1340/// * `probe_threads_count` - Number of threads that will probe this hash table
1341/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic filtering
1342///
1343/// # Dynamic Filter Coordination
1344/// When `should_compute_bounds` is true, this function computes the min/max bounds
1345/// for each join key column but does NOT update the dynamic filter. Instead, the
1346/// bounds are stored in the returned `JoinLeftData` and later coordinated by
1347/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
1348/// before updating the filter exactly once.
1349///
1350/// # Returns
1351/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
1352/// visited indices bitmap, and computed bounds (if requested).
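///
/// # Usage sketch
///
/// Illustrative only; the real call site lives in `HashJoinExec`, and the
/// bindings below (`random_state`, `left_stream`, `on_left`, etc.) are
/// assumed to be in scope:
///
/// ```ignore
/// let left_data = collect_left_input(
///     random_state.clone(),
///     left_stream,
///     on_left.clone(),
///     metrics,
///     reservation,
///     need_produce_result_in_final(join_type), // visited bitmap for outer joins
///     probe_threads_count,
///     should_compute_bounds,
/// )
/// .await?;
/// ```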
1353#[allow(clippy::too_many_arguments)]
1354async fn collect_left_input(
1355    random_state: RandomState,
1356    left_stream: SendableRecordBatchStream,
1357    on_left: Vec<PhysicalExprRef>,
1358    metrics: BuildProbeJoinMetrics,
1359    reservation: MemoryReservation,
1360    with_visited_indices_bitmap: bool,
1361    probe_threads_count: usize,
1362    should_compute_bounds: bool,
1363) -> Result<JoinLeftData> {
1364    let schema = left_stream.schema();
1365
    // Consume the build-side stream in a single pass:
    // 1. collect every batch into a vector (reserving memory as we go)
    // 2. if requested, update the min/max bounds accumulators per batch
    // The [JoinHashMap] itself is built afterwards, once the total row count
    // (and therefore the index width, u32 vs u64) is known.
1369    let initial = BuildSideState::try_new(
1370        metrics,
1371        reservation,
1372        on_left.clone(),
1373        &schema,
1374        should_compute_bounds,
1375    )?;
1376
1377    let state = left_stream
1378        .try_fold(initial, |mut state, batch| async move {
1379            // Update accumulators if computing bounds
1380            if let Some(ref mut accumulators) = state.bounds_accumulators {
1381                for accumulator in accumulators {
1382                    accumulator.update_batch(&batch)?;
1383                }
1384            }
1385
            // Reserve memory for the incoming batch; this build phase never
            // spills, so the fold fails if the reservation cannot grow
            let batch_size = get_record_batch_memory_size(&batch);
            state.reservation.try_grow(batch_size)?;
1390            // Update metrics
1391            state.metrics.build_mem_used.add(batch_size);
1392            state.metrics.build_input_batches.add(1);
1393            state.metrics.build_input_rows.add(batch.num_rows());
1394            // Update row count
1395            state.num_rows += batch.num_rows();
1396            // Push batch to output
1397            state.batches.push(batch);
1398            Ok(state)
1399        })
1400        .await?;
1401
1402    // Extract fields from state
1403    let BuildSideState {
1404        batches,
1405        num_rows,
1406        metrics,
1407        mut reservation,
1408        bounds_accumulators,
1409    } = state;
1410
    // Estimate the memory required for the hash table before allocating it.
    // The final allocation can be verified using `RawTable::allocation_info()`.
1413    let fixed_size_u32 = size_of::<JoinHashMapU32>();
1414    let fixed_size_u64 = size_of::<JoinHashMapU64>();
1415
    // Use `u32` indices for the JoinHashMap when `num_rows <= u32::MAX`,
    // otherwise fall back to the `u64` variant
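    // Rough intuition (not the exact formula inside `estimate_memory_size`):
    // hashbrown allocates a power-of-two bucket count with headroom above
    // `num_rows`, so e.g. one million build rows reserve on the order of
    // 2^21 * size_of::<(u32, u64)>() = 32 MiB for the `u32` variant.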
1418    let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
1419        let estimated_hashtable_size =
1420            estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
1421        reservation.try_grow(estimated_hashtable_size)?;
1422        metrics.build_mem_used.add(estimated_hashtable_size);
1423        Box::new(JoinHashMapU64::with_capacity(num_rows))
1424    } else {
1425        let estimated_hashtable_size =
1426            estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
1427        reservation.try_grow(estimated_hashtable_size)?;
1428        metrics.build_mem_used.add(estimated_hashtable_size);
1429        Box::new(JoinHashMapU32::with_capacity(num_rows))
1430    };
1431
1432    let mut hashes_buffer = Vec::new();
1433    let mut offset = 0;
1434
    // Update the hash map, starting from the last batch
1436    let batches_iter = batches.iter().rev();
1437    for batch in batches_iter.clone() {
1438        hashes_buffer.clear();
1439        hashes_buffer.resize(batch.num_rows(), 0);
1440        update_hash(
1441            &on_left,
1442            batch,
1443            &mut *hashmap,
1444            offset,
1445            &random_state,
1446            &mut hashes_buffer,
1447            0,
1448            true,
1449        )?;
1450        offset += batch.num_rows();
1451    }
    // Merge all batches into a single batch (in the same reversed order used
    // above, so the offsets computed for the hash map line up), allowing
    // direct indexing into the arrays
1453    let single_batch = concat_batches(&schema, batches_iter)?;
1454
1455    // Reserve additional memory for visited indices bitmap and create shared builder
1456    let visited_indices_bitmap = if with_visited_indices_bitmap {
1457        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
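        // e.g. a 1,000-row build side needs ceil(1000 / 8) = 125 bytes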
1458        reservation.try_grow(bitmap_size)?;
1459        metrics.build_mem_used.add(bitmap_size);
1460
1461        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
1462        bitmap_buffer.append_n(num_rows, false);
1463        bitmap_buffer
1464    } else {
1465        BooleanBufferBuilder::new(0)
1466    };
1467
1468    let left_values = on_left
1469        .iter()
1470        .map(|c| {
1471            c.evaluate(&single_batch)?
1472                .into_array(single_batch.num_rows())
1473        })
1474        .collect::<Result<Vec<_>>>()?;
1475
    // Compute bounds for the dynamic filter if enabled; skipped when the
    // build side is empty, since min/max over zero rows is undefined
1477    let bounds = match bounds_accumulators {
1478        Some(accumulators) if num_rows > 0 => {
1479            let bounds = accumulators
1480                .into_iter()
1481                .map(CollectLeftAccumulator::evaluate)
1482                .collect::<Result<Vec<_>>>()?;
1483            Some(bounds)
1484        }
1485        _ => None,
1486    };
1487
1488    let data = JoinLeftData::new(
1489        hashmap,
1490        single_batch,
        left_values,
1492        Mutex::new(visited_indices_bitmap),
1493        AtomicUsize::new(probe_threads_count),
1494        reservation,
1495        bounds,
1496    );
1497
1498    Ok(data)
1499}
1500
1501#[cfg(test)]
1502mod tests {
1503    use super::*;
1504    use crate::coalesce_partitions::CoalescePartitionsExec;
1505    use crate::joins::hash_join::stream::lookup_join_hashmap;
1506    use crate::test::{assert_join_metrics, TestMemoryExec};
1507    use crate::{
1508        common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
1509        test::exec::MockExec,
1510    };
1511
1512    use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
1513    use arrow::buffer::NullBuffer;
1514    use arrow::datatypes::{DataType, Field};
1515    use arrow_schema::Schema;
1516    use datafusion_common::hash_utils::create_hashes;
1517    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
1518    use datafusion_common::{
1519        assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
1520        ScalarValue,
1521    };
1522    use datafusion_execution::config::SessionConfig;
1523    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1524    use datafusion_expr::Operator;
1525    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1526    use datafusion_physical_expr::PhysicalExpr;
1527    use hashbrown::HashTable;
1528    use insta::{allow_duplicates, assert_snapshot};
1529    use rstest::*;
1530    use rstest_reuse::*;
1531
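    /// Thin wrapper over `usize::div_ceil`, used by the expected-batch-count
    /// assertions in the tests below.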
1532    fn div_ceil(a: usize, b: usize) -> usize {
1533        a.div_ceil(b)
1534    }
1535
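    // `rstest_reuse` template: each test annotated with `#[apply(batch_sizes)]`
    // runs once per batch size listed below.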
1536    #[template]
1537    #[rstest]
1538    fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
1539
1540    fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
1541        let session_config = SessionConfig::default().with_batch_size(batch_size);
1542        Arc::new(TaskContext::default().with_session_config(session_config))
1543    }
1544
1545    fn build_table(
1546        a: (&str, &Vec<i32>),
1547        b: (&str, &Vec<i32>),
1548        c: (&str, &Vec<i32>),
1549    ) -> Arc<dyn ExecutionPlan> {
1550        let batch = build_table_i32(a, b, c);
1551        let schema = batch.schema();
1552        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
1553    }
1554
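    /// Convenience constructor: a `CollectLeft` hash join over `on` with no
    /// filter and no projection.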
1555    fn join(
1556        left: Arc<dyn ExecutionPlan>,
1557        right: Arc<dyn ExecutionPlan>,
1558        on: JoinOn,
1559        join_type: &JoinType,
1560        null_equality: NullEquality,
1561    ) -> Result<HashJoinExec> {
1562        HashJoinExec::try_new(
1563            left,
1564            right,
1565            on,
1566            None,
1567            join_type,
1568            None,
1569            PartitionMode::CollectLeft,
1570            null_equality,
1571        )
1572    }
1573
1574    fn join_with_filter(
1575        left: Arc<dyn ExecutionPlan>,
1576        right: Arc<dyn ExecutionPlan>,
1577        on: JoinOn,
1578        filter: JoinFilter,
1579        join_type: &JoinType,
1580        null_equality: NullEquality,
1581    ) -> Result<HashJoinExec> {
1582        HashJoinExec::try_new(
1583            left,
1584            right,
1585            on,
1586            Some(filter),
1587            join_type,
1588            None,
1589            PartitionMode::CollectLeft,
1590            null_equality,
1591        )
1592    }
1593
1594    async fn join_collect(
1595        left: Arc<dyn ExecutionPlan>,
1596        right: Arc<dyn ExecutionPlan>,
1597        on: JoinOn,
1598        join_type: &JoinType,
1599        null_equality: NullEquality,
1600        context: Arc<TaskContext>,
1601    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1602        let join = join(left, right, on, join_type, null_equality)?;
1603        let columns_header = columns(&join.schema());
1604
1605        let stream = join.execute(0, context)?;
1606        let batches = common::collect(stream).await?;
1607        let metrics = join.metrics().unwrap();
1608
1609        Ok((columns_header, batches, metrics))
1610    }
1611
1612    async fn partitioned_join_collect(
1613        left: Arc<dyn ExecutionPlan>,
1614        right: Arc<dyn ExecutionPlan>,
1615        on: JoinOn,
1616        join_type: &JoinType,
1617        null_equality: NullEquality,
1618        context: Arc<TaskContext>,
1619    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1620        join_collect_with_partition_mode(
1621            left,
1622            right,
1623            on,
1624            join_type,
1625            PartitionMode::Partitioned,
1626            null_equality,
1627            context,
1628        )
1629        .await
1630    }
1631
1632    async fn join_collect_with_partition_mode(
1633        left: Arc<dyn ExecutionPlan>,
1634        right: Arc<dyn ExecutionPlan>,
1635        on: JoinOn,
1636        join_type: &JoinType,
1637        partition_mode: PartitionMode,
1638        null_equality: NullEquality,
1639        context: Arc<TaskContext>,
1640    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1641        let partition_count = 4;
1642
1643        let (left_expr, right_expr) = on
1644            .iter()
1645            .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1646            .unzip();
1647
1648        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1649            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
1650            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1651                left,
1652                Partitioning::Hash(left_expr, partition_count),
1653            )?),
1654            PartitionMode::Auto => {
1655                return internal_err!("Unexpected PartitionMode::Auto in join tests")
1656            }
1657        };
1658
1659        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1660            PartitionMode::CollectLeft => {
1661                let partition_column_name = right.schema().field(0).name().clone();
1662                let partition_expr = vec![Arc::new(Column::new_with_schema(
1663                    &partition_column_name,
1664                    &right.schema(),
1665                )?) as _];
1666                Arc::new(RepartitionExec::try_new(
1667                    right,
1668                    Partitioning::Hash(partition_expr, partition_count),
1669                )?) as _
1670            }
1671            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1672                right,
1673                Partitioning::Hash(right_expr, partition_count),
1674            )?),
1675            PartitionMode::Auto => {
1676                return internal_err!("Unexpected PartitionMode::Auto in join tests")
1677            }
1678        };
1679
1680        let join = HashJoinExec::try_new(
1681            left_repartitioned,
1682            right_repartitioned,
1683            on,
1684            None,
1685            join_type,
1686            None,
1687            partition_mode,
1688            null_equality,
1689        )?;
1690
1691        let columns = columns(&join.schema());
1692
1693        let mut batches = vec![];
1694        for i in 0..partition_count {
1695            let stream = join.execute(i, Arc::clone(&context))?;
1696            let more_batches = common::collect(stream).await?;
1697            batches.extend(
1698                more_batches
1699                    .into_iter()
1700                    .filter(|b| b.num_rows() > 0)
1701                    .collect::<Vec<_>>(),
1702            );
1703        }
1704        let metrics = join.metrics().unwrap();
1705
1706        Ok((columns, batches, metrics))
1707    }
1708
1709    #[apply(batch_sizes)]
1710    #[tokio::test]
1711    async fn join_inner_one(batch_size: usize) -> Result<()> {
1712        let task_ctx = prepare_task_ctx(batch_size);
1713        let left = build_table(
1714            ("a1", &vec![1, 2, 3]),
1715            ("b1", &vec![4, 5, 5]), // this has a repetition
1716            ("c1", &vec![7, 8, 9]),
1717        );
1718        let right = build_table(
1719            ("a2", &vec![10, 20, 30]),
1720            ("b1", &vec![4, 5, 6]),
1721            ("c2", &vec![70, 80, 90]),
1722        );
1723
1724        let on = vec![(
1725            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1726            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1727        )];
1728
1729        let (columns, batches, metrics) = join_collect(
1730            Arc::clone(&left),
1731            Arc::clone(&right),
1732            on.clone(),
1733            &JoinType::Inner,
1734            NullEquality::NullEqualsNothing,
1735            task_ctx,
1736        )
1737        .await?;
1738
1739        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1740
1741        allow_duplicates! {
            // Inner join output is expected to preserve the order of both inputs
1743            assert_snapshot!(batches_to_string(&batches), @r#"
1744                +----+----+----+----+----+----+
1745                | a1 | b1 | c1 | a2 | b1 | c2 |
1746                +----+----+----+----+----+----+
1747                | 1  | 4  | 7  | 10 | 4  | 70 |
1748                | 2  | 5  | 8  | 20 | 5  | 80 |
1749                | 3  | 5  | 9  | 20 | 5  | 80 |
1750                +----+----+----+----+----+----+
1751                "#);
1752        }
1753
1754        assert_join_metrics!(metrics, 3);
1755
1756        Ok(())
1757    }
1758
1759    #[apply(batch_sizes)]
1760    #[tokio::test]
1761    async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> {
1762        let task_ctx = prepare_task_ctx(batch_size);
1763        let left = build_table(
1764            ("a1", &vec![1, 2, 3]),
1765            ("b1", &vec![4, 5, 5]), // this has a repetition
1766            ("c1", &vec![7, 8, 9]),
1767        );
1768        let right = build_table(
1769            ("a2", &vec![10, 20, 30]),
1770            ("b1", &vec![4, 5, 6]),
1771            ("c2", &vec![70, 80, 90]),
1772        );
1773        let on = vec![(
1774            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1775            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1776        )];
1777
1778        let (columns, batches, metrics) = partitioned_join_collect(
1779            Arc::clone(&left),
1780            Arc::clone(&right),
1781            on.clone(),
1782            &JoinType::Inner,
1783            NullEquality::NullEqualsNothing,
1784            task_ctx,
1785        )
1786        .await?;
1787
1788        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1789
1790        allow_duplicates! {
1791            assert_snapshot!(batches_to_sort_string(&batches), @r#"
1792                +----+----+----+----+----+----+
1793                | a1 | b1 | c1 | a2 | b1 | c2 |
1794                +----+----+----+----+----+----+
1795                | 1  | 4  | 7  | 10 | 4  | 70 |
1796                | 2  | 5  | 8  | 20 | 5  | 80 |
1797                | 3  | 5  | 9  | 20 | 5  | 80 |
1798                +----+----+----+----+----+----+
1799                "#);
1800        }
1801
1802        assert_join_metrics!(metrics, 3);
1803
1804        Ok(())
1805    }
1806
1807    #[tokio::test]
1808    async fn join_inner_one_no_shared_column_names() -> Result<()> {
1809        let task_ctx = Arc::new(TaskContext::default());
1810        let left = build_table(
1811            ("a1", &vec![1, 2, 3]),
1812            ("b1", &vec![4, 5, 5]), // this has a repetition
1813            ("c1", &vec![7, 8, 9]),
1814        );
1815        let right = build_table(
1816            ("a2", &vec![10, 20, 30]),
1817            ("b2", &vec![4, 5, 6]),
1818            ("c2", &vec![70, 80, 90]),
1819        );
1820        let on = vec![(
1821            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1822            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1823        )];
1824
1825        let (columns, batches, metrics) = join_collect(
1826            left,
1827            right,
1828            on,
1829            &JoinType::Inner,
1830            NullEquality::NullEqualsNothing,
1831            task_ctx,
1832        )
1833        .await?;
1834
1835        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1836
        // Inner join output is expected to preserve the order of both inputs
1838        allow_duplicates! {
1839            assert_snapshot!(batches_to_string(&batches), @r#"
1840            +----+----+----+----+----+----+
1841            | a1 | b1 | c1 | a2 | b2 | c2 |
1842            +----+----+----+----+----+----+
1843            | 1  | 4  | 7  | 10 | 4  | 70 |
1844            | 2  | 5  | 8  | 20 | 5  | 80 |
1845            | 3  | 5  | 9  | 20 | 5  | 80 |
1846            +----+----+----+----+----+----+
1847                "#);
1848        }
1849
1850        assert_join_metrics!(metrics, 3);
1851
1852        Ok(())
1853    }
1854
1855    #[tokio::test]
1856    async fn join_inner_one_randomly_ordered() -> Result<()> {
1857        let task_ctx = Arc::new(TaskContext::default());
1858        let left = build_table(
1859            ("a1", &vec![0, 3, 2, 1]),
1860            ("b1", &vec![4, 5, 5, 4]),
1861            ("c1", &vec![6, 9, 8, 7]),
1862        );
1863        let right = build_table(
1864            ("a2", &vec![20, 30, 10]),
1865            ("b2", &vec![5, 6, 4]),
1866            ("c2", &vec![80, 90, 70]),
1867        );
1868        let on = vec![(
1869            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1870            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1871        )];
1872
1873        let (columns, batches, metrics) = join_collect(
1874            left,
1875            right,
1876            on,
1877            &JoinType::Inner,
1878            NullEquality::NullEqualsNothing,
1879            task_ctx,
1880        )
1881        .await?;
1882
1883        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1884
        // Inner join output is expected to preserve the order of both inputs
1886        allow_duplicates! {
1887            assert_snapshot!(batches_to_string(&batches), @r#"
1888            +----+----+----+----+----+----+
1889            | a1 | b1 | c1 | a2 | b2 | c2 |
1890            +----+----+----+----+----+----+
1891            | 3  | 5  | 9  | 20 | 5  | 80 |
1892            | 2  | 5  | 8  | 20 | 5  | 80 |
1893            | 0  | 4  | 6  | 10 | 4  | 70 |
1894            | 1  | 4  | 7  | 10 | 4  | 70 |
1895            +----+----+----+----+----+----+
1896                "#);
1897        }
1898
1899        assert_join_metrics!(metrics, 4);
1900
1901        Ok(())
1902    }
1903
1904    #[apply(batch_sizes)]
1905    #[tokio::test]
1906    async fn join_inner_two(batch_size: usize) -> Result<()> {
1907        let task_ctx = prepare_task_ctx(batch_size);
1908        let left = build_table(
1909            ("a1", &vec![1, 2, 2]),
1910            ("b2", &vec![1, 2, 2]),
1911            ("c1", &vec![7, 8, 9]),
1912        );
1913        let right = build_table(
1914            ("a1", &vec![1, 2, 3]),
1915            ("b2", &vec![1, 2, 2]),
1916            ("c2", &vec![70, 80, 90]),
1917        );
1918        let on = vec![
1919            (
1920                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
1921                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
1922            ),
1923            (
1924                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
1925                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1926            ),
1927        ];
1928
1929        let (columns, batches, metrics) = join_collect(
1930            left,
1931            right,
1932            on,
1933            &JoinType::Inner,
1934            NullEquality::NullEqualsNothing,
1935            task_ctx,
1936        )
1937        .await?;
1938
1939        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
1940
1941        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches = 3;
            // when batch_size is 1, an additional empty batch is emitted for
            // the non-joined right row (a1 = 3, b2 = 2)
1944            let mut expected_batch_count = div_ceil(3, batch_size);
1945            if batch_size == 1 {
1946                expected_batch_count += 1;
1947            }
1948            expected_batch_count
1949        } else {
1950            // With hash collisions enabled, all records will match each other
1951            // and filtered later.
1952            div_ceil(9, batch_size)
1953        };
1954
1955        assert_eq!(batches.len(), expected_batch_count);
1956
        // Inner join output is expected to preserve the order of both inputs
1958        allow_duplicates! {
1959            assert_snapshot!(batches_to_string(&batches), @r#"
1960            +----+----+----+----+----+----+
1961            | a1 | b2 | c1 | a1 | b2 | c2 |
1962            +----+----+----+----+----+----+
1963            | 1  | 1  | 7  | 1  | 1  | 70 |
1964            | 2  | 2  | 8  | 2  | 2  | 80 |
1965            | 2  | 2  | 9  | 2  | 2  | 80 |
1966            +----+----+----+----+----+----+
1967                "#);
1968        }
1969
1970        assert_join_metrics!(metrics, 3);
1971
1972        Ok(())
1973    }
1974
    /// Test where the left has 2 parts and the right has 1 part => 1 output part
1976    #[apply(batch_sizes)]
1977    #[tokio::test]
1978    async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
1979        let task_ctx = prepare_task_ctx(batch_size);
1980        let batch1 = build_table_i32(
1981            ("a1", &vec![1, 2]),
1982            ("b2", &vec![1, 2]),
1983            ("c1", &vec![7, 8]),
1984        );
1985        let batch2 =
1986            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
1987        let schema = batch1.schema();
1988        let left =
1989            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
1990                .unwrap();
1991        let left = Arc::new(CoalescePartitionsExec::new(left));
1992
1993        let right = build_table(
1994            ("a1", &vec![1, 2, 3]),
1995            ("b2", &vec![1, 2, 2]),
1996            ("c2", &vec![70, 80, 90]),
1997        );
1998        let on = vec![
1999            (
2000                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
2001                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
2002            ),
2003            (
2004                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
2005                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2006            ),
2007        ];
2008
2009        let (columns, batches, metrics) = join_collect(
2010            left,
2011            right,
2012            on,
2013            &JoinType::Inner,
2014            NullEquality::NullEqualsNothing,
2015            task_ctx,
2016        )
2017        .await?;
2018
2019        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
2020
2021        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches = 3;
            // when batch_size is 1, an additional empty batch is emitted for
            // the non-joined right row (a1 = 3, b2 = 2)
2024            let mut expected_batch_count = div_ceil(3, batch_size);
2025            if batch_size == 1 {
2026                expected_batch_count += 1;
2027            }
2028            expected_batch_count
2029        } else {
2030            // With hash collisions enabled, all records will match each other
2031            // and filtered later.
2032            div_ceil(9, batch_size)
2033        };
2034
2035        assert_eq!(batches.len(), expected_batch_count);
2036
        // Inner join output is expected to preserve the order of both inputs
2038        allow_duplicates! {
2039            assert_snapshot!(batches_to_string(&batches), @r#"
2040            +----+----+----+----+----+----+
2041            | a1 | b2 | c1 | a1 | b2 | c2 |
2042            +----+----+----+----+----+----+
2043            | 1  | 1  | 7  | 1  | 1  | 70 |
2044            | 2  | 2  | 8  | 2  | 2  | 80 |
2045            | 2  | 2  | 9  | 2  | 2  | 80 |
2046            +----+----+----+----+----+----+
2047                "#);
2048        }
2049
2050        assert_join_metrics!(metrics, 3);
2051
2052        Ok(())
2053    }
2054
2055    #[tokio::test]
2056    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
2057        let task_ctx = Arc::new(TaskContext::default());
2058        let batch1 = build_table_i32(
2059            ("a1", &vec![0, 3]),
2060            ("b1", &vec![4, 5]),
2061            ("c1", &vec![6, 9]),
2062        );
2063        let batch2 = build_table_i32(
2064            ("a1", &vec![2, 1]),
2065            ("b1", &vec![5, 4]),
2066            ("c1", &vec![8, 7]),
2067        );
2068        let schema = batch1.schema();
2069
2070        let left =
2071            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2072                .unwrap();
2073        let left = Arc::new(CoalescePartitionsExec::new(left));
2074        let right = build_table(
2075            ("a2", &vec![20, 30, 10]),
2076            ("b2", &vec![5, 6, 4]),
2077            ("c2", &vec![80, 90, 70]),
2078        );
2079        let on = vec![(
2080            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2081            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2082        )];
2083
2084        let (columns, batches, metrics) = join_collect(
2085            left,
2086            right,
2087            on,
2088            &JoinType::Inner,
2089            NullEquality::NullEqualsNothing,
2090            task_ctx,
2091        )
2092        .await?;
2093
2094        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2095
        // Inner join output is expected to preserve the order of both inputs
2097        allow_duplicates! {
2098            assert_snapshot!(batches_to_string(&batches), @r#"
2099            +----+----+----+----+----+----+
2100            | a1 | b1 | c1 | a2 | b2 | c2 |
2101            +----+----+----+----+----+----+
2102            | 3  | 5  | 9  | 20 | 5  | 80 |
2103            | 2  | 5  | 8  | 20 | 5  | 80 |
2104            | 0  | 4  | 6  | 10 | 4  | 70 |
2105            | 1  | 4  | 7  | 10 | 4  | 70 |
2106            +----+----+----+----+----+----+
2107                "#);
2108        }
2109
2110        assert_join_metrics!(metrics, 4);
2111
2112        Ok(())
2113    }
2114
    /// Test where the left has 1 part and the right has 2 parts => 2 output parts
2116    #[apply(batch_sizes)]
2117    #[tokio::test]
2118    async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
2119        let task_ctx = prepare_task_ctx(batch_size);
2120        let left = build_table(
2121            ("a1", &vec![1, 2, 3]),
2122            ("b1", &vec![4, 5, 5]), // this has a repetition
2123            ("c1", &vec![7, 8, 9]),
2124        );
2125
2126        let batch1 = build_table_i32(
2127            ("a2", &vec![10, 20]),
2128            ("b1", &vec![4, 6]),
2129            ("c2", &vec![70, 80]),
2130        );
2131        let batch2 =
2132            build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
2133        let schema = batch1.schema();
2134        let right =
2135            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2136                .unwrap();
2137
2138        let on = vec![(
2139            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2140            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2141        )];
2142
2143        let join = join(
2144            left,
2145            right,
2146            on,
2147            &JoinType::Inner,
2148            NullEquality::NullEqualsNothing,
2149        )?;
2150
2151        let columns = columns(&join.schema());
2152        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2153
2154        // first part
2155        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2156        let batches = common::collect(stream).await?;
2157
2158        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches for the first right batch = 1;
            // when batch_size is 1, an additional empty batch is emitted for the
            // non-joined row (20, 6, 80)
2161            let mut expected_batch_count = div_ceil(1, batch_size);
2162            if batch_size == 1 {
2163                expected_batch_count += 1;
2164            }
2165            expected_batch_count
2166        } else {
2167            // With hash collisions enabled, all records will match each other
2168            // and filtered later.
2169            div_ceil(6, batch_size)
2170        };
2171        assert_eq!(batches.len(), expected_batch_count);
2172
        // Inner join output is expected to preserve the order of both inputs
2174        allow_duplicates! {
2175            assert_snapshot!(batches_to_string(&batches), @r#"
2176            +----+----+----+----+----+----+
2177            | a1 | b1 | c1 | a2 | b1 | c2 |
2178            +----+----+----+----+----+----+
2179            | 1  | 4  | 7  | 10 | 4  | 70 |
2180            +----+----+----+----+----+----+
2181                "#);
2182        }
2183
2184        // second part
2185        let stream = join.execute(1, Arc::clone(&task_ctx))?;
2186        let batches = common::collect(stream).await?;
2187
2188        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2189            // Expected number of hash table matches for second right batch = 2
2190            div_ceil(2, batch_size)
2191        } else {
2192            // With hash collisions enabled, all records will match each other
2193            // and filtered later.
2194            div_ceil(3, batch_size)
2195        };
2196        assert_eq!(batches.len(), expected_batch_count);
2197
        // Inner join output is expected to preserve the order of both inputs
2199        allow_duplicates! {
2200            assert_snapshot!(batches_to_string(&batches), @r#"
2201            +----+----+----+----+----+----+
2202            | a1 | b1 | c1 | a2 | b1 | c2 |
2203            +----+----+----+----+----+----+
2204            | 2  | 5  | 8  | 30 | 5  | 90 |
2205            | 3  | 5  | 9  | 30 | 5  | 90 |
2206            +----+----+----+----+----+----+
2207                "#);
2208        }
2209
2210        Ok(())
2211    }
2212
2213    fn build_table_two_batches(
2214        a: (&str, &Vec<i32>),
2215        b: (&str, &Vec<i32>),
2216        c: (&str, &Vec<i32>),
2217    ) -> Arc<dyn ExecutionPlan> {
2218        let batch = build_table_i32(a, b, c);
2219        let schema = batch.schema();
2220        TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2221    }
2222
2223    #[apply(batch_sizes)]
2224    #[tokio::test]
2225    async fn join_left_multi_batch(batch_size: usize) {
2226        let task_ctx = prepare_task_ctx(batch_size);
2227        let left = build_table(
2228            ("a1", &vec![1, 2, 3]),
2229            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2230            ("c1", &vec![7, 8, 9]),
2231        );
2232        let right = build_table_two_batches(
2233            ("a2", &vec![10, 20, 30]),
2234            ("b1", &vec![4, 5, 6]),
2235            ("c2", &vec![70, 80, 90]),
2236        );
2237        let on = vec![(
2238            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2239            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2240        )];
2241
2242        let join = join(
2243            left,
2244            right,
2245            on,
2246            &JoinType::Left,
2247            NullEquality::NullEqualsNothing,
2248        )
2249        .unwrap();
2250
2251        let columns = columns(&join.schema());
2252        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2253
2254        let stream = join.execute(0, task_ctx).unwrap();
2255        let batches = common::collect(stream).await.unwrap();
2256
2257        allow_duplicates! {
2258            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2259            +----+----+----+----+----+----+
2260            | a1 | b1 | c1 | a2 | b1 | c2 |
2261            +----+----+----+----+----+----+
2262            | 1  | 4  | 7  | 10 | 4  | 70 |
2263            | 1  | 4  | 7  | 10 | 4  | 70 |
2264            | 2  | 5  | 8  | 20 | 5  | 80 |
2265            | 2  | 5  | 8  | 20 | 5  | 80 |
2266            | 3  | 7  | 9  |    |    |    |
2267            +----+----+----+----+----+----+
2268                "#);
2269        }
2270    }
2271
2272    #[apply(batch_sizes)]
2273    #[tokio::test]
2274    async fn join_full_multi_batch(batch_size: usize) {
2275        let task_ctx = prepare_task_ctx(batch_size);
2276        let left = build_table(
2277            ("a1", &vec![1, 2, 3]),
2278            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2279            ("c1", &vec![7, 8, 9]),
2280        );
2281        // create two identical batches for the right side
2282        let right = build_table_two_batches(
2283            ("a2", &vec![10, 20, 30]),
2284            ("b2", &vec![4, 5, 6]),
2285            ("c2", &vec![70, 80, 90]),
2286        );
2287        let on = vec![(
2288            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2289            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2290        )];
2291
2292        let join = join(
2293            left,
2294            right,
2295            on,
2296            &JoinType::Full,
2297            NullEquality::NullEqualsNothing,
2298        )
2299        .unwrap();
2300
2301        let columns = columns(&join.schema());
2302        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2303
2304        let stream = join.execute(0, task_ctx).unwrap();
2305        let batches = common::collect(stream).await.unwrap();
2306
2307        allow_duplicates! {
2308            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2309            +----+----+----+----+----+----+
2310            | a1 | b1 | c1 | a2 | b2 | c2 |
2311            +----+----+----+----+----+----+
2312            |    |    |    | 30 | 6  | 90 |
2313            |    |    |    | 30 | 6  | 90 |
2314            | 1  | 4  | 7  | 10 | 4  | 70 |
2315            | 1  | 4  | 7  | 10 | 4  | 70 |
2316            | 2  | 5  | 8  | 20 | 5  | 80 |
2317            | 2  | 5  | 8  | 20 | 5  | 80 |
2318            | 3  | 7  | 9  |    |    |    |
2319            +----+----+----+----+----+----+
2320                "#);
2321        }
2322    }
2323
2324    #[apply(batch_sizes)]
2325    #[tokio::test]
2326    async fn join_left_empty_right(batch_size: usize) {
2327        let task_ctx = prepare_task_ctx(batch_size);
2328        let left = build_table(
2329            ("a1", &vec![1, 2, 3]),
2330            ("b1", &vec![4, 5, 7]),
2331            ("c1", &vec![7, 8, 9]),
2332        );
2333        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2334        let on = vec![(
2335            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2336            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2337        )];
2338        let schema = right.schema();
2339        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2340        let join = join(
2341            left,
2342            right,
2343            on,
2344            &JoinType::Left,
2345            NullEquality::NullEqualsNothing,
2346        )
2347        .unwrap();
2348
2349        let columns = columns(&join.schema());
2350        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2351
2352        let stream = join.execute(0, task_ctx).unwrap();
2353        let batches = common::collect(stream).await.unwrap();
2354
2355        allow_duplicates! {
2356            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2357            +----+----+----+----+----+----+
2358            | a1 | b1 | c1 | a2 | b1 | c2 |
2359            +----+----+----+----+----+----+
2360            | 1  | 4  | 7  |    |    |    |
2361            | 2  | 5  | 8  |    |    |    |
2362            | 3  | 7  | 9  |    |    |    |
2363            +----+----+----+----+----+----+
2364                "#);
2365        }
2366    }
2367
2368    #[apply(batch_sizes)]
2369    #[tokio::test]
2370    async fn join_full_empty_right(batch_size: usize) {
2371        let task_ctx = prepare_task_ctx(batch_size);
2372        let left = build_table(
2373            ("a1", &vec![1, 2, 3]),
2374            ("b1", &vec![4, 5, 7]),
2375            ("c1", &vec![7, 8, 9]),
2376        );
2377        let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
2378        let on = vec![(
2379            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2380            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2381        )];
2382        let schema = right.schema();
2383        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2384        let join = join(
2385            left,
2386            right,
2387            on,
2388            &JoinType::Full,
2389            NullEquality::NullEqualsNothing,
2390        )
2391        .unwrap();
2392
2393        let columns = columns(&join.schema());
2394        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2395
2396        let stream = join.execute(0, task_ctx).unwrap();
2397        let batches = common::collect(stream).await.unwrap();
2398
2399        allow_duplicates! {
2400            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2401            +----+----+----+----+----+----+
2402            | a1 | b1 | c1 | a2 | b2 | c2 |
2403            +----+----+----+----+----+----+
2404            | 1  | 4  | 7  |    |    |    |
2405            | 2  | 5  | 8  |    |    |    |
2406            | 3  | 7  | 9  |    |    |    |
2407            +----+----+----+----+----+----+
2408                "#);
2409        }
2410    }
2411
2412    #[apply(batch_sizes)]
2413    #[tokio::test]
2414    async fn join_left_one(batch_size: usize) -> Result<()> {
2415        let task_ctx = prepare_task_ctx(batch_size);
2416        let left = build_table(
2417            ("a1", &vec![1, 2, 3]),
2418            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2419            ("c1", &vec![7, 8, 9]),
2420        );
2421        let right = build_table(
2422            ("a2", &vec![10, 20, 30]),
2423            ("b1", &vec![4, 5, 6]),
2424            ("c2", &vec![70, 80, 90]),
2425        );
2426        let on = vec![(
2427            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2428            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2429        )];
2430
2431        let (columns, batches, metrics) = join_collect(
2432            Arc::clone(&left),
2433            Arc::clone(&right),
2434            on.clone(),
2435            &JoinType::Left,
2436            NullEquality::NullEqualsNothing,
2437            task_ctx,
2438        )
2439        .await?;
2440
2441        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2442
2443        allow_duplicates! {
2444            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2445            +----+----+----+----+----+----+
2446            | a1 | b1 | c1 | a2 | b1 | c2 |
2447            +----+----+----+----+----+----+
2448            | 1  | 4  | 7  | 10 | 4  | 70 |
2449            | 2  | 5  | 8  | 20 | 5  | 80 |
2450            | 3  | 7  | 9  |    |    |    |
2451            +----+----+----+----+----+----+
2452                "#);
2453        }
2454
2455        assert_join_metrics!(metrics, 3);
2456
2457        Ok(())
2458    }
2459
2460    #[apply(batch_sizes)]
2461    #[tokio::test]
2462    async fn partitioned_join_left_one(batch_size: usize) -> Result<()> {
2463        let task_ctx = prepare_task_ctx(batch_size);
2464        let left = build_table(
2465            ("a1", &vec![1, 2, 3]),
2466            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2467            ("c1", &vec![7, 8, 9]),
2468        );
2469        let right = build_table(
2470            ("a2", &vec![10, 20, 30]),
2471            ("b1", &vec![4, 5, 6]),
2472            ("c2", &vec![70, 80, 90]),
2473        );
2474        let on = vec![(
2475            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2476            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2477        )];
2478
2479        let (columns, batches, metrics) = partitioned_join_collect(
2480            Arc::clone(&left),
2481            Arc::clone(&right),
2482            on.clone(),
2483            &JoinType::Left,
2484            NullEquality::NullEqualsNothing,
2485            task_ctx,
2486        )
2487        .await?;
2488
2489        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2490
2491        allow_duplicates! {
2492            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2493            +----+----+----+----+----+----+
2494            | a1 | b1 | c1 | a2 | b1 | c2 |
2495            +----+----+----+----+----+----+
2496            | 1  | 4  | 7  | 10 | 4  | 70 |
2497            | 2  | 5  | 8  | 20 | 5  | 80 |
2498            | 3  | 7  | 9  |    |    |    |
2499            +----+----+----+----+----+----+
2500                "#);
2501        }
2502
2503        assert_join_metrics!(metrics, 3);
2504
2505        Ok(())
2506    }
2507
2508    fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
        // only the rows with b1 = 8 or b1 = 10 have matches on the right
2511        build_table(
2512            ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
2513            ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
2514            ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
2515        )
2516    }
2517
2518    fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
        // only the rows with b2 = 8 or b2 = 10 have matches on the left
2521        build_table(
2522            ("a2", &vec![8, 12, 6, 2, 10, 4]),
2523            ("b2", &vec![8, 10, 6, 2, 10, 4]),
2524            ("c2", &vec![20, 40, 60, 80, 100, 120]),
2525        )
2526    }
2527
2528    #[apply(batch_sizes)]
2529    #[tokio::test]
2530    async fn join_left_semi(batch_size: usize) -> Result<()> {
2531        let task_ctx = prepare_task_ctx(batch_size);
2532        let left = build_semi_anti_left_table();
2533        let right = build_semi_anti_right_table();
2534        // left_table left semi join right_table on left_table.b1 = right_table.b2
2535        let on = vec![(
2536            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2537            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2538        )];
2539
2540        let join = join(
2541            left,
2542            right,
2543            on,
2544            &JoinType::LeftSemi,
2545            NullEquality::NullEqualsNothing,
2546        )?;
2547
2548        let columns = columns(&join.schema());
2549        assert_eq!(columns, vec!["a1", "b1", "c1"]);
2550
2551        let stream = join.execute(0, task_ctx)?;
2552        let batches = common::collect(stream).await?;
2553
        // row order is not deterministic, so compare sorted output
2555        allow_duplicates! {
2556            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2557            +----+----+-----+
2558            | a1 | b1 | c1  |
2559            +----+----+-----+
2560            | 11 | 8  | 110 |
2561            | 13 | 10 | 130 |
2562            | 9  | 8  | 90  |
2563            +----+----+-----+
2564                "#);
2565        }
2566
2567        Ok(())
2568    }
2569
2570    #[apply(batch_sizes)]
2571    #[tokio::test]
2572    async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> {
2573        let task_ctx = prepare_task_ctx(batch_size);
2574        let left = build_semi_anti_left_table();
2575        let right = build_semi_anti_right_table();
2576
2577        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 10
2578        let on = vec![(
2579            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2580            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2581        )];
2582
2583        let column_indices = vec![ColumnIndex {
2584            index: 0,
2585            side: JoinSide::Right,
2586        }];
2587        let intermediate_schema =
2588            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2589
2590        let filter_expression = Arc::new(BinaryExpr::new(
2591            Arc::new(Column::new("x", 0)),
2592            Operator::NotEq,
2593            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2594        )) as Arc<dyn PhysicalExpr>;
2595
2596        let filter = JoinFilter::new(
2597            filter_expression,
2598            column_indices.clone(),
2599            Arc::new(intermediate_schema.clone()),
2600        );
2601
2602        let join = join_with_filter(
2603            Arc::clone(&left),
2604            Arc::clone(&right),
2605            on.clone(),
2606            filter,
2607            &JoinType::LeftSemi,
2608            NullEquality::NullEqualsNothing,
2609        )?;
2610
2611        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2613
2614        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2615        let batches = common::collect(stream).await?;
2616
2617        allow_duplicates! {
2618            assert_snapshot!(batches_to_sort_string(&batches), @r"
2619            +----+----+-----+
2620            | a1 | b1 | c1  |
2621            +----+----+-----+
2622            | 11 | 8  | 110 |
2623            | 13 | 10 | 130 |
2624            | 9  | 8  | 90  |
2625            +----+----+-----+
2626            ");
2627        }
2628
2629        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 > 10
2630        let filter_expression = Arc::new(BinaryExpr::new(
2631            Arc::new(Column::new("x", 0)),
2632            Operator::Gt,
2633            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2634        )) as Arc<dyn PhysicalExpr>;
2635        let filter = JoinFilter::new(
2636            filter_expression,
2637            column_indices,
2638            Arc::new(intermediate_schema),
2639        );
2640
2641        let join = join_with_filter(
2642            left,
2643            right,
2644            on,
2645            filter,
2646            &JoinType::LeftSemi,
2647            NullEquality::NullEqualsNothing,
2648        )?;
2649
2650        let columns_header = columns(&join.schema());
2651        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2652
2653        let stream = join.execute(0, task_ctx)?;
2654        let batches = common::collect(stream).await?;
2655
2656        allow_duplicates! {
2657            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2658            +----+----+-----+
2659            | a1 | b1 | c1  |
2660            +----+----+-----+
2661            | 13 | 10 | 130 |
2662            +----+----+-----+
2663                "#);
2664        }
2665
2666        Ok(())
2667    }
2668
2669    #[apply(batch_sizes)]
2670    #[tokio::test]
2671    async fn join_right_semi(batch_size: usize) -> Result<()> {
2672        let task_ctx = prepare_task_ctx(batch_size);
2673        let left = build_semi_anti_left_table();
2674        let right = build_semi_anti_right_table();
2675
2676        // left_table right semi join right_table on left_table.b1 = right_table.b2
2677        let on = vec![(
2678            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2679            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2680        )];
2681
2682        let join = join(
2683            left,
2684            right,
2685            on,
2686            &JoinType::RightSemi,
2687            NullEquality::NullEqualsNothing,
2688        )?;
2689
2690        let columns = columns(&join.schema());
2691        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2692
2693        let stream = join.execute(0, task_ctx)?;
2694        let batches = common::collect(stream).await?;
2695
2696        // RightSemi join output is expected to preserve right input order
2697        allow_duplicates! {
2698            assert_snapshot!(batches_to_string(&batches), @r#"
2699            +----+----+-----+
2700            | a2 | b2 | c2  |
2701            +----+----+-----+
2702            | 8  | 8  | 20  |
2703            | 12 | 10 | 40  |
2704            | 10 | 10 | 100 |
2705            +----+----+-----+
2706                "#);
2707        }
2708
2709        Ok(())
2710    }
2711
2712    #[apply(batch_sizes)]
2713    #[tokio::test]
2714    async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> {
2715        let task_ctx = prepare_task_ctx(batch_size);
2716        let left = build_semi_anti_left_table();
2717        let right = build_semi_anti_right_table();
2718
        // left_table right semi join right_table on left_table.b1 = right_table.b2 and left_table.a1 != 9
2720        let on = vec![(
2721            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2722            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2723        )];
2724
2725        let column_indices = vec![ColumnIndex {
2726            index: 0,
2727            side: JoinSide::Left,
2728        }];
2729        let intermediate_schema =
2730            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2731
2732        let filter_expression = Arc::new(BinaryExpr::new(
2733            Arc::new(Column::new("x", 0)),
2734            Operator::NotEq,
2735            Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
2736        )) as Arc<dyn PhysicalExpr>;
2737
2738        let filter = JoinFilter::new(
2739            filter_expression,
2740            column_indices.clone(),
2741            Arc::new(intermediate_schema.clone()),
2742        );
2743
2744        let join = join_with_filter(
2745            Arc::clone(&left),
2746            Arc::clone(&right),
2747            on.clone(),
2748            filter,
2749            &JoinType::RightSemi,
2750            NullEquality::NullEqualsNothing,
2751        )?;
2752
2753        let columns = columns(&join.schema());
2754        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2755
2756        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2757        let batches = common::collect(stream).await?;
2758
2759        // RightSemi join output is expected to preserve right input order
2760        allow_duplicates! {
2761            assert_snapshot!(batches_to_string(&batches), @r#"
2762            +----+----+-----+
2763            | a2 | b2 | c2  |
2764            +----+----+-----+
2765            | 8  | 8  | 20  |
2766            | 12 | 10 | 40  |
2767            | 10 | 10 | 100 |
2768            +----+----+-----+
2769                "#);
2770        }
2771
        // left_table right semi join right_table on left_table.b1 = right_table.b2 and left_table.a1 > 11
2773        let filter_expression = Arc::new(BinaryExpr::new(
2774            Arc::new(Column::new("x", 0)),
2775            Operator::Gt,
2776            Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
2777        )) as Arc<dyn PhysicalExpr>;
2778
2779        let filter = JoinFilter::new(
2780            filter_expression,
2781            column_indices,
2782            Arc::new(intermediate_schema.clone()),
2783        );
2784
2785        let join = join_with_filter(
2786            left,
2787            right,
2788            on,
2789            filter,
2790            &JoinType::RightSemi,
2791            NullEquality::NullEqualsNothing,
2792        )?;
2793        let stream = join.execute(0, task_ctx)?;
2794        let batches = common::collect(stream).await?;
2795
2796        // RightSemi join output is expected to preserve right input order
2797        allow_duplicates! {
2798            assert_snapshot!(batches_to_string(&batches), @r#"
2799            +----+----+-----+
2800            | a2 | b2 | c2  |
2801            +----+----+-----+
2802            | 12 | 10 | 40  |
2803            | 10 | 10 | 100 |
2804            +----+----+-----+
2805                "#);
2806        }
2807
2808        Ok(())
2809    }
2810
2811    #[apply(batch_sizes)]
2812    #[tokio::test]
2813    async fn join_left_anti(batch_size: usize) -> Result<()> {
2814        let task_ctx = prepare_task_ctx(batch_size);
2815        let left = build_semi_anti_left_table();
2816        let right = build_semi_anti_right_table();
2817        // left_table left anti join right_table on left_table.b1 = right_table.b2
2818        let on = vec![(
2819            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2820            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2821        )];
2822
2823        let join = join(
2824            left,
2825            right,
2826            on,
2827            &JoinType::LeftAnti,
2828            NullEquality::NullEqualsNothing,
2829        )?;
2830
2831        let columns = columns(&join.schema());
2832        assert_eq!(columns, vec!["a1", "b1", "c1"]);
2833
2834        let stream = join.execute(0, task_ctx)?;
2835        let batches = common::collect(stream).await?;
2836
2837        allow_duplicates! {
2838            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2839            +----+----+----+
2840            | a1 | b1 | c1 |
2841            +----+----+----+
2842            | 1  | 1  | 10 |
2843            | 3  | 3  | 30 |
2844            | 5  | 5  | 50 |
2845            | 7  | 7  | 70 |
2846            +----+----+----+
2847                "#);
2848        }
2849        Ok(())
2850    }
2851
2852    #[apply(batch_sizes)]
2853    #[tokio::test]
2854    async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> {
2855        let task_ctx = prepare_task_ctx(batch_size);
2856        let left = build_semi_anti_left_table();
2857        let right = build_semi_anti_right_table();
2858        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2!=8
2859        let on = vec![(
2860            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2861            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2862        )];
2863
2864        let column_indices = vec![ColumnIndex {
2865            index: 0,
2866            side: JoinSide::Right,
2867        }];
2868        let intermediate_schema =
2869            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2870        let filter_expression = Arc::new(BinaryExpr::new(
2871            Arc::new(Column::new("x", 0)),
2872            Operator::NotEq,
2873            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2874        )) as Arc<dyn PhysicalExpr>;
2875
2876        let filter = JoinFilter::new(
2877            filter_expression,
2878            column_indices.clone(),
2879            Arc::new(intermediate_schema.clone()),
2880        );
2881
2882        let join = join_with_filter(
2883            Arc::clone(&left),
2884            Arc::clone(&right),
2885            on.clone(),
2886            filter,
2887            &JoinType::LeftAnti,
2888            NullEquality::NullEqualsNothing,
2889        )?;
2890
2891        let columns_header = columns(&join.schema());
2892        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2893
2894        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2895        let batches = common::collect(stream).await?;
2896
2897        allow_duplicates! {
2898            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2899            +----+----+-----+
2900            | a1 | b1 | c1  |
2901            +----+----+-----+
2902            | 1  | 1  | 10  |
2903            | 11 | 8  | 110 |
2904            | 3  | 3  | 30  |
2905            | 5  | 5  | 50  |
2906            | 7  | 7  | 70  |
2907            | 9  | 8  | 90  |
2908            +----+----+-----+
2909                "#);
2910        }
2911
2912        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 8
2913        let filter_expression = Arc::new(BinaryExpr::new(
2914            Arc::new(Column::new("x", 0)),
2915            Operator::NotEq,
2916            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2917        )) as Arc<dyn PhysicalExpr>;
2918
2919        let filter = JoinFilter::new(
2920            filter_expression,
2921            column_indices,
2922            Arc::new(intermediate_schema),
2923        );
2924
2925        let join = join_with_filter(
2926            left,
2927            right,
2928            on,
2929            filter,
2930            &JoinType::LeftAnti,
2931            NullEquality::NullEqualsNothing,
2932        )?;
2933
2934        let columns_header = columns(&join.schema());
2935        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2936
2937        let stream = join.execute(0, task_ctx)?;
2938        let batches = common::collect(stream).await?;
2939
2940        allow_duplicates! {
2941            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2942            +----+----+-----+
2943            | a1 | b1 | c1  |
2944            +----+----+-----+
2945            | 1  | 1  | 10  |
2946            | 11 | 8  | 110 |
2947            | 3  | 3  | 30  |
2948            | 5  | 5  | 50  |
2949            | 7  | 7  | 70  |
2950            | 9  | 8  | 90  |
2951            +----+----+-----+
2952                "#);
2953        }
2954
2955        Ok(())
2956    }
2957
2958    #[apply(batch_sizes)]
2959    #[tokio::test]
2960    async fn join_right_anti(batch_size: usize) -> Result<()> {
2961        let task_ctx = prepare_task_ctx(batch_size);
2962        let left = build_semi_anti_left_table();
2963        let right = build_semi_anti_right_table();
2964        let on = vec![(
2965            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2966            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2967        )];
2968
2969        let join = join(
2970            left,
2971            right,
2972            on,
2973            &JoinType::RightAnti,
2974            NullEquality::NullEqualsNothing,
2975        )?;
2976
2977        let columns = columns(&join.schema());
2978        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2979
2980        let stream = join.execute(0, task_ctx)?;
2981        let batches = common::collect(stream).await?;
2982
2983        // RightAnti join output is expected to preserve right input order
2984        allow_duplicates! {
2985            assert_snapshot!(batches_to_string(&batches), @r#"
2986            +----+----+-----+
2987            | a2 | b2 | c2  |
2988            +----+----+-----+
2989            | 6  | 6  | 60  |
2990            | 2  | 2  | 80  |
2991            | 4  | 4  | 120 |
2992            +----+----+-----+
2993                "#);
2994        }
2995        Ok(())
2996    }
2997
2998    #[apply(batch_sizes)]
2999    #[tokio::test]
3000    async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> {
3001        let task_ctx = prepare_task_ctx(batch_size);
3002        let left = build_semi_anti_left_table();
3003        let right = build_semi_anti_right_table();
3004        // left_table right anti join right_table on left_table.b1 = right_table.b2 and left_table.a1!=13
3005        let on = vec![(
3006            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3007            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3008        )];
3009
3010        let column_indices = vec![ColumnIndex {
3011            index: 0,
3012            side: JoinSide::Left,
3013        }];
3014        let intermediate_schema =
3015            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3016
3017        let filter_expression = Arc::new(BinaryExpr::new(
3018            Arc::new(Column::new("x", 0)),
3019            Operator::NotEq,
3020            Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
3021        )) as Arc<dyn PhysicalExpr>;
3022
3023        let filter = JoinFilter::new(
3024            filter_expression,
3025            column_indices,
3026            Arc::new(intermediate_schema.clone()),
3027        );
3028
3029        let join = join_with_filter(
3030            Arc::clone(&left),
3031            Arc::clone(&right),
3032            on.clone(),
3033            filter,
3034            &JoinType::RightAnti,
3035            NullEquality::NullEqualsNothing,
3036        )?;
3037
3038        let columns_header = columns(&join.schema());
3039        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3040
3041        let stream = join.execute(0, Arc::clone(&task_ctx))?;
3042        let batches = common::collect(stream).await?;
3043
3044        // RightAnti join output is expected to preserve right input order
3045        allow_duplicates! {
3046            assert_snapshot!(batches_to_string(&batches), @r#"
3047            +----+----+-----+
3048            | a2 | b2 | c2  |
3049            +----+----+-----+
3050            | 12 | 10 | 40  |
3051            | 6  | 6  | 60  |
3052            | 2  | 2  | 80  |
3053            | 10 | 10 | 100 |
3054            | 4  | 4  | 120 |
3055            +----+----+-----+
3056                "#);
3057        }
3058
3059        // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8
3060        let column_indices = vec![ColumnIndex {
3061            index: 1,
3062            side: JoinSide::Right,
3063        }];
3064        let filter_expression = Arc::new(BinaryExpr::new(
3065            Arc::new(Column::new("x", 0)),
3066            Operator::NotEq,
3067            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3068        )) as Arc<dyn PhysicalExpr>;
3069
3070        let filter = JoinFilter::new(
3071            filter_expression,
3072            column_indices,
3073            Arc::new(intermediate_schema),
3074        );
3075
3076        let join = join_with_filter(
3077            left,
3078            right,
3079            on,
3080            filter,
3081            &JoinType::RightAnti,
3082            NullEquality::NullEqualsNothing,
3083        )?;
3084
3085        let columns_header = columns(&join.schema());
3086        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3087
3088        let stream = join.execute(0, task_ctx)?;
3089        let batches = common::collect(stream).await?;
3090
3091        // RightAnti join output is expected to preserve right input order
3092        allow_duplicates! {
3093            assert_snapshot!(batches_to_string(&batches), @r#"
3094            +----+----+-----+
3095            | a2 | b2 | c2  |
3096            +----+----+-----+
3097            | 8  | 8  | 20  |
3098            | 6  | 6  | 60  |
3099            | 2  | 2  | 80  |
3100            | 4  | 4  | 120 |
3101            +----+----+-----+
3102                "#);
3103        }
3104
3105        Ok(())
3106    }
3107
3108    #[apply(batch_sizes)]
3109    #[tokio::test]
3110    async fn join_right_one(batch_size: usize) -> Result<()> {
3111        let task_ctx = prepare_task_ctx(batch_size);
3112        let left = build_table(
3113            ("a1", &vec![1, 2, 3]),
3114            ("b1", &vec![4, 5, 7]),
3115            ("c1", &vec![7, 8, 9]),
3116        );
3117        let right = build_table(
3118            ("a2", &vec![10, 20, 30]),
3119            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
3120            ("c2", &vec![70, 80, 90]),
3121        );
3122        let on = vec![(
3123            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3124            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3125        )];
3126
3127        let (columns, batches, metrics) = join_collect(
3128            left,
3129            right,
3130            on,
3131            &JoinType::Right,
3132            NullEquality::NullEqualsNothing,
3133            task_ctx,
3134        )
3135        .await?;
3136
3137        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3138
3139        allow_duplicates! {
3140            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3141            +----+----+----+----+----+----+
3142            | a1 | b1 | c1 | a2 | b1 | c2 |
3143            +----+----+----+----+----+----+
3144            |    |    |    | 30 | 6  | 90 |
3145            | 1  | 4  | 7  | 10 | 4  | 70 |
3146            | 2  | 5  | 8  | 20 | 5  | 80 |
3147            +----+----+----+----+----+----+
3148                "#);
3149        }
3150
3151        assert_join_metrics!(metrics, 3);
3152
3153        Ok(())
3154    }
3155
3156    #[apply(batch_sizes)]
3157    #[tokio::test]
3158    async fn partitioned_join_right_one(batch_size: usize) -> Result<()> {
3159        let task_ctx = prepare_task_ctx(batch_size);
3160        let left = build_table(
3161            ("a1", &vec![1, 2, 3]),
3162            ("b1", &vec![4, 5, 7]),
3163            ("c1", &vec![7, 8, 9]),
3164        );
3165        let right = build_table(
3166            ("a2", &vec![10, 20, 30]),
3167            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
3168            ("c2", &vec![70, 80, 90]),
3169        );
3170        let on = vec![(
3171            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3172            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3173        )];
3174
3175        let (columns, batches, metrics) = partitioned_join_collect(
3176            left,
3177            right,
3178            on,
3179            &JoinType::Right,
3180            NullEquality::NullEqualsNothing,
3181            task_ctx,
3182        )
3183        .await?;
3184
3185        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3186
3187        allow_duplicates! {
3188            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3189            +----+----+----+----+----+----+
3190            | a1 | b1 | c1 | a2 | b1 | c2 |
3191            +----+----+----+----+----+----+
3192            |    |    |    | 30 | 6  | 90 |
3193            | 1  | 4  | 7  | 10 | 4  | 70 |
3194            | 2  | 5  | 8  | 20 | 5  | 80 |
3195            +----+----+----+----+----+----+
3196                "#);
3197        }
3198
3199        assert_join_metrics!(metrics, 3);
3200
3201        Ok(())
3202    }
3203
3204    #[apply(batch_sizes)]
3205    #[tokio::test]
3206    async fn join_full_one(batch_size: usize) -> Result<()> {
3207        let task_ctx = prepare_task_ctx(batch_size);
3208        let left = build_table(
3209            ("a1", &vec![1, 2, 3]),
3210            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3211            ("c1", &vec![7, 8, 9]),
3212        );
3213        let right = build_table(
3214            ("a2", &vec![10, 20, 30]),
3215            ("b2", &vec![4, 5, 6]),
3216            ("c2", &vec![70, 80, 90]),
3217        );
3218        let on = vec![(
3219            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3220            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3221        )];
3222
3223        let join = join(
3224            left,
3225            right,
3226            on,
3227            &JoinType::Full,
3228            NullEquality::NullEqualsNothing,
3229        )?;
3230
3231        let columns = columns(&join.schema());
3232        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3233
3234        let stream = join.execute(0, task_ctx)?;
3235        let batches = common::collect(stream).await?;
3236
3237        allow_duplicates! {
3238            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3239            +----+----+----+----+----+----+
3240            | a1 | b1 | c1 | a2 | b2 | c2 |
3241            +----+----+----+----+----+----+
3242            |    |    |    | 30 | 6  | 90 |
3243            | 1  | 4  | 7  | 10 | 4  | 70 |
3244            | 2  | 5  | 8  | 20 | 5  | 80 |
3245            | 3  | 7  | 9  |    |    |    |
3246            +----+----+----+----+----+----+
3247                "#);
3248        }
3249
3250        Ok(())
3251    }
3252
3253    #[apply(batch_sizes)]
3254    #[tokio::test]
3255    async fn join_left_mark(batch_size: usize) -> Result<()> {
3256        let task_ctx = prepare_task_ctx(batch_size);
3257        let left = build_table(
3258            ("a1", &vec![1, 2, 3]),
3259            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3260            ("c1", &vec![7, 8, 9]),
3261        );
3262        let right = build_table(
3263            ("a2", &vec![10, 20, 30]),
3264            ("b1", &vec![4, 5, 6]),
3265            ("c2", &vec![70, 80, 90]),
3266        );
3267        let on = vec![(
3268            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3269            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3270        )];
3271
3272        let (columns, batches, metrics) = join_collect(
3273            Arc::clone(&left),
3274            Arc::clone(&right),
3275            on.clone(),
3276            &JoinType::LeftMark,
3277            NullEquality::NullEqualsNothing,
3278            task_ctx,
3279        )
3280        .await?;
3281
3282        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3283
3284        allow_duplicates! {
3285            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3286            +----+----+----+-------+
3287            | a1 | b1 | c1 | mark  |
3288            +----+----+----+-------+
3289            | 1  | 4  | 7  | true  |
3290            | 2  | 5  | 8  | true  |
3291            | 3  | 7  | 9  | false |
3292            +----+----+----+-------+
3293                "#);
3294        }
3295
3296        assert_join_metrics!(metrics, 3);
3297
3298        Ok(())
3299    }
3300
3301    #[apply(batch_sizes)]
3302    #[tokio::test]
3303    async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> {
3304        let task_ctx = prepare_task_ctx(batch_size);
3305        let left = build_table(
3306            ("a1", &vec![1, 2, 3]),
3307            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3308            ("c1", &vec![7, 8, 9]),
3309        );
3310        let right = build_table(
3311            ("a2", &vec![10, 20, 30, 40]),
3312            ("b1", &vec![4, 4, 5, 6]),
3313            ("c2", &vec![60, 70, 80, 90]),
3314        );
3315        let on = vec![(
3316            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3317            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3318        )];
3319
3320        let (columns, batches, metrics) = partitioned_join_collect(
3321            Arc::clone(&left),
3322            Arc::clone(&right),
3323            on.clone(),
3324            &JoinType::LeftMark,
3325            NullEquality::NullEqualsNothing,
3326            task_ctx,
3327        )
3328        .await?;
3329
3330        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3331
3332        allow_duplicates! {
3333            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3334            +----+----+----+-------+
3335            | a1 | b1 | c1 | mark  |
3336            +----+----+----+-------+
3337            | 1  | 4  | 7  | true  |
3338            | 2  | 5  | 8  | true  |
3339            | 3  | 7  | 9  | false |
3340            +----+----+----+-------+
3341                "#);
3342        }
3343
3344        assert_join_metrics!(metrics, 3);
3345
3346        Ok(())
3347    }
3348
3349    #[apply(batch_sizes)]
3350    #[tokio::test]
3351    async fn join_right_mark(batch_size: usize) -> Result<()> {
3352        let task_ctx = prepare_task_ctx(batch_size);
3353        let left = build_table(
3354            ("a1", &vec![1, 2, 3]),
3355            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3356            ("c1", &vec![7, 8, 9]),
3357        );
3358        let right = build_table(
3359            ("a2", &vec![10, 20, 30]),
3360            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
3361            ("c2", &vec![70, 80, 90]),
3362        );
3363        let on = vec![(
3364            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3365            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3366        )];
3367
3368        let (columns, batches, metrics) = join_collect(
3369            Arc::clone(&left),
3370            Arc::clone(&right),
3371            on.clone(),
3372            &JoinType::RightMark,
3373            NullEquality::NullEqualsNothing,
3374            task_ctx,
3375        )
3376        .await?;
3377
3378        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3379
3380        let expected = [
3381            "+----+----+----+-------+",
3382            "| a2 | b1 | c2 | mark  |",
3383            "+----+----+----+-------+",
3384            "| 10 | 4  | 70 | true  |",
3385            "| 20 | 5  | 80 | true  |",
3386            "| 30 | 6  | 90 | false |",
3387            "+----+----+----+-------+",
3388        ];
3389        assert_batches_sorted_eq!(expected, &batches);
3390
3391        assert_join_metrics!(metrics, 3);
3392
3393        Ok(())
3394    }
3395
3396    #[apply(batch_sizes)]
3397    #[tokio::test]
3398    async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> {
3399        let task_ctx = prepare_task_ctx(batch_size);
3400        let left = build_table(
3401            ("a1", &vec![1, 2, 3]),
3402            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3403            ("c1", &vec![7, 8, 9]),
3404        );
3405        let right = build_table(
3406            ("a2", &vec![10, 20, 30, 40]),
3407            ("b1", &vec![4, 4, 5, 6]), // 6 does not exist on the left
3408            ("c2", &vec![60, 70, 80, 90]),
3409        );
3410        let on = vec![(
3411            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3412            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3413        )];
3414
3415        let (columns, batches, metrics) = partitioned_join_collect(
3416            Arc::clone(&left),
3417            Arc::clone(&right),
3418            on.clone(),
3419            &JoinType::RightMark,
3420            NullEquality::NullEqualsNothing,
3421            task_ctx,
3422        )
3423        .await?;
3424
3425        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3426
3427        let expected = [
3428            "+----+----+----+-------+",
3429            "| a2 | b1 | c2 | mark  |",
3430            "+----+----+----+-------+",
3431            "| 10 | 4  | 60 | true  |",
3432            "| 20 | 4  | 70 | true  |",
3433            "| 30 | 5  | 80 | true  |",
3434            "| 40 | 6  | 90 | false |",
3435            "+----+----+----+-------+",
3436        ];
3437        assert_batches_sorted_eq!(expected, &batches);
3438
3439        assert_join_metrics!(metrics, 4);
3440
3441        Ok(())
3442    }
3443
3444    #[test]
3445    fn join_with_hash_collisions_u64() -> Result<()> {
3446        let mut hashmap_left = HashTable::with_capacity(4);
3447        let left = build_table_i32(
3448            ("a", &vec![10, 20]),
3449            ("x", &vec![100, 200]),
3450            ("y", &vec![200, 300]),
3451        );
3452
3453        let random_state = RandomState::with_seeds(0, 0, 0, 0);
3454        let hashes_buff = &mut vec![0; left.num_rows()];
3455        let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
3456
3457        // Map both hash values to both build-side rows, deliberately creating collisions.
3458        // Stored indices are one-based, so 1 and 2 denote rows 0 and 1:
3459        // hashes[0] -> rows 0 and 1; hashes[1] -> rows 0 and 1
3460        // The equality check ensures hashes[0] matches only row 0 and hashes[1] only row 1
3461        hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
3462        hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
3463
3464        hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
3465        hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);
3466
3467        let next = vec![2, 0];
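        // Chain links in `next` are one-based with 0 terminating the chain:
        // row 0 links to row 1 (stored as 2), row 1 ends its chain (0).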
3468
3469        let right = build_table_i32(
3470            ("a", &vec![10, 20]),
3471            ("b", &vec![0, 0]),
3472            ("c", &vec![30, 40]),
3473        );
3474
3475        // Join key column for both join sides
3476        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
3477
3478        let join_hash_map = JoinHashMapU64::new(hashmap_left, next);
3479
3480        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
3481        let right_keys_values =
3482            key_column.evaluate(&right)?.into_array(right.num_rows())?;
3483        let mut hashes_buffer = vec![0; right.num_rows()];
3484        create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
3485
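        // Probe the hash map: 8192 caps the number of matched pairs produced by this
        // call, and (0, None) starts from the first probe row with no saved chain offset.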
3486        let (l, r, _) = lookup_join_hashmap(
3487            &join_hash_map,
3488            &[left_keys_values],
3489            &[right_keys_values],
3490            NullEquality::NullEqualsNothing,
3491            &hashes_buffer,
3492            8192,
3493            (0, None),
3494        )?;
3495
3496        let left_ids: UInt64Array = vec![0, 1].into();
3497
3498        let right_ids: UInt32Array = vec![0, 1].into();
3499
3500        assert_eq!(left_ids, l);
3501
3502        assert_eq!(right_ids, r);
3503
3504        Ok(())
3505    }
3506
3507    #[test]
3508    fn join_with_hash_collisions_u32() -> Result<()> {
3509        let mut hashmap_left = HashTable::with_capacity(4);
3510        let left = build_table_i32(
3511            ("a", &vec![10, 20]),
3512            ("x", &vec![100, 200]),
3513            ("y", &vec![200, 300]),
3514        );
3515
3516        let random_state = RandomState::with_seeds(0, 0, 0, 0);
3517        let hashes_buff = &mut vec![0; left.num_rows()];
3518        let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
3519
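        // Same collision setup as the u64 test: both hashes map to one-based
        // row indices 1 and 2, and the equality check disambiguates the matches.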
3520        hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
3521        hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
3522        hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
3523        hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);
3524
3525        let next: Vec<u32> = vec![2, 0];
3526
3527        let right = build_table_i32(
3528            ("a", &vec![10, 20]),
3529            ("b", &vec![0, 0]),
3530            ("c", &vec![30, 40]),
3531        );
3532
3533        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
3534
3535        let join_hash_map = JoinHashMapU32::new(hashmap_left, next);
3536
3537        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
3538        let right_keys_values =
3539            key_column.evaluate(&right)?.into_array(right.num_rows())?;
3540        let mut hashes_buffer = vec![0; right.num_rows()];
3541        create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
3542
3543        let (l, r, _) = lookup_join_hashmap(
3544            &join_hash_map,
3545            &[left_keys_values],
3546            &[right_keys_values],
3547            NullEquality::NullEqualsNothing,
3548            &hashes_buffer,
3549            8192,
3550            (0, None),
3551        )?;
3552
3553        // We still expect to match rows 0 and 1 on both sides
3554        let left_ids: UInt64Array = vec![0, 1].into();
3555        let right_ids: UInt32Array = vec![0, 1].into();
3556
3557        assert_eq!(left_ids, l);
3558        assert_eq!(right_ids, r);
3559
3560        Ok(())
3561    }
3562
3563    #[tokio::test]
3564    async fn join_with_duplicated_column_names() -> Result<()> {
3565        let task_ctx = Arc::new(TaskContext::default());
3566        let left = build_table(
3567            ("a", &vec![1, 2, 3]),
3568            ("b", &vec![4, 5, 7]),
3569            ("c", &vec![7, 8, 9]),
3570        );
3571        let right = build_table(
3572            ("a", &vec![10, 20, 30]),
3573            ("b", &vec![1, 2, 7]),
3574            ("c", &vec![70, 80, 90]),
3575        );
3576        let on = vec![(
3577            // join on a=b so the unjoined columns carry duplicate names
3578            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3579            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3580        )];
3581
3582        let join = join(
3583            left,
3584            right,
3585            on,
3586            &JoinType::Inner,
3587            NullEquality::NullEqualsNothing,
3588        )?;
3589
3590        let columns = columns(&join.schema());
3591        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3592
3593        let stream = join.execute(0, task_ctx)?;
3594        let batches = common::collect(stream).await?;
3595
3596        allow_duplicates! {
3597            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3598            +---+---+---+----+---+----+
3599            | a | b | c | a  | b | c  |
3600            +---+---+---+----+---+----+
3601            | 1 | 4 | 7 | 10 | 1 | 70 |
3602            | 2 | 5 | 8 | 20 | 2 | 80 |
3603            +---+---+---+----+---+----+
3604                "#);
3605        }
3606
3607        Ok(())
3608    }
3609
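    /// Builds a `JoinFilter` comparing the third column of each side: left.c > right.c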
3610    fn prepare_join_filter() -> JoinFilter {
3611        let column_indices = vec![
3612            ColumnIndex {
3613                index: 2,
3614                side: JoinSide::Left,
3615            },
3616            ColumnIndex {
3617                index: 2,
3618                side: JoinSide::Right,
3619            },
3620        ];
3621        let intermediate_schema = Schema::new(vec![
3622            Field::new("c", DataType::Int32, true),
3623            Field::new("c", DataType::Int32, true),
3624        ]);
3625        let filter_expression = Arc::new(BinaryExpr::new(
3626            Arc::new(Column::new("c", 0)),
3627            Operator::Gt,
3628            Arc::new(Column::new("c", 1)),
3629        )) as Arc<dyn PhysicalExpr>;
3630
3631        JoinFilter::new(
3632            filter_expression,
3633            column_indices,
3634            Arc::new(intermediate_schema),
3635        )
3636    }
3637
3638    #[apply(batch_sizes)]
3639    #[tokio::test]
3640    async fn join_inner_with_filter(batch_size: usize) -> Result<()> {
3641        let task_ctx = prepare_task_ctx(batch_size);
3642        let left = build_table(
3643            ("a", &vec![0, 1, 2, 2]),
3644            ("b", &vec![4, 5, 7, 8]),
3645            ("c", &vec![7, 8, 9, 1]),
3646        );
3647        let right = build_table(
3648            ("a", &vec![10, 20, 30, 40]),
3649            ("b", &vec![2, 2, 3, 4]),
3650            ("c", &vec![7, 5, 6, 4]),
3651        );
3652        let on = vec![(
3653            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3654            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3655        )];
3656        let filter = prepare_join_filter();
3657
3658        let join = join_with_filter(
3659            left,
3660            right,
3661            on,
3662            filter,
3663            &JoinType::Inner,
3664            NullEquality::NullEqualsNothing,
3665        )?;
3666
3667        let columns = columns(&join.schema());
3668        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3669
3670        let stream = join.execute(0, task_ctx)?;
3671        let batches = common::collect(stream).await?;
3672
3673        allow_duplicates! {
3674            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3675            +---+---+---+----+---+---+
3676            | a | b | c | a  | b | c |
3677            +---+---+---+----+---+---+
3678            | 2 | 7 | 9 | 10 | 2 | 7 |
3679            | 2 | 7 | 9 | 20 | 2 | 5 |
3680            +---+---+---+----+---+---+
3681                "#);
3682        }
3683
3684        Ok(())
3685    }
3686
3687    #[apply(batch_sizes)]
3688    #[tokio::test]
3689    async fn join_left_with_filter(batch_size: usize) -> Result<()> {
3690        let task_ctx = prepare_task_ctx(batch_size);
3691        let left = build_table(
3692            ("a", &vec![0, 1, 2, 2]),
3693            ("b", &vec![4, 5, 7, 8]),
3694            ("c", &vec![7, 8, 9, 1]),
3695        );
3696        let right = build_table(
3697            ("a", &vec![10, 20, 30, 40]),
3698            ("b", &vec![2, 2, 3, 4]),
3699            ("c", &vec![7, 5, 6, 4]),
3700        );
3701        let on = vec![(
3702            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3703            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3704        )];
3705        let filter = prepare_join_filter();
3706
3707        let join = join_with_filter(
3708            left,
3709            right,
3710            on,
3711            filter,
3712            &JoinType::Left,
3713            NullEquality::NullEqualsNothing,
3714        )?;
3715
3716        let columns = columns(&join.schema());
3717        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3718
3719        let stream = join.execute(0, task_ctx)?;
3720        let batches = common::collect(stream).await?;
3721
3722        allow_duplicates! {
3723            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3724            +---+---+---+----+---+---+
3725            | a | b | c | a  | b | c |
3726            +---+---+---+----+---+---+
3727            | 0 | 4 | 7 |    |   |   |
3728            | 1 | 5 | 8 |    |   |   |
3729            | 2 | 7 | 9 | 10 | 2 | 7 |
3730            | 2 | 7 | 9 | 20 | 2 | 5 |
3731            | 2 | 8 | 1 |    |   |   |
3732            +---+---+---+----+---+---+
3733                "#);
3734        }
3735
3736        Ok(())
3737    }
3738
3739    #[apply(batch_sizes)]
3740    #[tokio::test]
3741    async fn join_right_with_filter(batch_size: usize) -> Result<()> {
3742        let task_ctx = prepare_task_ctx(batch_size);
3743        let left = build_table(
3744            ("a", &vec![0, 1, 2, 2]),
3745            ("b", &vec![4, 5, 7, 8]),
3746            ("c", &vec![7, 8, 9, 1]),
3747        );
3748        let right = build_table(
3749            ("a", &vec![10, 20, 30, 40]),
3750            ("b", &vec![2, 2, 3, 4]),
3751            ("c", &vec![7, 5, 6, 4]),
3752        );
3753        let on = vec![(
3754            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3755            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3756        )];
3757        let filter = prepare_join_filter();
3758
3759        let join = join_with_filter(
3760            left,
3761            right,
3762            on,
3763            filter,
3764            &JoinType::Right,
3765            NullEquality::NullEqualsNothing,
3766        )?;
3767
3768        let columns = columns(&join.schema());
3769        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3770
3771        let stream = join.execute(0, task_ctx)?;
3772        let batches = common::collect(stream).await?;
3773
3774        allow_duplicates! {
3775            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3776            +---+---+---+----+---+---+
3777            | a | b | c | a  | b | c |
3778            +---+---+---+----+---+---+
3779            |   |   |   | 30 | 3 | 6 |
3780            |   |   |   | 40 | 4 | 4 |
3781            | 2 | 7 | 9 | 10 | 2 | 7 |
3782            | 2 | 7 | 9 | 20 | 2 | 5 |
3783            +---+---+---+----+---+---+
3784                "#);
3785        }
3786
3787        Ok(())
3788    }
3789
3790    #[apply(batch_sizes)]
3791    #[tokio::test]
3792    async fn join_full_with_filter(batch_size: usize) -> Result<()> {
3793        let task_ctx = prepare_task_ctx(batch_size);
3794        let left = build_table(
3795            ("a", &vec![0, 1, 2, 2]),
3796            ("b", &vec![4, 5, 7, 8]),
3797            ("c", &vec![7, 8, 9, 1]),
3798        );
3799        let right = build_table(
3800            ("a", &vec![10, 20, 30, 40]),
3801            ("b", &vec![2, 2, 3, 4]),
3802            ("c", &vec![7, 5, 6, 4]),
3803        );
3804        let on = vec![(
3805            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3806            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3807        )];
3808        let filter = prepare_join_filter();
3809
3810        let join = join_with_filter(
3811            left,
3812            right,
3813            on,
3814            filter,
3815            &JoinType::Full,
3816            NullEquality::NullEqualsNothing,
3817        )?;
3818
3819        let columns = columns(&join.schema());
3820        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3821
3822        let stream = join.execute(0, task_ctx)?;
3823        let batches = common::collect(stream).await?;
3824
3825        let expected = [
3826            "+---+---+---+----+---+---+",
3827            "| a | b | c | a  | b | c |",
3828            "+---+---+---+----+---+---+",
3829            "|   |   |   | 30 | 3 | 6 |",
3830            "|   |   |   | 40 | 4 | 4 |",
3831            "| 2 | 7 | 9 | 10 | 2 | 7 |",
3832            "| 2 | 7 | 9 | 20 | 2 | 5 |",
3833            "| 0 | 4 | 7 |    |   |   |",
3834            "| 1 | 5 | 8 |    |   |   |",
3835            "| 2 | 8 | 1 |    |   |   |",
3836            "+---+---+---+----+---+---+",
3837        ];
3838        assert_batches_sorted_eq!(expected, &batches);
3839
3840        // THIS MIGRATION HALTED DUE TO ISSUE #15312
3841        //allow_duplicates! {
3842        //    assert_snapshot!(batches_to_sort_string(&batches), @r#"
3843        //    +---+---+---+----+---+---+
3844        //    | a | b | c | a  | b | c |
3845        //    +---+---+---+----+---+---+
3846        //    |   |   |   | 30 | 3 | 6 |
3847        //    |   |   |   | 40 | 4 | 4 |
3848        //    | 2 | 7 | 9 | 10 | 2 | 7 |
3849        //    | 2 | 7 | 9 | 20 | 2 | 5 |
3850        //    | 0 | 4 | 7 |    |   |   |
3851        //    | 1 | 5 | 8 |    |   |   |
3852        //    | 2 | 8 | 1 |    |   |   |
3853        //    +---+---+---+----+---+---+
3854        //        "#)
3855        //}
3856
3857        Ok(())
3858    }
3859
3860    /// Test for parallelized HashJoinExec with PartitionMode::CollectLeft
3861    #[tokio::test]
3862    async fn test_collect_left_multiple_partitions_join() -> Result<()> {
3863        let task_ctx = Arc::new(TaskContext::default());
3864        let left = build_table(
3865            ("a1", &vec![1, 2, 3]),
3866            ("b1", &vec![4, 5, 7]),
3867            ("c1", &vec![7, 8, 9]),
3868        );
3869        let right = build_table(
3870            ("a2", &vec![10, 20, 30]),
3871            ("b2", &vec![4, 5, 6]),
3872            ("c2", &vec![70, 80, 90]),
3873        );
3874        let on = vec![(
3875            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3876            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3877        )];
3878
3879        let expected_inner = vec![
3880            "+----+----+----+----+----+----+",
3881            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3882            "+----+----+----+----+----+----+",
3883            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3884            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3885            "+----+----+----+----+----+----+",
3886        ];
3887        let expected_left = vec![
3888            "+----+----+----+----+----+----+",
3889            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3890            "+----+----+----+----+----+----+",
3891            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3892            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3893            "| 3  | 7  | 9  |    |    |    |",
3894            "+----+----+----+----+----+----+",
3895        ];
3896        let expected_right = vec![
3897            "+----+----+----+----+----+----+",
3898            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3899            "+----+----+----+----+----+----+",
3900            "|    |    |    | 30 | 6  | 90 |",
3901            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3902            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3903            "+----+----+----+----+----+----+",
3904        ];
3905        let expected_full = vec![
3906            "+----+----+----+----+----+----+",
3907            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3908            "+----+----+----+----+----+----+",
3909            "|    |    |    | 30 | 6  | 90 |",
3910            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3911            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3912            "| 3  | 7  | 9  |    |    |    |",
3913            "+----+----+----+----+----+----+",
3914        ];
3915        let expected_left_semi = vec![
3916            "+----+----+----+",
3917            "| a1 | b1 | c1 |",
3918            "+----+----+----+",
3919            "| 1  | 4  | 7  |",
3920            "| 2  | 5  | 8  |",
3921            "+----+----+----+",
3922        ];
3923        let expected_left_anti = vec![
3924            "+----+----+----+",
3925            "| a1 | b1 | c1 |",
3926            "+----+----+----+",
3927            "| 3  | 7  | 9  |",
3928            "+----+----+----+",
3929        ];
3930        let expected_right_semi = vec![
3931            "+----+----+----+",
3932            "| a2 | b2 | c2 |",
3933            "+----+----+----+",
3934            "| 10 | 4  | 70 |",
3935            "| 20 | 5  | 80 |",
3936            "+----+----+----+",
3937        ];
3938        let expected_right_anti = vec![
3939            "+----+----+----+",
3940            "| a2 | b2 | c2 |",
3941            "+----+----+----+",
3942            "| 30 | 6  | 90 |",
3943            "+----+----+----+",
3944        ];
3945        let expected_left_mark = vec![
3946            "+----+----+----+-------+",
3947            "| a1 | b1 | c1 | mark  |",
3948            "+----+----+----+-------+",
3949            "| 1  | 4  | 7  | true  |",
3950            "| 2  | 5  | 8  | true  |",
3951            "| 3  | 7  | 9  | false |",
3952            "+----+----+----+-------+",
3953        ];
3954        let expected_right_mark = vec![
3955            "+----+----+----+-------+",
3956            "| a2 | b2 | c2 | mark  |",
3957            "+----+----+----+-------+",
3958            "| 10 | 4  | 70 | true  |",
3959            "| 20 | 5  | 80 | true  |",
3960            "| 30 | 6  | 90 | false |",
3961            "+----+----+----+-------+",
3962        ];
3963
3964        let test_cases = vec![
3965            (JoinType::Inner, expected_inner),
3966            (JoinType::Left, expected_left),
3967            (JoinType::Right, expected_right),
3968            (JoinType::Full, expected_full),
3969            (JoinType::LeftSemi, expected_left_semi),
3970            (JoinType::LeftAnti, expected_left_anti),
3971            (JoinType::RightSemi, expected_right_semi),
3972            (JoinType::RightAnti, expected_right_anti),
3973            (JoinType::LeftMark, expected_left_mark),
3974            (JoinType::RightMark, expected_right_mark),
3975        ];
3976
3977        for (join_type, expected) in test_cases {
3978            let (_, batches, metrics) = join_collect_with_partition_mode(
3979                Arc::clone(&left),
3980                Arc::clone(&right),
3981                on.clone(),
3982                &join_type,
3983                PartitionMode::CollectLeft,
3984                NullEquality::NullEqualsNothing,
3985                Arc::clone(&task_ctx),
3986            )
3987            .await?;
3988            assert_batches_sorted_eq!(expected, &batches);
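            // `expected` holds 4 non-data lines (two borders, header, header
            // separator), so the expected output row count is expected.len() - 4.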
3989            assert_join_metrics!(metrics, expected.len() - 4);
3990        }
3991
3992        Ok(())
3993    }
3994
3995    #[tokio::test]
3996    async fn join_date32() -> Result<()> {
3997        let schema = Arc::new(Schema::new(vec![
3998            Field::new("date", DataType::Date32, false),
3999            Field::new("n", DataType::Int32, false),
4000        ]));
4001
4002        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
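        // Date32 stores days since the Unix epoch: 19107 = 2022-04-25,
        // 19108 = 2022-04-26, 19109 = 2022-04-27 (see the snapshot below).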
4003        let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
4004        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4005        let left =
4006            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
4007                .unwrap();
4008        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
4009        let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
4010        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4011        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
4012        let on = vec![(
4013            Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
4014            Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
4015        )];
4016
4017        let join = join(
4018            left,
4019            right,
4020            on,
4021            &JoinType::Inner,
4022            NullEquality::NullEqualsNothing,
4023        )?;
4024
4025        let task_ctx = Arc::new(TaskContext::default());
4026        let stream = join.execute(0, task_ctx)?;
4027        let batches = common::collect(stream).await?;
4028
4029        allow_duplicates! {
4030            assert_snapshot!(batches_to_sort_string(&batches), @r#"
4031            +------------+---+------------+---+
4032            | date       | n | date       | n |
4033            +------------+---+------------+---+
4034            | 2022-04-26 | 2 | 2022-04-26 | 4 |
4035            | 2022-04-26 | 2 | 2022-04-26 | 5 |
4036            | 2022-04-27 | 3 | 2022-04-27 | 6 |
4037            +------------+---+------------+---+
4038                "#);
4039        }
4040
4041        Ok(())
4042    }
4043
4044    #[tokio::test]
4045    async fn join_with_error_right() {
4046        let left = build_table(
4047            ("a1", &vec![1, 2, 3]),
4048            ("b1", &vec![4, 5, 7]),
4049            ("c1", &vec![7, 8, 9]),
4050        );
4051
4052        // right input stream returns one good batch and then one error.
4053        // The error should be returned.
4054        let err = exec_err!("bad data error");
4055        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4056
4057        let on = vec![(
4058            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4059            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
4060        )];
4061        let schema = right.schema();
4062        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4063        let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
4064
4065        let join_types = vec![
4066            JoinType::Inner,
4067            JoinType::Left,
4068            JoinType::Right,
4069            JoinType::Full,
4070            JoinType::LeftSemi,
4071            JoinType::LeftAnti,
4072            JoinType::RightSemi,
4073            JoinType::RightAnti,
4074        ];
4075
4076        for join_type in join_types {
4077            let join = join(
4078                Arc::clone(&left),
4079                Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
4080                on.clone(),
4081                &join_type,
4082                NullEquality::NullEqualsNothing,
4083            )
4084            .unwrap();
4085            let task_ctx = Arc::new(TaskContext::default());
4086
4087            let stream = join.execute(0, task_ctx).unwrap();
4088
4089            // Expect that an error is returned
4090            let result_string = common::collect(stream).await.unwrap_err().to_string();
4091            assert!(
4092                result_string.contains("bad data error"),
4093                "actual: {result_string}"
4094            );
4095        }
4096    }
4097
4098    #[tokio::test]
4099    async fn join_split_batch() {
4100        let left = build_table(
4101            ("a1", &vec![1, 2, 3, 4]),
4102            ("b1", &vec![1, 1, 1, 1]),
4103            ("c1", &vec![0, 0, 0, 0]),
4104        );
4105        let right = build_table(
4106            ("a2", &vec![10, 20, 30, 40, 50]),
4107            ("b2", &vec![1, 1, 1, 1, 1]),
4108            ("c2", &vec![0, 0, 0, 0, 0]),
4109        );
4110        let on = vec![(
4111            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4112            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4113        )];
4114
4115        let join_types = vec![
4116            JoinType::Inner,
4117            JoinType::Left,
4118            JoinType::Right,
4119            JoinType::Full,
4120            JoinType::RightSemi,
4121            JoinType::RightAnti,
4122            JoinType::LeftSemi,
4123            JoinType::LeftAnti,
4124        ];
4125        let expected_resultset_records = 20;
4126        let common_result = [
4127            "+----+----+----+----+----+----+",
4128            "| a1 | b1 | c1 | a2 | b2 | c2 |",
4129            "+----+----+----+----+----+----+",
4130            "| 1  | 1  | 0  | 10 | 1  | 0  |",
4131            "| 2  | 1  | 0  | 10 | 1  | 0  |",
4132            "| 3  | 1  | 0  | 10 | 1  | 0  |",
4133            "| 4  | 1  | 0  | 10 | 1  | 0  |",
4134            "| 1  | 1  | 0  | 20 | 1  | 0  |",
4135            "| 2  | 1  | 0  | 20 | 1  | 0  |",
4136            "| 3  | 1  | 0  | 20 | 1  | 0  |",
4137            "| 4  | 1  | 0  | 20 | 1  | 0  |",
4138            "| 1  | 1  | 0  | 30 | 1  | 0  |",
4139            "| 2  | 1  | 0  | 30 | 1  | 0  |",
4140            "| 3  | 1  | 0  | 30 | 1  | 0  |",
4141            "| 4  | 1  | 0  | 30 | 1  | 0  |",
4142            "| 1  | 1  | 0  | 40 | 1  | 0  |",
4143            "| 2  | 1  | 0  | 40 | 1  | 0  |",
4144            "| 3  | 1  | 0  | 40 | 1  | 0  |",
4145            "| 4  | 1  | 0  | 40 | 1  | 0  |",
4146            "| 1  | 1  | 0  | 50 | 1  | 0  |",
4147            "| 2  | 1  | 0  | 50 | 1  | 0  |",
4148            "| 3  | 1  | 0  | 50 | 1  | 0  |",
4149            "| 4  | 1  | 0  | 50 | 1  | 0  |",
4150            "+----+----+----+----+----+----+",
4151        ];
4152        let left_batch = [
4153            "+----+----+----+",
4154            "| a1 | b1 | c1 |",
4155            "+----+----+----+",
4156            "| 1  | 1  | 0  |",
4157            "| 2  | 1  | 0  |",
4158            "| 3  | 1  | 0  |",
4159            "| 4  | 1  | 0  |",
4160            "+----+----+----+",
4161        ];
4162        let right_batch = [
4163            "+----+----+----+",
4164            "| a2 | b2 | c2 |",
4165            "+----+----+----+",
4166            "| 10 | 1  | 0  |",
4167            "| 20 | 1  | 0  |",
4168            "| 30 | 1  | 0  |",
4169            "| 40 | 1  | 0  |",
4170            "| 50 | 1  | 0  |",
4171            "+----+----+----+",
4172        ];
4173        let right_empty = [
4174            "+----+----+----+",
4175            "| a2 | b2 | c2 |",
4176            "+----+----+----+",
4177            "+----+----+----+",
4178        ];
4179        let left_empty = [
4180            "+----+----+----+",
4181            "| a1 | b1 | c1 |",
4182            "+----+----+----+",
4183            "+----+----+----+",
4184        ];
4185
4186        // validation of partial join results output for different batch_size setting
4187        for join_type in join_types {
4188            for batch_size in (1..21).rev() {
4189                let task_ctx = prepare_task_ctx(batch_size);
4190
4191                let join = join(
4192                    Arc::clone(&left),
4193                    Arc::clone(&right),
4194                    on.clone(),
4195                    &join_type,
4196                    NullEquality::NullEqualsNothing,
4197                )
4198                .unwrap();
4199
4200                let stream = join.execute(0, task_ctx).unwrap();
4201                let batches = common::collect(stream).await.unwrap();
4202
4203                // For inner/right joins the expected batch count equals the div_ceil
4204                // result, as there is no need to append non-joined build-side data.
4205                // For other join types it is div_ceil + 1 -- one additional batch
4206                // containing the unvisited build-side rows (empty in this test case).
4207                let expected_batch_count = match join_type {
4208                    JoinType::Inner
4209                    | JoinType::Right
4210                    | JoinType::RightSemi
4211                    | JoinType::RightAnti => {
4212                        div_ceil(expected_resultset_records, batch_size)
4213                    }
4214                    _ => div_ceil(expected_resultset_records, batch_size) + 1,
4215                };
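                // e.g. 20 matched rows with batch_size = 8 -> div_ceil(20, 8) = 3 output
                // batches, plus one trailing (empty) batch for the left/full variants.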
4216                assert_eq!(
4217                    batches.len(),
4218                    expected_batch_count,
4219                    "expected {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}"
4220                );
4221
4222                let expected = match join_type {
4223                    JoinType::RightSemi => right_batch.to_vec(),
4224                    JoinType::RightAnti => right_empty.to_vec(),
4225                    JoinType::LeftSemi => left_batch.to_vec(),
4226                    JoinType::LeftAnti => left_empty.to_vec(),
4227                    _ => common_result.to_vec(),
4228                };
4229                assert_batches_eq!(expected, &batches);
4230            }
4231        }
4232    }
4233
4234    #[tokio::test]
4235    async fn single_partition_join_overallocation() -> Result<()> {
4236        let left = build_table(
4237            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4238            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4239            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4240        );
4241        let right = build_table(
4242            ("a2", &vec![10, 11]),
4243            ("b2", &vec![12, 13]),
4244            ("c2", &vec![14, 15]),
4245        );
4246        let on = vec![(
4247            Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
4248            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4249        )];
4250
4251        let join_types = vec![
4252            JoinType::Inner,
4253            JoinType::Left,
4254            JoinType::Right,
4255            JoinType::Full,
4256            JoinType::LeftSemi,
4257            JoinType::LeftAnti,
4258            JoinType::RightSemi,
4259            JoinType::RightAnti,
4260            JoinType::LeftMark,
4261            JoinType::RightMark,
4262        ];
4263
4264        for join_type in join_types {
4265            let runtime = RuntimeEnvBuilder::new()
4266                .with_memory_limit(100, 1.0)
4267                .build_arc()?;
4268            let task_ctx = TaskContext::default().with_runtime(runtime);
4269            let task_ctx = Arc::new(task_ctx);
4270
4271            let join = join(
4272                Arc::clone(&left),
4273                Arc::clone(&right),
4274                on.clone(),
4275                &join_type,
4276                NullEquality::NullEqualsNothing,
4277            )?;
4278
4279            let stream = join.execute(0, task_ctx)?;
4280            let err = common::collect(stream).await.unwrap_err();
4281
4282            // Assert that the operator-level reservation attempts to overallocate and fails
            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n  HashJoinInput"
            );

            assert_contains!(
                err.to_string(),
                "Failed to allocate additional 120.0 B for HashJoinInput"
            );
        }

        Ok(())
    }

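    // Same overallocation scenario as above, but with `PartitionMode::Partitioned`
    // and two input partitions; executing partition 1 should surface the failure
    // from the per-stream reservation `HashJoinInput[1]`.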
    #[tokio::test]
    async fn partitioned_join_overallocation() -> Result<()> {
        // Prepare partitioned inputs for HashJoinExec
        // No need to adjust partitioning, as execution should fail with a `Resources exhausted` error
        let left_batch = build_table_i32(
            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        );
        let left = TestMemoryExec::try_new_exec(
            &[vec![left_batch.clone()], vec![left_batch.clone()]],
            left_batch.schema(),
            None,
        )
        .unwrap();
        let right_batch = build_table_i32(
            ("a2", &vec![10, 11]),
            ("b2", &vec![12, 13]),
            ("c2", &vec![14, 15]),
        );
        let right = TestMemoryExec::try_new_exec(
            &[vec![right_batch.clone()], vec![right_batch.clone()]],
            right_batch.schema(),
            None,
        )
        .unwrap();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
        ];

        for join_type in join_types {
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(100, 1.0)
                .build_arc()?;
            let session_config = SessionConfig::default().with_batch_size(50);
            let task_ctx = TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime);
            let task_ctx = Arc::new(task_ctx);

            let join = HashJoinExec::try_new(
                Arc::clone(&left) as Arc<dyn ExecutionPlan>,
                Arc::clone(&right) as Arc<dyn ExecutionPlan>,
                on.clone(),
                None,
                &join_type,
                None,
                PartitionMode::Partitioned,
                NullEquality::NullEqualsNothing,
            )?;

            let stream = join.execute(1, task_ctx)?;
            let err = common::collect(stream).await.unwrap_err();

            // Assert that the failure comes from the stream-level reservation attempting to overallocate
            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n  HashJoinInput[1]"
            );

            assert_contains!(
                err.to_string(),
                "Failed to allocate additional 120.0 B for HashJoinInput[1]"
            );
        }

        Ok(())
    }

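    /// Builds a single-partition `TestMemoryExec` with one struct-typed column named
    /// `struct_name`, containing a single nullable `Int32` child field. `nulls`, when
    /// provided, marks top-level struct values (not the inner field) as null.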
    fn build_table_struct(
        struct_name: &str,
        field_name_and_values: (&str, &Vec<Option<i32>>),
        nulls: Option<NullBuffer>,
    ) -> Arc<dyn ExecutionPlan> {
        let (field_name, values) = field_name_and_values;
        let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
        let schema = Schema::new(vec![Field::new(
            struct_name,
            DataType::Struct(inner_fields.clone().into()),
            nulls.is_some(),
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(StructArray::new(
                inner_fields.into(),
                vec![Arc::new(Int32Array::from(values.clone()))],
                nulls,
            ))],
        )
        .unwrap();
        let schema_ref = batch.schema();
        TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
    }

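    // Joins on struct-typed keys. The struct values themselves are non-null here
    // (only the inner field of the first row is null), so the `{a: }` rows match
    // each other even with `NullEqualsNothing`.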
    #[tokio::test]
    async fn join_on_struct() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left =
            build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
        let right =
            build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
        let on = vec![(
            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["n1", "n2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +--------+--------+
            | n1     | n2     |
            +--------+--------+
            | {a: }  | {a: }  |
            | {a: 1} | {a: 1} |
            | {a: 2} | {a: 2} |
            +--------+--------+
                "#);
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }

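    // Complements the test above with top-level struct nulls: `NullEqualsNull`
    // matches the null structs (one output row), while `NullEqualsNothing`
    // produces no matches.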
    #[tokio::test]
    async fn join_on_struct_with_nulls() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left =
            build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
        let right =
            build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
        let on = vec![(
            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
        )];

        let (_, batches_null_eq, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNull,
            Arc::clone(&task_ctx),
        )
        .await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r#"
            +----+----+
            | n1 | n2 |
            +----+----+
            |    |    |
            +----+----+
                "#);
        }

        assert_join_metrics!(metrics, 1);

        let (_, batches_null_neq, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_join_metrics!(metrics, 0);

        let expected_null_neq =
            ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
        assert_batches_eq!(expected_null_neq, &batches_null_neq);

        Ok(())
    }

    /// Returns the column names of the schema
    fn columns(schema: &Schema) -> Vec<String> {
        schema.fields().iter().map(|f| f.name().clone()).collect()
    }
}