datafusion_physical_plan/joins/hash_join/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::mem::size_of;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::{Arc, OnceLock};
22use std::{any::Any, vec};
23
24use crate::execution_plan::{boundedness_from_children, EmissionType};
25use crate::filter_pushdown::{
26    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
27    FilterPushdownPropagation,
28};
29use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
30use crate::joins::hash_join::stream::{
31    BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
32};
33use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
34use crate::joins::utils::{
35    asymmetric_join_output_partitioning, reorder_output_after_swap, swap_join_projection,
36    update_hash, OnceAsync, OnceFut,
37};
38use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
39use crate::projection::{
40    try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
41    ProjectionExec,
42};
43use crate::spill::get_record_batch_memory_size;
44use crate::ExecutionPlanProperties;
45use crate::{
46    common::can_project,
47    joins::utils::{
48        build_join_schema, check_join_is_valid, estimate_join_statistics,
49        need_produce_result_in_final, symmetric_join_output_partitioning,
50        BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
51    },
52    metrics::{ExecutionPlanMetricsSet, MetricsSet},
53    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
54    PlanProperties, SendableRecordBatchStream, Statistics,
55};
56
57use arrow::array::{Array, ArrayRef, BooleanBufferBuilder};
58use arrow::compute::concat_batches;
59use arrow::datatypes::SchemaRef;
60use arrow::record_batch::RecordBatch;
61use arrow::util::bit_util;
62use datafusion_common::config::ConfigOptions;
63use datafusion_common::utils::memory::estimate_memory_size;
64use datafusion_common::{
65    internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
66    ScalarValue,
67};
68use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
69use datafusion_execution::TaskContext;
70use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
71use datafusion_physical_expr::equivalence::{
72    join_equivalence_properties, ProjectionMapping,
73};
74use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
75use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
76
77use ahash::RandomState;
78use datafusion_physical_expr_common::physical_expr::fmt_sql;
79use futures::TryStreamExt;
80use parking_lot::Mutex;
81
/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
///
/// The four seed values are the character codes of the letters `J`, `O`, `I`
/// and `N`, each widened to `u64`.
const HASH_JOIN_SEED: RandomState =
    RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
85
/// HashTable and input data for the left (build side) of a join
///
/// Values are assembled with [`JoinLeftData::new`]; the private fields are
/// exposed read-only through the accessor methods of the same name.
pub(super) struct JoinLeftData {
    /// The hash table with indices into `batch`
    pub(super) hash_map: Box<dyn JoinHashMapType>,
    /// The input rows for the build side
    batch: RecordBatch,
    /// The values of the build-side `on` expressions
    values: Vec<ArrayRef>,
    /// Shared bitmap builder for visited left indices
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Counter of running probe-threads, potentially
    /// able to update `visited_indices_bitmap`.
    /// Decremented by `report_probe_completed`.
    probe_threads_counter: AtomicUsize,
    /// We need to keep this field to maintain accurate memory accounting, even though we don't directly use it.
    /// Without holding onto this reservation, the recorded memory usage would become inconsistent with actual usage.
    /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
    /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
    _reservation: MemoryReservation,
    /// Bounds computed from the build side for dynamic filter pushdown.
    /// `None` when no bounds are available.
    pub(super) bounds: Option<Vec<ColumnBounds>>,
}
107
108impl JoinLeftData {
109    /// Create a new `JoinLeftData` from its parts
110    pub(super) fn new(
111        hash_map: Box<dyn JoinHashMapType>,
112        batch: RecordBatch,
113        values: Vec<ArrayRef>,
114        visited_indices_bitmap: SharedBitmapBuilder,
115        probe_threads_counter: AtomicUsize,
116        reservation: MemoryReservation,
117        bounds: Option<Vec<ColumnBounds>>,
118    ) -> Self {
119        Self {
120            hash_map,
121            batch,
122            values,
123            visited_indices_bitmap,
124            probe_threads_counter,
125            _reservation: reservation,
126            bounds,
127        }
128    }
129
130    /// return a reference to the hash map
131    pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
132        &*self.hash_map
133    }
134
135    /// returns a reference to the build side batch
136    pub(super) fn batch(&self) -> &RecordBatch {
137        &self.batch
138    }
139
140    /// returns a reference to the build side expressions values
141    pub(super) fn values(&self) -> &[ArrayRef] {
142        &self.values
143    }
144
145    /// returns a reference to the visited indices bitmap
146    pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
147        &self.visited_indices_bitmap
148    }
149
150    /// Decrements the counter of running threads, and returns `true`
151    /// if caller is the last running thread
152    pub(super) fn report_probe_completed(&self) -> bool {
153        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
154    }
155}
156
157#[allow(rustdoc::private_intra_doc_links)]
158/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
159/// partitions using a hash table and an optional filter list to apply post
160/// join.
161///
162/// # Join Expressions
163///
164/// This implementation is optimized for evaluating equijoin predicates  (
165/// `<col1> = <col2>`) expressions, which are represented as a list of `Columns`
166/// in [`Self::on`].
167///
/// Non-equality predicates, which cannot be pushed down to the join inputs (e.g.
/// `<col1> != <col2>`), are known as "filter expressions" and are evaluated
/// after the equijoin predicates.
171///
172/// # "Build Side" vs "Probe Side"
173///
174/// HashJoin takes two inputs, which are referred to as the "build" and the
175/// "probe". The build side is the first child, and the probe side is the second
176/// child.
177///
178/// The two inputs are treated differently and it is VERY important that the
179/// *smaller* input is placed on the build side to minimize the work of creating
180/// the hash table.
181///
182/// ```text
183///          ┌───────────┐
184///          │ HashJoin  │
185///          │           │
186///          └───────────┘
187///              │   │
188///        ┌─────┘   └─────┐
189///        ▼               ▼
190/// ┌────────────┐  ┌─────────────┐
191/// │   Input    │  │    Input    │
192/// │    [0]     │  │     [1]     │
193/// └────────────┘  └─────────────┘
194///
195///  "build side"    "probe side"
196/// ```
197///
198/// Execution proceeds in 2 stages:
199///
/// 1. the **build phase** creates a hash table from the tuples of the build side,
///    and a single concatenated batch containing data from all fetched record batches.
///    The resulting hash table stores the hashed join-key fields of each row as a key,
///    and the indices of the corresponding rows in the concatenated batch.
///
/// Hash join uses a LIFO data structure as its hash table. In order to retain the
/// original build-side input order while obtaining data during the probe phase, the
/// hash table is updated by iterating over the batch sequence in reverse order. This
/// keeps rows with smaller indices "on the top" of the hash table while still
/// maintaining correct indexing into the concatenated build-side batch.
210///
211/// Example of build phase for 3 record batches:
212///
213///
214/// ```text
215///
216///  Original build-side data   Inserting build-side values into hashmap    Concatenated build-side batch
217///                                                                         ┌───────────────────────────┐
218///                             hashmap.insert(row-hash, row-idx + offset)  │                      idx  │
219///            ┌───────┐                                                    │          ┌───────┐        │
220///            │ Row 1 │        1) update_hash for batch 3 with offset 0    │          │ Row 6 │    0   │
221///   Batch 1  │       │           - hashmap.insert(Row 7, idx 1)           │ Batch 3  │       │        │
222///            │ Row 2 │           - hashmap.insert(Row 6, idx 0)           │          │ Row 7 │    1   │
223///            └───────┘                                                    │          └───────┘        │
224///                                                                         │                           │
225///            ┌───────┐                                                    │          ┌───────┐        │
226///            │ Row 3 │        2) update_hash for batch 2 with offset 2    │          │ Row 3 │    2   │
227///            │       │           - hashmap.insert(Row 5, idx 4)           │          │       │        │
228///   Batch 2  │ Row 4 │           - hashmap.insert(Row 4, idx 3)           │ Batch 2  │ Row 4 │    3   │
229///            │       │           - hashmap.insert(Row 3, idx 2)           │          │       │        │
230///            │ Row 5 │                                                    │          │ Row 5 │    4   │
231///            └───────┘                                                    │          └───────┘        │
232///                                                                         │                           │
233///            ┌───────┐                                                    │          ┌───────┐        │
234///            │ Row 6 │        3) update_hash for batch 1 with offset 5    │          │ Row 1 │    5   │
235///   Batch 3  │       │           - hashmap.insert(Row 2, idx 6)           │ Batch 1  │       │        │
236///            │ Row 7 │           - hashmap.insert(Row 1, idx 5)           │          │ Row 2 │    6   │
237///            └───────┘                                                    │          └───────┘        │
238///                                                                         │                           │
239///                                                                         └───────────────────────────┘
240///
241/// ```
242///
243/// 2. the **probe phase** where the tuples of the probe side are streamed
244///    through, checking for matches of the join keys in the hash table.
245///
246/// ```text
247///                 ┌────────────────┐          ┌────────────────┐
248///                 │ ┌─────────┐    │          │ ┌─────────┐    │
249///                 │ │  Hash   │    │          │ │  Hash   │    │
250///                 │ │  Table  │    │          │ │  Table  │    │
251///                 │ │(keys are│    │          │ │(keys are│    │
252///                 │ │equi join│    │          │ │equi join│    │  Stage 2: batches from
253///  Stage 1: the   │ │columns) │    │          │ │columns) │    │    the probe side are
254/// *entire* build  │ │         │    │          │ │         │    │  streamed through, and
255///  side is read   │ └─────────┘    │          │ └─────────┘    │   checked against the
256/// into the hash   │      ▲         │          │          ▲     │   contents of the hash
257///     table       │       HashJoin │          │  HashJoin      │          table
258///                 └──────┼─────────┘          └──────────┼─────┘
259///             ─ ─ ─ ─ ─ ─                                 ─ ─ ─ ─ ─ ─ ─
260///            │                                                         │
261///
262///            │                                                         │
263///     ┌────────────┐                                            ┌────────────┐
264///     │RecordBatch │                                            │RecordBatch │
265///     └────────────┘                                            └────────────┘
266///     ┌────────────┐                                            ┌────────────┐
267///     │RecordBatch │                                            │RecordBatch │
268///     └────────────┘                                            └────────────┘
269///           ...                                                       ...
270///     ┌────────────┐                                            ┌────────────┐
271///     │RecordBatch │                                            │RecordBatch │
272///     └────────────┘                                            └────────────┘
273///
274///        build side                                                probe side
275///
276/// ```
277///
278/// # Example "Optimal" Plans
279///
/// The difference in how the two inputs are treated means that for a classic
/// "Star Schema Query", the optimal plan will be a **"Right Deep Tree"**. A Star
/// Schema Query is one where there is one large table and several smaller
/// "dimension" tables, joined on `Foreign Key = Primary Key` predicates.
///
/// A "Right Deep Tree" looks like this, with the large table as the probe side of
/// the lowest join:
287///
288/// ```text
289///             ┌───────────┐
290///             │ HashJoin  │
291///             │           │
292///             └───────────┘
293///                 │   │
294///         ┌───────┘   └──────────┐
295///         ▼                      ▼
296/// ┌───────────────┐        ┌───────────┐
297/// │ small table 1 │        │ HashJoin  │
298/// │  "dimension"  │        │           │
299/// └───────────────┘        └───┬───┬───┘
300///                   ┌──────────┘   └───────┐
301///                   │                      │
302///                   ▼                      ▼
303///           ┌───────────────┐        ┌───────────┐
304///           │ small table 2 │        │ HashJoin  │
305///           │  "dimension"  │        │           │
306///           └───────────────┘        └───┬───┬───┘
307///                               ┌────────┘   └────────┐
308///                               │                     │
309///                               ▼                     ▼
310///                       ┌───────────────┐     ┌───────────────┐
311///                       │ small table 3 │     │  large table  │
312///                       │  "dimension"  │     │    "fact"     │
313///                       └───────────────┘     └───────────────┘
314/// ```
315///
316/// # Clone / Shared State
317///
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
/// loading of the left side with the processing in each output stream.
/// Therefore it cannot be [`Clone`].
pub struct HashJoinExec {
    /// left (build) side which gets hashed
    pub left: Arc<dyn ExecutionPlan>,
    /// right (probe) side which is filtered by the hash table
    pub right: Arc<dyn ExecutionPlan>,
    /// Set of equijoin columns from the relations: `(left_col, right_col)`
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Filters which are applied while finding matching rows
    pub filter: Option<JoinFilter>,
    /// How the join is performed (`OUTER`, `INNER`, etc)
    pub join_type: JoinType,
    /// The schema after join. Please be careful when using this schema,
    /// if there is a projection, the schema isn't the same as the output schema.
    join_schema: SchemaRef,
    /// Future that consumes left input and builds the hash table
    ///
    /// For CollectLeft partition mode, this structure is *shared* across all output streams.
    ///
    /// Each output stream waits on the `OnceAsync` to signal the completion of
    /// the hash table creation.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Shared `RandomState` used by the hashing algorithm
    random_state: RandomState,
    /// Partitioning mode to use
    pub mode: PartitionMode,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// The projection indices of the columns in the output schema of join.
    /// `None` means no projection is embedded in this join.
    pub projection: Option<Vec<usize>>,
    /// Information of index and left / right placement of columns
    column_indices: Vec<ColumnIndex>,
    /// The equality null-handling behavior of the join algorithm.
    pub null_equality: NullEquality,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
    /// Dynamic filter for pushing down to the probe side
    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
    /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
    /// Always `None` on a value freshly returned by `try_new`.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
}
361
/// Dynamic-filter state kept by `HashJoinExec`: the filter expression itself
/// together with the bounds accumulator used to update it.
#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// Dynamic filter that we'll update with the results of the build side once that is done.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
    /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
}
370
371impl fmt::Debug for HashJoinExec {
372    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373        f.debug_struct("HashJoinExec")
374            .field("left", &self.left)
375            .field("right", &self.right)
376            .field("on", &self.on)
377            .field("filter", &self.filter)
378            .field("join_type", &self.join_type)
379            .field("join_schema", &self.join_schema)
380            .field("left_fut", &self.left_fut)
381            .field("random_state", &self.random_state)
382            .field("mode", &self.mode)
383            .field("metrics", &self.metrics)
384            .field("projection", &self.projection)
385            .field("column_indices", &self.column_indices)
386            .field("null_equality", &self.null_equality)
387            .field("cache", &self.cache)
388            // Explicitly exclude dynamic_filter to avoid runtime state differences in tests
389            .finish()
390    }
391}
392
393impl EmbeddedProjection for HashJoinExec {
394    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
395        self.with_projection(projection)
396    }
397}
398
399impl HashJoinExec {
400    /// Tries to create a new [HashJoinExec].
401    ///
402    /// # Error
403    /// This function errors when it is not possible to join the left and right sides on keys `on`.
404    #[allow(clippy::too_many_arguments)]
405    pub fn try_new(
406        left: Arc<dyn ExecutionPlan>,
407        right: Arc<dyn ExecutionPlan>,
408        on: JoinOn,
409        filter: Option<JoinFilter>,
410        join_type: &JoinType,
411        projection: Option<Vec<usize>>,
412        partition_mode: PartitionMode,
413        null_equality: NullEquality,
414    ) -> Result<Self> {
415        let left_schema = left.schema();
416        let right_schema = right.schema();
417        if on.is_empty() {
418            return plan_err!("On constraints in HashJoinExec should be non-empty");
419        }
420
421        check_join_is_valid(&left_schema, &right_schema, &on)?;
422
423        let (join_schema, column_indices) =
424            build_join_schema(&left_schema, &right_schema, join_type);
425
426        let random_state = HASH_JOIN_SEED;
427
428        let join_schema = Arc::new(join_schema);
429
430        //  check if the projection is valid
431        can_project(&join_schema, projection.as_ref())?;
432
433        let cache = Self::compute_properties(
434            &left,
435            &right,
436            Arc::clone(&join_schema),
437            *join_type,
438            &on,
439            partition_mode,
440            projection.as_ref(),
441        )?;
442
443        // Initialize both dynamic filter and bounds accumulator to None
444        // They will be set later if dynamic filtering is enabled
445
446        Ok(HashJoinExec {
447            left,
448            right,
449            on,
450            filter,
451            join_type: *join_type,
452            join_schema,
453            left_fut: Default::default(),
454            random_state,
455            mode: partition_mode,
456            metrics: ExecutionPlanMetricsSet::new(),
457            projection,
458            column_indices,
459            null_equality,
460            cache,
461            dynamic_filter: None,
462        })
463    }
464
465    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
466        // Extract the right-side keys (probe side keys) from the `on` clauses
467        // Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
468        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
469        // Initialize with a placeholder expression (true) that will be updated when the hash table is built
470        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
471    }
472
    /// Returns the left (build) side input, which gets hashed
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }
477
    /// Returns the right (probe) side input, which is filtered by the hash table
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }
482
483    /// Set of common columns used to join on
484    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
485        &self.on
486    }
487
    /// Returns the filter applied before join output, or `None` if the join
    /// has no filter
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }
492
    /// Returns how the join is performed (`OUTER`, `INNER`, etc)
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }
497
    /// Returns the schema after join, before any embedded projection is applied.
    /// Please be careful when using this schema:
    /// if there is a projection, the schema isn't the same as the output schema.
    pub fn join_schema(&self) -> &SchemaRef {
        &self.join_schema
    }
503
    /// Returns the partitioning mode of this hash join
    pub fn partition_mode(&self) -> &PartitionMode {
        &self.mode
    }
508
    /// Returns the equality null-handling behavior of this join (by value)
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }
513
514    /// Calculate order preservation flags for this hash join.
515    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
516        vec![
517            false,
518            matches!(
519                join_type,
520                JoinType::Inner
521                    | JoinType::Right
522                    | JoinType::RightAnti
523                    | JoinType::RightSemi
524                    | JoinType::RightMark
525            ),
526        ]
527    }
528
    /// Get probe side information for the hash join.
    ///
    /// Always returns [`JoinSide::Right`].
    pub fn probe_side() -> JoinSide {
        // In current implementation right side is always probe side.
        JoinSide::Right
    }
534
    /// Return whether the join contains a projection, i.e. whether
    /// `projection` is `Some`
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }
539
540    /// Return new instance of [HashJoinExec] with the given projection.
541    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
542        //  check if the projection is valid
543        can_project(&self.schema(), projection.as_ref())?;
544        let projection = match projection {
545            Some(projection) => match &self.projection {
546                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
547                None => Some(projection),
548            },
549            None => None,
550        };
551        Self::try_new(
552            Arc::clone(&self.left),
553            Arc::clone(&self.right),
554            self.on.clone(),
555            self.filter.clone(),
556            &self.join_type,
557            projection,
558            self.mode,
559            self.null_equality,
560        )
561    }
562
563    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
564    fn compute_properties(
565        left: &Arc<dyn ExecutionPlan>,
566        right: &Arc<dyn ExecutionPlan>,
567        schema: SchemaRef,
568        join_type: JoinType,
569        on: JoinOnRef,
570        mode: PartitionMode,
571        projection: Option<&Vec<usize>>,
572    ) -> Result<PlanProperties> {
573        // Calculate equivalence properties:
574        let mut eq_properties = join_equivalence_properties(
575            left.equivalence_properties().clone(),
576            right.equivalence_properties().clone(),
577            &join_type,
578            Arc::clone(&schema),
579            &Self::maintains_input_order(join_type),
580            Some(Self::probe_side()),
581            on,
582        )?;
583
584        let mut output_partitioning = match mode {
585            PartitionMode::CollectLeft => {
586                asymmetric_join_output_partitioning(left, right, &join_type)?
587            }
588            PartitionMode::Auto => Partitioning::UnknownPartitioning(
589                right.output_partitioning().partition_count(),
590            ),
591            PartitionMode::Partitioned => {
592                symmetric_join_output_partitioning(left, right, &join_type)?
593            }
594        };
595
596        let emission_type = if left.boundedness().is_unbounded() {
597            EmissionType::Final
598        } else if right.pipeline_behavior() == EmissionType::Incremental {
599            match join_type {
600                // If we only need to generate matched rows from the probe side,
601                // we can emit rows incrementally.
602                JoinType::Inner
603                | JoinType::LeftSemi
604                | JoinType::RightSemi
605                | JoinType::Right
606                | JoinType::RightAnti
607                | JoinType::RightMark => EmissionType::Incremental,
608                // If we need to generate unmatched rows from the *build side*,
609                // we need to emit them at the end.
610                JoinType::Left
611                | JoinType::LeftAnti
612                | JoinType::LeftMark
613                | JoinType::Full => EmissionType::Both,
614            }
615        } else {
616            right.pipeline_behavior()
617        };
618
619        // If contains projection, update the PlanProperties.
620        if let Some(projection) = projection {
621            // construct a map from the input expressions to the output expression of the Projection
622            let projection_mapping =
623                ProjectionMapping::from_indices(projection, &schema)?;
624            let out_schema = project_schema(&schema, Some(projection))?;
625            output_partitioning =
626                output_partitioning.project(&projection_mapping, &eq_properties);
627            eq_properties = eq_properties.project(&projection_mapping, out_schema);
628        }
629
630        Ok(PlanProperties::new(
631            eq_properties,
632            output_partitioning,
633            emission_type,
634            boundedness_from_children([left, right]),
635        ))
636    }
637
638    /// Returns a new `ExecutionPlan` that computes the same join as this one,
639    /// with the left and right inputs swapped using the  specified
640    /// `partition_mode`.
641    ///
642    /// # Notes:
643    ///
644    /// This function is public so other downstream projects can use it to
645    /// construct `HashJoinExec` with right side as the build side.
646    ///
647    /// For using this interface directly, please refer to below:
648    ///
649    /// Hash join execution may require specific input partitioning (for example,
650    /// the left child may have a single partition while the right child has multiple).
651    ///
652    /// Calling this function on join nodes whose children have already been repartitioned
653    /// (e.g., after a `RepartitionExec` has been inserted) may break the partitioning
654    /// requirements of the hash join. Therefore, ensure you call this function
655    /// before inserting any repartitioning operators on the join's children.
656    ///
657    /// In DataFusion's default SQL interface, this function is used by the `JoinSelection`
658    /// physical optimizer rule to determine a good join order, which is
659    /// executed before the `EnforceDistribution` rule (the rule that may
660    /// insert `RepartitionExec` operators).
661    pub fn swap_inputs(
662        &self,
663        partition_mode: PartitionMode,
664    ) -> Result<Arc<dyn ExecutionPlan>> {
665        let left = self.left();
666        let right = self.right();
667        let new_join = HashJoinExec::try_new(
668            Arc::clone(right),
669            Arc::clone(left),
670            self.on()
671                .iter()
672                .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
673                .collect(),
674            self.filter().map(JoinFilter::swap),
675            &self.join_type().swap(),
676            swap_join_projection(
677                left.schema().fields().len(),
678                right.schema().fields().len(),
679                self.projection.as_ref(),
680                self.join_type(),
681            ),
682            partition_mode,
683            self.null_equality(),
684        )?;
685        // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
686        if matches!(
687            self.join_type(),
688            JoinType::LeftSemi
689                | JoinType::RightSemi
690                | JoinType::LeftAnti
691                | JoinType::RightAnti
692        ) || self.projection.is_some()
693        {
694            Ok(Arc::new(new_join))
695        } else {
696            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
697        }
698    }
699}
700
701impl DisplayAs for HashJoinExec {
702    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
703        match t {
704            DisplayFormatType::Default | DisplayFormatType::Verbose => {
705                let display_filter = self.filter.as_ref().map_or_else(
706                    || "".to_string(),
707                    |f| format!(", filter={}", f.expression()),
708                );
709                let display_projections = if self.contains_projection() {
710                    format!(
711                        ", projection=[{}]",
712                        self.projection
713                            .as_ref()
714                            .unwrap()
715                            .iter()
716                            .map(|index| format!(
717                                "{}@{}",
718                                self.join_schema.fields().get(*index).unwrap().name(),
719                                index
720                            ))
721                            .collect::<Vec<_>>()
722                            .join(", ")
723                    )
724                } else {
725                    "".to_string()
726                };
727                let on = self
728                    .on
729                    .iter()
730                    .map(|(c1, c2)| format!("({c1}, {c2})"))
731                    .collect::<Vec<String>>()
732                    .join(", ");
733                write!(
734                    f,
735                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}",
736                    self.mode, self.join_type, on, display_filter, display_projections,
737                )
738            }
739            DisplayFormatType::TreeRender => {
740                let on = self
741                    .on
742                    .iter()
743                    .map(|(c1, c2)| {
744                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
745                    })
746                    .collect::<Vec<String>>()
747                    .join(", ");
748
749                if *self.join_type() != JoinType::Inner {
750                    writeln!(f, "join_type={:?}", self.join_type)?;
751                }
752
753                writeln!(f, "on={on}")?;
754
755                if let Some(filter) = self.filter.as_ref() {
756                    writeln!(f, "filter={filter}")?;
757                }
758
759                Ok(())
760            }
761        }
762    }
763}
764
765impl ExecutionPlan for HashJoinExec {
766    fn name(&self) -> &'static str {
767        "HashJoinExec"
768    }
769
770    fn as_any(&self) -> &dyn Any {
771        self
772    }
773
774    fn properties(&self) -> &PlanProperties {
775        &self.cache
776    }
777
778    fn required_input_distribution(&self) -> Vec<Distribution> {
779        match self.mode {
780            PartitionMode::CollectLeft => vec![
781                Distribution::SinglePartition,
782                Distribution::UnspecifiedDistribution,
783            ],
784            PartitionMode::Partitioned => {
785                let (left_expr, right_expr) = self
786                    .on
787                    .iter()
788                    .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
789                    .unzip();
790                vec![
791                    Distribution::HashPartitioned(left_expr),
792                    Distribution::HashPartitioned(right_expr),
793                ]
794            }
795            PartitionMode::Auto => vec![
796                Distribution::UnspecifiedDistribution,
797                Distribution::UnspecifiedDistribution,
798            ],
799        }
800    }
801
802    // For [JoinType::Inner] and [JoinType::RightSemi] in hash joins, the probe phase initiates by
803    // applying the hash function to convert the join key(s) in each row into a hash value from the
804    // probe side table in the order they're arranged. The hash value is used to look up corresponding
805    // entries in the hash table that was constructed from the build side table during the build phase.
806    //
807    // Because of the immediate generation of result rows once a match is found,
808    // the output of the join tends to follow the order in which the rows were read from
809    // the probe side table. This is simply due to the sequence in which the rows were processed.
810    // Hence, it appears that the hash join is preserving the order of the probe side.
811    //
812    // Meanwhile, in the case of a [JoinType::RightAnti] hash join,
813    // the unmatched rows from the probe side are also kept in order.
814    // This is because the **`RightAnti`** join is designed to return rows from the right
815    // (probe side) table that have no match in the left (build side) table. Because the rows
816    // are processed sequentially in the probe phase, and unmatched rows are directly output
817    // as results, these results tend to retain the order of the probe side table.
818    fn maintains_input_order(&self) -> Vec<bool> {
819        Self::maintains_input_order(self.join_type)
820    }
821
822    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
823        vec![&self.left, &self.right]
824    }
825
826    /// Creates a new HashJoinExec with different children while preserving configuration.
827    ///
828    /// This method is called during query optimization when the optimizer creates new
829    /// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new`
830    /// rather than cloning the existing one because partitioning may have changed.
831    fn with_new_children(
832        self: Arc<Self>,
833        children: Vec<Arc<dyn ExecutionPlan>>,
834    ) -> Result<Arc<dyn ExecutionPlan>> {
835        Ok(Arc::new(HashJoinExec {
836            left: Arc::clone(&children[0]),
837            right: Arc::clone(&children[1]),
838            on: self.on.clone(),
839            filter: self.filter.clone(),
840            join_type: self.join_type,
841            join_schema: Arc::clone(&self.join_schema),
842            left_fut: Arc::clone(&self.left_fut),
843            random_state: self.random_state.clone(),
844            mode: self.mode,
845            metrics: ExecutionPlanMetricsSet::new(),
846            projection: self.projection.clone(),
847            column_indices: self.column_indices.clone(),
848            null_equality: self.null_equality,
849            cache: Self::compute_properties(
850                &children[0],
851                &children[1],
852                Arc::clone(&self.join_schema),
853                self.join_type,
854                &self.on,
855                self.mode,
856                self.projection.as_ref(),
857            )?,
858            // Keep the dynamic filter, bounds accumulator will be reset
859            dynamic_filter: self.dynamic_filter.clone(),
860        }))
861    }
862
863    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
864        Ok(Arc::new(HashJoinExec {
865            left: Arc::clone(&self.left),
866            right: Arc::clone(&self.right),
867            on: self.on.clone(),
868            filter: self.filter.clone(),
869            join_type: self.join_type,
870            join_schema: Arc::clone(&self.join_schema),
871            // Reset the left_fut to allow re-execution
872            left_fut: Arc::new(OnceAsync::default()),
873            random_state: self.random_state.clone(),
874            mode: self.mode,
875            metrics: ExecutionPlanMetricsSet::new(),
876            projection: self.projection.clone(),
877            column_indices: self.column_indices.clone(),
878            null_equality: self.null_equality,
879            cache: self.cache.clone(),
880            // Reset dynamic filter and bounds accumulator to initial state
881            dynamic_filter: None,
882        }))
883    }
884
885    fn execute(
886        &self,
887        partition: usize,
888        context: Arc<TaskContext>,
889    ) -> Result<SendableRecordBatchStream> {
890        let on_left = self
891            .on
892            .iter()
893            .map(|on| Arc::clone(&on.0))
894            .collect::<Vec<_>>();
895        let left_partitions = self.left.output_partitioning().partition_count();
896        let right_partitions = self.right.output_partitioning().partition_count();
897
898        if self.mode == PartitionMode::Partitioned && left_partitions != right_partitions
899        {
900            return internal_err!(
901                "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
902                 consider using RepartitionExec"
903            );
904        }
905
906        if self.mode == PartitionMode::CollectLeft && left_partitions != 1 {
907            return internal_err!(
908                "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
909                 consider using CoalescePartitionsExec or the EnforceDistribution rule"
910            );
911        }
912
913        let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
914
915        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
916        let left_fut = match self.mode {
917            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
918                let left_stream = self.left.execute(0, Arc::clone(&context))?;
919
920                let reservation =
921                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
922
923                Ok(collect_left_input(
924                    self.random_state.clone(),
925                    left_stream,
926                    on_left.clone(),
927                    join_metrics.clone(),
928                    reservation,
929                    need_produce_result_in_final(self.join_type),
930                    self.right().output_partitioning().partition_count(),
931                    enable_dynamic_filter_pushdown,
932                ))
933            })?,
934            PartitionMode::Partitioned => {
935                let left_stream = self.left.execute(partition, Arc::clone(&context))?;
936
937                let reservation =
938                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
939                        .register(context.memory_pool());
940
941                OnceFut::new(collect_left_input(
942                    self.random_state.clone(),
943                    left_stream,
944                    on_left.clone(),
945                    join_metrics.clone(),
946                    reservation,
947                    need_produce_result_in_final(self.join_type),
948                    1,
949                    enable_dynamic_filter_pushdown,
950                ))
951            }
952            PartitionMode::Auto => {
953                return plan_err!(
954                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
955                    PartitionMode::Auto
956                );
957            }
958        };
959
960        let batch_size = context.session_config().batch_size();
961
962        // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
963        let bounds_accumulator = enable_dynamic_filter_pushdown
964            .then(|| {
965                self.dynamic_filter.as_ref().map(|df| {
966                    let filter = Arc::clone(&df.filter);
967                    let on_right = self
968                        .on
969                        .iter()
970                        .map(|(_, right_expr)| Arc::clone(right_expr))
971                        .collect::<Vec<_>>();
972                    Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
973                        Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
974                            self.mode,
975                            self.left.as_ref(),
976                            self.right.as_ref(),
977                            filter,
978                            on_right,
979                        ))
980                    })))
981                })
982            })
983            .flatten()
984            .flatten();
985
986        // we have the batches and the hash map with their keys. We can how create a stream
987        // over the right that uses this information to issue new batches.
988        let right_stream = self.right.execute(partition, context)?;
989
990        // update column indices to reflect the projection
991        let column_indices_after_projection = match &self.projection {
992            Some(projection) => projection
993                .iter()
994                .map(|i| self.column_indices[*i].clone())
995                .collect(),
996            None => self.column_indices.clone(),
997        };
998
999        let on_right = self
1000            .on
1001            .iter()
1002            .map(|(_, right_expr)| Arc::clone(right_expr))
1003            .collect::<Vec<_>>();
1004
1005        Ok(Box::pin(HashJoinStream::new(
1006            partition,
1007            self.schema(),
1008            on_right,
1009            self.filter.clone(),
1010            self.join_type,
1011            right_stream,
1012            self.random_state.clone(),
1013            join_metrics,
1014            column_indices_after_projection,
1015            self.null_equality,
1016            HashJoinStreamState::WaitBuildSide,
1017            BuildSide::Initial(BuildSideInitialState { left_fut }),
1018            batch_size,
1019            vec![],
1020            self.right.output_ordering().is_some(),
1021            bounds_accumulator,
1022        )))
1023    }
1024
1025    fn metrics(&self) -> Option<MetricsSet> {
1026        Some(self.metrics.clone_inner())
1027    }
1028
1029    fn statistics(&self) -> Result<Statistics> {
1030        self.partition_statistics(None)
1031    }
1032
1033    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1034        if partition.is_some() {
1035            return Ok(Statistics::new_unknown(&self.schema()));
1036        }
1037        // TODO stats: it is not possible in general to know the output size of joins
1038        // There are some special cases though, for example:
1039        // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
1040        let stats = estimate_join_statistics(
1041            self.left.partition_statistics(None)?,
1042            self.right.partition_statistics(None)?,
1043            self.on.clone(),
1044            &self.join_type,
1045            &self.join_schema,
1046        )?;
1047        // Project statistics if there is a projection
1048        Ok(stats.project(self.projection.as_ref()))
1049    }
1050
1051    /// Tries to push `projection` down through `hash_join`. If possible, performs the
1052    /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections
1053    /// as its children. Otherwise, returns `None`.
1054    fn try_swapping_with_projection(
1055        &self,
1056        projection: &ProjectionExec,
1057    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1058        // TODO: currently if there is projection in HashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
1059        if self.contains_projection() {
1060            return Ok(None);
1061        }
1062
1063        if let Some(JoinData {
1064            projected_left_child,
1065            projected_right_child,
1066            join_filter,
1067            join_on,
1068        }) = try_pushdown_through_join(
1069            projection,
1070            self.left(),
1071            self.right(),
1072            self.on(),
1073            self.schema(),
1074            self.filter(),
1075        )? {
1076            Ok(Some(Arc::new(HashJoinExec::try_new(
1077                Arc::new(projected_left_child),
1078                Arc::new(projected_right_child),
1079                join_on,
1080                join_filter,
1081                self.join_type(),
1082                // Returned early if projection is not None
1083                None,
1084                *self.partition_mode(),
1085                self.null_equality,
1086            )?)))
1087        } else {
1088            try_embed_projection(projection, self)
1089        }
1090    }
1091
1092    fn gather_filters_for_pushdown(
1093        &self,
1094        phase: FilterPushdownPhase,
1095        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1096        config: &ConfigOptions,
1097    ) -> Result<FilterDescription> {
1098        // Other types of joins can support *some* filters, but restrictions are complex and error prone.
1099        // For now we don't support them.
1100        // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
1101        // See https://github.com/apache/datafusion/issues/16973 for tracking.
1102        if self.join_type != JoinType::Inner {
1103            return Ok(FilterDescription::all_unsupported(
1104                &parent_filters,
1105                &self.children(),
1106            ));
1107        }
1108
1109        // Get basic filter descriptions for both children
1110        let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
1111            &parent_filters,
1112            self.left(),
1113        )?;
1114        let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
1115            &parent_filters,
1116            self.right(),
1117        )?;
1118
1119        // Add dynamic filters in Post phase if enabled
1120        if matches!(phase, FilterPushdownPhase::Post)
1121            && config.optimizer.enable_dynamic_filter_pushdown
1122        {
1123            // Add actual dynamic filter to right side (probe side)
1124            let dynamic_filter = Self::create_dynamic_filter(&self.on);
1125            right_child = right_child.with_self_filter(dynamic_filter);
1126        }
1127
1128        Ok(FilterDescription::new()
1129            .with_child(left_child)
1130            .with_child(right_child))
1131    }
1132
1133    fn handle_child_pushdown_result(
1134        &self,
1135        _phase: FilterPushdownPhase,
1136        child_pushdown_result: ChildPushdownResult,
1137        _config: &ConfigOptions,
1138    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1139        // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for
1140        // non-inner joins in `gather_filters_for_pushdown`.
1141        // However it's a cheap check and serves to inform future devs touching this function that they need to be really
1142        // careful pushing down filters through non-inner joins.
1143        if self.join_type != JoinType::Inner {
1144            // Other types of joins can support *some* filters, but restrictions are complex and error prone.
1145            // For now we don't support them.
1146            // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
1147            return Ok(FilterPushdownPropagation::all_unsupported(
1148                child_pushdown_result,
1149            ));
1150        }
1151
1152        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1153        assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
1154        let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
1155                                                                               // We expect 0 or 1 self filters
1156        if let Some(filter) = right_child_self_filters.first() {
1157            // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said
1158            // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
1159            let predicate = Arc::clone(&filter.predicate);
1160            if let Ok(dynamic_filter) =
1161                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1162            {
1163                // We successfully pushed down our self filter - we need to make a new node with the dynamic filter
1164                let new_node = Arc::new(HashJoinExec {
1165                    left: Arc::clone(&self.left),
1166                    right: Arc::clone(&self.right),
1167                    on: self.on.clone(),
1168                    filter: self.filter.clone(),
1169                    join_type: self.join_type,
1170                    join_schema: Arc::clone(&self.join_schema),
1171                    left_fut: Arc::clone(&self.left_fut),
1172                    random_state: self.random_state.clone(),
1173                    mode: self.mode,
1174                    metrics: ExecutionPlanMetricsSet::new(),
1175                    projection: self.projection.clone(),
1176                    column_indices: self.column_indices.clone(),
1177                    null_equality: self.null_equality,
1178                    cache: self.cache.clone(),
1179                    dynamic_filter: Some(HashJoinExecDynamicFilter {
1180                        filter: dynamic_filter,
1181                        bounds_accumulator: OnceLock::new(),
1182                    }),
1183                });
1184                result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
1185            }
1186        }
1187        Ok(result)
1188    }
1189}
1190
1191/// Compute min/max bounds for each column in the given arrays
1192fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
1193    arrays
1194        .iter()
1195        .map(|array| {
1196            if array.is_empty() {
1197                // Return NULL values for empty arrays
1198                return Ok(ColumnBounds::new(
1199                    ScalarValue::try_from(array.data_type())?,
1200                    ScalarValue::try_from(array.data_type())?,
1201                ));
1202            }
1203
1204            // Use Arrow kernels for efficient min/max computation
1205            let min_val = min_batch(array)?;
1206            let max_val = max_batch(array)?;
1207
1208            Ok(ColumnBounds::new(min_val, max_val))
1209        })
1210        .collect()
1211}
1212
1213#[expect(clippy::too_many_arguments)]
1214/// Collects all batches from the left (build) side stream and creates a hash map for joining.
1215///
1216/// This function is responsible for:
1217/// 1. Consuming the entire left stream and collecting all batches into memory
1218/// 2. Building a hash map from the join key columns for efficient probe operations
1219/// 3. Computing bounds for dynamic filter pushdown (if enabled)
1220/// 4. Preparing visited indices bitmap for certain join types
1221///
1222/// # Parameters
1223/// * `random_state` - Random state for consistent hashing across partitions
1224/// * `left_stream` - Stream of record batches from the build side
1225/// * `on_left` - Physical expressions for the left side join keys
1226/// * `metrics` - Metrics collector for tracking memory usage and row counts
1227/// * `reservation` - Memory reservation tracker for the hash table and data
1228/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
1229/// * `probe_threads_count` - Number of threads that will probe this hash table
1230/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic filtering
1231///
1232/// # Dynamic Filter Coordination
1233/// When `should_compute_bounds` is true, this function computes the min/max bounds
1234/// for each join key column but does NOT update the dynamic filter. Instead, the
1235/// bounds are stored in the returned `JoinLeftData` and later coordinated by
1236/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
1237/// before updating the filter exactly once.
1238///
1239/// # Returns
1240/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
1241/// visited indices bitmap, and computed bounds (if requested).
1242async fn collect_left_input(
1243    random_state: RandomState,
1244    left_stream: SendableRecordBatchStream,
1245    on_left: Vec<PhysicalExprRef>,
1246    metrics: BuildProbeJoinMetrics,
1247    reservation: MemoryReservation,
1248    with_visited_indices_bitmap: bool,
1249    probe_threads_count: usize,
1250    should_compute_bounds: bool,
1251) -> Result<JoinLeftData> {
1252    let schema = left_stream.schema();
1253
1254    // This operation performs 2 steps at once:
1255    // 1. creates a [JoinHashMap] of all batches from the stream
1256    // 2. stores the batches in a vector.
1257    let initial = (Vec::new(), 0, metrics, reservation);
1258    let (batches, num_rows, metrics, mut reservation) = left_stream
1259        .try_fold(initial, |mut acc, batch| async {
1260            let batch_size = get_record_batch_memory_size(&batch);
1261            // Reserve memory for incoming batch
1262            acc.3.try_grow(batch_size)?;
1263            // Update metrics
1264            acc.2.build_mem_used.add(batch_size);
1265            acc.2.build_input_batches.add(1);
1266            acc.2.build_input_rows.add(batch.num_rows());
1267            // Update row count
1268            acc.1 += batch.num_rows();
1269            // Push batch to output
1270            acc.0.push(batch);
1271            Ok(acc)
1272        })
1273        .await?;
1274
1275    // Estimation of memory size, required for hashtable, prior to allocation.
1276    // Final result can be verified using `RawTable.allocation_info()`
1277    let fixed_size_u32 = size_of::<JoinHashMapU32>();
1278    let fixed_size_u64 = size_of::<JoinHashMapU64>();
1279
1280    // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
1281    // `u64` indice variant
1282    let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
1283        let estimated_hashtable_size =
1284            estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
1285        reservation.try_grow(estimated_hashtable_size)?;
1286        metrics.build_mem_used.add(estimated_hashtable_size);
1287        Box::new(JoinHashMapU64::with_capacity(num_rows))
1288    } else {
1289        let estimated_hashtable_size =
1290            estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
1291        reservation.try_grow(estimated_hashtable_size)?;
1292        metrics.build_mem_used.add(estimated_hashtable_size);
1293        Box::new(JoinHashMapU32::with_capacity(num_rows))
1294    };
1295
1296    let mut hashes_buffer = Vec::new();
1297    let mut offset = 0;
1298
1299    // Updating hashmap starting from the last batch
1300    let batches_iter = batches.iter().rev();
1301    for batch in batches_iter.clone() {
1302        hashes_buffer.clear();
1303        hashes_buffer.resize(batch.num_rows(), 0);
1304        update_hash(
1305            &on_left,
1306            batch,
1307            &mut *hashmap,
1308            offset,
1309            &random_state,
1310            &mut hashes_buffer,
1311            0,
1312            true,
1313        )?;
1314        offset += batch.num_rows();
1315    }
1316    // Merge all batches into a single batch, so we can directly index into the arrays
1317    let single_batch = concat_batches(&schema, batches_iter)?;
1318
1319    // Reserve additional memory for visited indices bitmap and create shared builder
1320    let visited_indices_bitmap = if with_visited_indices_bitmap {
1321        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
1322        reservation.try_grow(bitmap_size)?;
1323        metrics.build_mem_used.add(bitmap_size);
1324
1325        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
1326        bitmap_buffer.append_n(num_rows, false);
1327        bitmap_buffer
1328    } else {
1329        BooleanBufferBuilder::new(0)
1330    };
1331
1332    let left_values = on_left
1333        .iter()
1334        .map(|c| {
1335            c.evaluate(&single_batch)?
1336                .into_array(single_batch.num_rows())
1337        })
1338        .collect::<Result<Vec<_>>>()?;
1339
1340    // Compute bounds for dynamic filter if enabled
1341    let bounds = if should_compute_bounds && num_rows > 0 {
1342        Some(compute_bounds(&left_values)?)
1343    } else {
1344        None
1345    };
1346
1347    let data = JoinLeftData::new(
1348        hashmap,
1349        single_batch,
1350        left_values.clone(),
1351        Mutex::new(visited_indices_bitmap),
1352        AtomicUsize::new(probe_threads_count),
1353        reservation,
1354        bounds,
1355    );
1356
1357    Ok(data)
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362    use super::*;
1363    use crate::coalesce_partitions::CoalescePartitionsExec;
1364    use crate::joins::hash_join::stream::lookup_join_hashmap;
1365    use crate::test::{assert_join_metrics, TestMemoryExec};
1366    use crate::{
1367        common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
1368        test::exec::MockExec,
1369    };
1370
1371    use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
1372    use arrow::buffer::NullBuffer;
1373    use arrow::datatypes::{DataType, Field};
1374    use arrow_schema::Schema;
1375    use datafusion_common::hash_utils::create_hashes;
1376    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
1377    use datafusion_common::{
1378        assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
1379        ScalarValue,
1380    };
1381    use datafusion_execution::config::SessionConfig;
1382    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1383    use datafusion_expr::Operator;
1384    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1385    use datafusion_physical_expr::PhysicalExpr;
1386    use hashbrown::HashTable;
1387    use insta::{allow_duplicates, assert_snapshot};
1388    use rstest::*;
1389    use rstest_reuse::*;
1390
1391    fn div_ceil(a: usize, b: usize) -> usize {
1392        a.div_ceil(b)
1393    }
1394
1395    #[template]
1396    #[rstest]
1397    fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
1398
1399    fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
1400        let session_config = SessionConfig::default().with_batch_size(batch_size);
1401        Arc::new(TaskContext::default().with_session_config(session_config))
1402    }
1403
1404    fn build_table(
1405        a: (&str, &Vec<i32>),
1406        b: (&str, &Vec<i32>),
1407        c: (&str, &Vec<i32>),
1408    ) -> Arc<dyn ExecutionPlan> {
1409        let batch = build_table_i32(a, b, c);
1410        let schema = batch.schema();
1411        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
1412    }
1413
1414    fn join(
1415        left: Arc<dyn ExecutionPlan>,
1416        right: Arc<dyn ExecutionPlan>,
1417        on: JoinOn,
1418        join_type: &JoinType,
1419        null_equality: NullEquality,
1420    ) -> Result<HashJoinExec> {
1421        HashJoinExec::try_new(
1422            left,
1423            right,
1424            on,
1425            None,
1426            join_type,
1427            None,
1428            PartitionMode::CollectLeft,
1429            null_equality,
1430        )
1431    }
1432
1433    fn join_with_filter(
1434        left: Arc<dyn ExecutionPlan>,
1435        right: Arc<dyn ExecutionPlan>,
1436        on: JoinOn,
1437        filter: JoinFilter,
1438        join_type: &JoinType,
1439        null_equality: NullEquality,
1440    ) -> Result<HashJoinExec> {
1441        HashJoinExec::try_new(
1442            left,
1443            right,
1444            on,
1445            Some(filter),
1446            join_type,
1447            None,
1448            PartitionMode::CollectLeft,
1449            null_equality,
1450        )
1451    }
1452
1453    async fn join_collect(
1454        left: Arc<dyn ExecutionPlan>,
1455        right: Arc<dyn ExecutionPlan>,
1456        on: JoinOn,
1457        join_type: &JoinType,
1458        null_equality: NullEquality,
1459        context: Arc<TaskContext>,
1460    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1461        let join = join(left, right, on, join_type, null_equality)?;
1462        let columns_header = columns(&join.schema());
1463
1464        let stream = join.execute(0, context)?;
1465        let batches = common::collect(stream).await?;
1466        let metrics = join.metrics().unwrap();
1467
1468        Ok((columns_header, batches, metrics))
1469    }
1470
1471    async fn partitioned_join_collect(
1472        left: Arc<dyn ExecutionPlan>,
1473        right: Arc<dyn ExecutionPlan>,
1474        on: JoinOn,
1475        join_type: &JoinType,
1476        null_equality: NullEquality,
1477        context: Arc<TaskContext>,
1478    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1479        join_collect_with_partition_mode(
1480            left,
1481            right,
1482            on,
1483            join_type,
1484            PartitionMode::Partitioned,
1485            null_equality,
1486            context,
1487        )
1488        .await
1489    }
1490
1491    async fn join_collect_with_partition_mode(
1492        left: Arc<dyn ExecutionPlan>,
1493        right: Arc<dyn ExecutionPlan>,
1494        on: JoinOn,
1495        join_type: &JoinType,
1496        partition_mode: PartitionMode,
1497        null_equality: NullEquality,
1498        context: Arc<TaskContext>,
1499    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1500        let partition_count = 4;
1501
1502        let (left_expr, right_expr) = on
1503            .iter()
1504            .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1505            .unzip();
1506
1507        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1508            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
1509            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1510                left,
1511                Partitioning::Hash(left_expr, partition_count),
1512            )?),
1513            PartitionMode::Auto => {
1514                return internal_err!("Unexpected PartitionMode::Auto in join tests")
1515            }
1516        };
1517
1518        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1519            PartitionMode::CollectLeft => {
1520                let partition_column_name = right.schema().field(0).name().clone();
1521                let partition_expr = vec![Arc::new(Column::new_with_schema(
1522                    &partition_column_name,
1523                    &right.schema(),
1524                )?) as _];
1525                Arc::new(RepartitionExec::try_new(
1526                    right,
1527                    Partitioning::Hash(partition_expr, partition_count),
1528                )?) as _
1529            }
1530            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1531                right,
1532                Partitioning::Hash(right_expr, partition_count),
1533            )?),
1534            PartitionMode::Auto => {
1535                return internal_err!("Unexpected PartitionMode::Auto in join tests")
1536            }
1537        };
1538
1539        let join = HashJoinExec::try_new(
1540            left_repartitioned,
1541            right_repartitioned,
1542            on,
1543            None,
1544            join_type,
1545            None,
1546            partition_mode,
1547            null_equality,
1548        )?;
1549
1550        let columns = columns(&join.schema());
1551
1552        let mut batches = vec![];
1553        for i in 0..partition_count {
1554            let stream = join.execute(i, Arc::clone(&context))?;
1555            let more_batches = common::collect(stream).await?;
1556            batches.extend(
1557                more_batches
1558                    .into_iter()
1559                    .filter(|b| b.num_rows() > 0)
1560                    .collect::<Vec<_>>(),
1561            );
1562        }
1563        let metrics = join.metrics().unwrap();
1564
1565        Ok((columns, batches, metrics))
1566    }
1567
1568    #[apply(batch_sizes)]
1569    #[tokio::test]
1570    async fn join_inner_one(batch_size: usize) -> Result<()> {
1571        let task_ctx = prepare_task_ctx(batch_size);
1572        let left = build_table(
1573            ("a1", &vec![1, 2, 3]),
1574            ("b1", &vec![4, 5, 5]), // this has a repetition
1575            ("c1", &vec![7, 8, 9]),
1576        );
1577        let right = build_table(
1578            ("a2", &vec![10, 20, 30]),
1579            ("b1", &vec![4, 5, 6]),
1580            ("c2", &vec![70, 80, 90]),
1581        );
1582
1583        let on = vec![(
1584            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1585            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1586        )];
1587
1588        let (columns, batches, metrics) = join_collect(
1589            Arc::clone(&left),
1590            Arc::clone(&right),
1591            on.clone(),
1592            &JoinType::Inner,
1593            NullEquality::NullEqualsNothing,
1594            task_ctx,
1595        )
1596        .await?;
1597
1598        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1599
1600        allow_duplicates! {
1601            // Inner join output is expected to preserve both inputs order
1602            assert_snapshot!(batches_to_string(&batches), @r#"
1603                +----+----+----+----+----+----+
1604                | a1 | b1 | c1 | a2 | b1 | c2 |
1605                +----+----+----+----+----+----+
1606                | 1  | 4  | 7  | 10 | 4  | 70 |
1607                | 2  | 5  | 8  | 20 | 5  | 80 |
1608                | 3  | 5  | 9  | 20 | 5  | 80 |
1609                +----+----+----+----+----+----+
1610                "#);
1611        }
1612
1613        assert_join_metrics!(metrics, 3);
1614
1615        Ok(())
1616    }
1617
1618    #[apply(batch_sizes)]
1619    #[tokio::test]
1620    async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> {
1621        let task_ctx = prepare_task_ctx(batch_size);
1622        let left = build_table(
1623            ("a1", &vec![1, 2, 3]),
1624            ("b1", &vec![4, 5, 5]), // this has a repetition
1625            ("c1", &vec![7, 8, 9]),
1626        );
1627        let right = build_table(
1628            ("a2", &vec![10, 20, 30]),
1629            ("b1", &vec![4, 5, 6]),
1630            ("c2", &vec![70, 80, 90]),
1631        );
1632        let on = vec![(
1633            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1634            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1635        )];
1636
1637        let (columns, batches, metrics) = partitioned_join_collect(
1638            Arc::clone(&left),
1639            Arc::clone(&right),
1640            on.clone(),
1641            &JoinType::Inner,
1642            NullEquality::NullEqualsNothing,
1643            task_ctx,
1644        )
1645        .await?;
1646
1647        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1648
1649        allow_duplicates! {
1650            assert_snapshot!(batches_to_sort_string(&batches), @r#"
1651                +----+----+----+----+----+----+
1652                | a1 | b1 | c1 | a2 | b1 | c2 |
1653                +----+----+----+----+----+----+
1654                | 1  | 4  | 7  | 10 | 4  | 70 |
1655                | 2  | 5  | 8  | 20 | 5  | 80 |
1656                | 3  | 5  | 9  | 20 | 5  | 80 |
1657                +----+----+----+----+----+----+
1658                "#);
1659        }
1660
1661        assert_join_metrics!(metrics, 3);
1662
1663        Ok(())
1664    }
1665
1666    #[tokio::test]
1667    async fn join_inner_one_no_shared_column_names() -> Result<()> {
1668        let task_ctx = Arc::new(TaskContext::default());
1669        let left = build_table(
1670            ("a1", &vec![1, 2, 3]),
1671            ("b1", &vec![4, 5, 5]), // this has a repetition
1672            ("c1", &vec![7, 8, 9]),
1673        );
1674        let right = build_table(
1675            ("a2", &vec![10, 20, 30]),
1676            ("b2", &vec![4, 5, 6]),
1677            ("c2", &vec![70, 80, 90]),
1678        );
1679        let on = vec![(
1680            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1681            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1682        )];
1683
1684        let (columns, batches, metrics) = join_collect(
1685            left,
1686            right,
1687            on,
1688            &JoinType::Inner,
1689            NullEquality::NullEqualsNothing,
1690            task_ctx,
1691        )
1692        .await?;
1693
1694        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1695
1696        // Inner join output is expected to preserve both inputs order
1697        allow_duplicates! {
1698            assert_snapshot!(batches_to_string(&batches), @r#"
1699            +----+----+----+----+----+----+
1700            | a1 | b1 | c1 | a2 | b2 | c2 |
1701            +----+----+----+----+----+----+
1702            | 1  | 4  | 7  | 10 | 4  | 70 |
1703            | 2  | 5  | 8  | 20 | 5  | 80 |
1704            | 3  | 5  | 9  | 20 | 5  | 80 |
1705            +----+----+----+----+----+----+
1706                "#);
1707        }
1708
1709        assert_join_metrics!(metrics, 3);
1710
1711        Ok(())
1712    }
1713
1714    #[tokio::test]
1715    async fn join_inner_one_randomly_ordered() -> Result<()> {
1716        let task_ctx = Arc::new(TaskContext::default());
1717        let left = build_table(
1718            ("a1", &vec![0, 3, 2, 1]),
1719            ("b1", &vec![4, 5, 5, 4]),
1720            ("c1", &vec![6, 9, 8, 7]),
1721        );
1722        let right = build_table(
1723            ("a2", &vec![20, 30, 10]),
1724            ("b2", &vec![5, 6, 4]),
1725            ("c2", &vec![80, 90, 70]),
1726        );
1727        let on = vec![(
1728            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1729            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1730        )];
1731
1732        let (columns, batches, metrics) = join_collect(
1733            left,
1734            right,
1735            on,
1736            &JoinType::Inner,
1737            NullEquality::NullEqualsNothing,
1738            task_ctx,
1739        )
1740        .await?;
1741
1742        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1743
1744        // Inner join output is expected to preserve both inputs order
1745        allow_duplicates! {
1746            assert_snapshot!(batches_to_string(&batches), @r#"
1747            +----+----+----+----+----+----+
1748            | a1 | b1 | c1 | a2 | b2 | c2 |
1749            +----+----+----+----+----+----+
1750            | 3  | 5  | 9  | 20 | 5  | 80 |
1751            | 2  | 5  | 8  | 20 | 5  | 80 |
1752            | 0  | 4  | 6  | 10 | 4  | 70 |
1753            | 1  | 4  | 7  | 10 | 4  | 70 |
1754            +----+----+----+----+----+----+
1755                "#);
1756        }
1757
1758        assert_join_metrics!(metrics, 4);
1759
1760        Ok(())
1761    }
1762
1763    #[apply(batch_sizes)]
1764    #[tokio::test]
1765    async fn join_inner_two(batch_size: usize) -> Result<()> {
1766        let task_ctx = prepare_task_ctx(batch_size);
1767        let left = build_table(
1768            ("a1", &vec![1, 2, 2]),
1769            ("b2", &vec![1, 2, 2]),
1770            ("c1", &vec![7, 8, 9]),
1771        );
1772        let right = build_table(
1773            ("a1", &vec![1, 2, 3]),
1774            ("b2", &vec![1, 2, 2]),
1775            ("c2", &vec![70, 80, 90]),
1776        );
1777        let on = vec![
1778            (
1779                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
1780                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
1781            ),
1782            (
1783                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
1784                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1785            ),
1786        ];
1787
1788        let (columns, batches, metrics) = join_collect(
1789            left,
1790            right,
1791            on,
1792            &JoinType::Inner,
1793            NullEquality::NullEqualsNothing,
1794            task_ctx,
1795        )
1796        .await?;
1797
1798        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
1799
1800        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
1801            // Expected number of hash table matches = 3
1802            // in case batch_size is 1 - additional empty batch for remaining 3-2 row
1803            let mut expected_batch_count = div_ceil(3, batch_size);
1804            if batch_size == 1 {
1805                expected_batch_count += 1;
1806            }
1807            expected_batch_count
1808        } else {
1809            // With hash collisions enabled, all records will match each other
1810            // and filtered later.
1811            div_ceil(9, batch_size)
1812        };
1813
1814        assert_eq!(batches.len(), expected_batch_count);
1815
1816        // Inner join output is expected to preserve both inputs order
1817        allow_duplicates! {
1818            assert_snapshot!(batches_to_string(&batches), @r#"
1819            +----+----+----+----+----+----+
1820            | a1 | b2 | c1 | a1 | b2 | c2 |
1821            +----+----+----+----+----+----+
1822            | 1  | 1  | 7  | 1  | 1  | 70 |
1823            | 2  | 2  | 8  | 2  | 2  | 80 |
1824            | 2  | 2  | 9  | 2  | 2  | 80 |
1825            +----+----+----+----+----+----+
1826                "#);
1827        }
1828
1829        assert_join_metrics!(metrics, 3);
1830
1831        Ok(())
1832    }
1833
1834    /// Test where the left has 2 parts, the right with 1 part => 1 part
1835    #[apply(batch_sizes)]
1836    #[tokio::test]
1837    async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
1838        let task_ctx = prepare_task_ctx(batch_size);
1839        let batch1 = build_table_i32(
1840            ("a1", &vec![1, 2]),
1841            ("b2", &vec![1, 2]),
1842            ("c1", &vec![7, 8]),
1843        );
1844        let batch2 =
1845            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
1846        let schema = batch1.schema();
1847        let left =
1848            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
1849                .unwrap();
1850        let left = Arc::new(CoalescePartitionsExec::new(left));
1851
1852        let right = build_table(
1853            ("a1", &vec![1, 2, 3]),
1854            ("b2", &vec![1, 2, 2]),
1855            ("c2", &vec![70, 80, 90]),
1856        );
1857        let on = vec![
1858            (
1859                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
1860                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
1861            ),
1862            (
1863                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
1864                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1865            ),
1866        ];
1867
1868        let (columns, batches, metrics) = join_collect(
1869            left,
1870            right,
1871            on,
1872            &JoinType::Inner,
1873            NullEquality::NullEqualsNothing,
1874            task_ctx,
1875        )
1876        .await?;
1877
1878        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
1879
1880        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
1881            // Expected number of hash table matches = 3
1882            // in case batch_size is 1 - additional empty batch for remaining 3-2 row
1883            let mut expected_batch_count = div_ceil(3, batch_size);
1884            if batch_size == 1 {
1885                expected_batch_count += 1;
1886            }
1887            expected_batch_count
1888        } else {
1889            // With hash collisions enabled, all records will match each other
1890            // and filtered later.
1891            div_ceil(9, batch_size)
1892        };
1893
1894        assert_eq!(batches.len(), expected_batch_count);
1895
1896        // Inner join output is expected to preserve both inputs order
1897        allow_duplicates! {
1898            assert_snapshot!(batches_to_string(&batches), @r#"
1899            +----+----+----+----+----+----+
1900            | a1 | b2 | c1 | a1 | b2 | c2 |
1901            +----+----+----+----+----+----+
1902            | 1  | 1  | 7  | 1  | 1  | 70 |
1903            | 2  | 2  | 8  | 2  | 2  | 80 |
1904            | 2  | 2  | 9  | 2  | 2  | 80 |
1905            +----+----+----+----+----+----+
1906                "#);
1907        }
1908
1909        assert_join_metrics!(metrics, 3);
1910
1911        Ok(())
1912    }
1913
1914    #[tokio::test]
1915    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
1916        let task_ctx = Arc::new(TaskContext::default());
1917        let batch1 = build_table_i32(
1918            ("a1", &vec![0, 3]),
1919            ("b1", &vec![4, 5]),
1920            ("c1", &vec![6, 9]),
1921        );
1922        let batch2 = build_table_i32(
1923            ("a1", &vec![2, 1]),
1924            ("b1", &vec![5, 4]),
1925            ("c1", &vec![8, 7]),
1926        );
1927        let schema = batch1.schema();
1928
1929        let left =
1930            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
1931                .unwrap();
1932        let left = Arc::new(CoalescePartitionsExec::new(left));
1933        let right = build_table(
1934            ("a2", &vec![20, 30, 10]),
1935            ("b2", &vec![5, 6, 4]),
1936            ("c2", &vec![80, 90, 70]),
1937        );
1938        let on = vec![(
1939            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1940            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1941        )];
1942
1943        let (columns, batches, metrics) = join_collect(
1944            left,
1945            right,
1946            on,
1947            &JoinType::Inner,
1948            NullEquality::NullEqualsNothing,
1949            task_ctx,
1950        )
1951        .await?;
1952
1953        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1954
1955        // Inner join output is expected to preserve both inputs order
1956        allow_duplicates! {
1957            assert_snapshot!(batches_to_string(&batches), @r#"
1958            +----+----+----+----+----+----+
1959            | a1 | b1 | c1 | a2 | b2 | c2 |
1960            +----+----+----+----+----+----+
1961            | 3  | 5  | 9  | 20 | 5  | 80 |
1962            | 2  | 5  | 8  | 20 | 5  | 80 |
1963            | 0  | 4  | 6  | 10 | 4  | 70 |
1964            | 1  | 4  | 7  | 10 | 4  | 70 |
1965            +----+----+----+----+----+----+
1966                "#);
1967        }
1968
1969        assert_join_metrics!(metrics, 4);
1970
1971        Ok(())
1972    }
1973
1974    /// Test where the left has 1 part, the right has 2 parts => 2 parts
1975    #[apply(batch_sizes)]
1976    #[tokio::test]
1977    async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
1978        let task_ctx = prepare_task_ctx(batch_size);
1979        let left = build_table(
1980            ("a1", &vec![1, 2, 3]),
1981            ("b1", &vec![4, 5, 5]), // this has a repetition
1982            ("c1", &vec![7, 8, 9]),
1983        );
1984
1985        let batch1 = build_table_i32(
1986            ("a2", &vec![10, 20]),
1987            ("b1", &vec![4, 6]),
1988            ("c2", &vec![70, 80]),
1989        );
1990        let batch2 =
1991            build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
1992        let schema = batch1.schema();
1993        let right =
1994            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
1995                .unwrap();
1996
1997        let on = vec![(
1998            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1999            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2000        )];
2001
2002        let join = join(
2003            left,
2004            right,
2005            on,
2006            &JoinType::Inner,
2007            NullEquality::NullEqualsNothing,
2008        )?;
2009
2010        let columns = columns(&join.schema());
2011        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2012
2013        // first part
2014        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2015        let batches = common::collect(stream).await?;
2016
2017        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2018            // Expected number of hash table matches for first right batch = 1
2019            // and additional empty batch for non-joined 20-6-80
2020            let mut expected_batch_count = div_ceil(1, batch_size);
2021            if batch_size == 1 {
2022                expected_batch_count += 1;
2023            }
2024            expected_batch_count
2025        } else {
2026            // With hash collisions enabled, all records will match each other
2027            // and filtered later.
2028            div_ceil(6, batch_size)
2029        };
2030        assert_eq!(batches.len(), expected_batch_count);
2031
2032        // Inner join output is expected to preserve both inputs order
2033        allow_duplicates! {
2034            assert_snapshot!(batches_to_string(&batches), @r#"
2035            +----+----+----+----+----+----+
2036            | a1 | b1 | c1 | a2 | b1 | c2 |
2037            +----+----+----+----+----+----+
2038            | 1  | 4  | 7  | 10 | 4  | 70 |
2039            +----+----+----+----+----+----+
2040                "#);
2041        }
2042
2043        // second part
2044        let stream = join.execute(1, Arc::clone(&task_ctx))?;
2045        let batches = common::collect(stream).await?;
2046
2047        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2048            // Expected number of hash table matches for second right batch = 2
2049            div_ceil(2, batch_size)
2050        } else {
2051            // With hash collisions enabled, all records will match each other
2052            // and filtered later.
2053            div_ceil(3, batch_size)
2054        };
2055        assert_eq!(batches.len(), expected_batch_count);
2056
2057        // Inner join output is expected to preserve both inputs order
2058        allow_duplicates! {
2059            assert_snapshot!(batches_to_string(&batches), @r#"
2060            +----+----+----+----+----+----+
2061            | a1 | b1 | c1 | a2 | b1 | c2 |
2062            +----+----+----+----+----+----+
2063            | 2  | 5  | 8  | 30 | 5  | 90 |
2064            | 3  | 5  | 9  | 30 | 5  | 90 |
2065            +----+----+----+----+----+----+
2066                "#);
2067        }
2068
2069        Ok(())
2070    }
2071
2072    fn build_table_two_batches(
2073        a: (&str, &Vec<i32>),
2074        b: (&str, &Vec<i32>),
2075        c: (&str, &Vec<i32>),
2076    ) -> Arc<dyn ExecutionPlan> {
2077        let batch = build_table_i32(a, b, c);
2078        let schema = batch.schema();
2079        TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2080    }
2081
2082    #[apply(batch_sizes)]
2083    #[tokio::test]
2084    async fn join_left_multi_batch(batch_size: usize) {
2085        let task_ctx = prepare_task_ctx(batch_size);
2086        let left = build_table(
2087            ("a1", &vec![1, 2, 3]),
2088            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2089            ("c1", &vec![7, 8, 9]),
2090        );
2091        let right = build_table_two_batches(
2092            ("a2", &vec![10, 20, 30]),
2093            ("b1", &vec![4, 5, 6]),
2094            ("c2", &vec![70, 80, 90]),
2095        );
2096        let on = vec![(
2097            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2098            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2099        )];
2100
2101        let join = join(
2102            left,
2103            right,
2104            on,
2105            &JoinType::Left,
2106            NullEquality::NullEqualsNothing,
2107        )
2108        .unwrap();
2109
2110        let columns = columns(&join.schema());
2111        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2112
2113        let stream = join.execute(0, task_ctx).unwrap();
2114        let batches = common::collect(stream).await.unwrap();
2115
2116        allow_duplicates! {
2117            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2118            +----+----+----+----+----+----+
2119            | a1 | b1 | c1 | a2 | b1 | c2 |
2120            +----+----+----+----+----+----+
2121            | 1  | 4  | 7  | 10 | 4  | 70 |
2122            | 1  | 4  | 7  | 10 | 4  | 70 |
2123            | 2  | 5  | 8  | 20 | 5  | 80 |
2124            | 2  | 5  | 8  | 20 | 5  | 80 |
2125            | 3  | 7  | 9  |    |    |    |
2126            +----+----+----+----+----+----+
2127                "#);
2128        }
2129    }
2130
2131    #[apply(batch_sizes)]
2132    #[tokio::test]
2133    async fn join_full_multi_batch(batch_size: usize) {
2134        let task_ctx = prepare_task_ctx(batch_size);
2135        let left = build_table(
2136            ("a1", &vec![1, 2, 3]),
2137            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2138            ("c1", &vec![7, 8, 9]),
2139        );
2140        // create two identical batches for the right side
2141        let right = build_table_two_batches(
2142            ("a2", &vec![10, 20, 30]),
2143            ("b2", &vec![4, 5, 6]),
2144            ("c2", &vec![70, 80, 90]),
2145        );
2146        let on = vec![(
2147            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2148            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2149        )];
2150
2151        let join = join(
2152            left,
2153            right,
2154            on,
2155            &JoinType::Full,
2156            NullEquality::NullEqualsNothing,
2157        )
2158        .unwrap();
2159
2160        let columns = columns(&join.schema());
2161        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2162
2163        let stream = join.execute(0, task_ctx).unwrap();
2164        let batches = common::collect(stream).await.unwrap();
2165
2166        allow_duplicates! {
2167            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2168            +----+----+----+----+----+----+
2169            | a1 | b1 | c1 | a2 | b2 | c2 |
2170            +----+----+----+----+----+----+
2171            |    |    |    | 30 | 6  | 90 |
2172            |    |    |    | 30 | 6  | 90 |
2173            | 1  | 4  | 7  | 10 | 4  | 70 |
2174            | 1  | 4  | 7  | 10 | 4  | 70 |
2175            | 2  | 5  | 8  | 20 | 5  | 80 |
2176            | 2  | 5  | 8  | 20 | 5  | 80 |
2177            | 3  | 7  | 9  |    |    |    |
2178            +----+----+----+----+----+----+
2179                "#);
2180        }
2181    }
2182
2183    #[apply(batch_sizes)]
2184    #[tokio::test]
2185    async fn join_left_empty_right(batch_size: usize) {
2186        let task_ctx = prepare_task_ctx(batch_size);
2187        let left = build_table(
2188            ("a1", &vec![1, 2, 3]),
2189            ("b1", &vec![4, 5, 7]),
2190            ("c1", &vec![7, 8, 9]),
2191        );
2192        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2193        let on = vec![(
2194            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2195            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2196        )];
2197        let schema = right.schema();
2198        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2199        let join = join(
2200            left,
2201            right,
2202            on,
2203            &JoinType::Left,
2204            NullEquality::NullEqualsNothing,
2205        )
2206        .unwrap();
2207
2208        let columns = columns(&join.schema());
2209        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2210
2211        let stream = join.execute(0, task_ctx).unwrap();
2212        let batches = common::collect(stream).await.unwrap();
2213
2214        allow_duplicates! {
2215            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2216            +----+----+----+----+----+----+
2217            | a1 | b1 | c1 | a2 | b1 | c2 |
2218            +----+----+----+----+----+----+
2219            | 1  | 4  | 7  |    |    |    |
2220            | 2  | 5  | 8  |    |    |    |
2221            | 3  | 7  | 9  |    |    |    |
2222            +----+----+----+----+----+----+
2223                "#);
2224        }
2225    }
2226
2227    #[apply(batch_sizes)]
2228    #[tokio::test]
2229    async fn join_full_empty_right(batch_size: usize) {
2230        let task_ctx = prepare_task_ctx(batch_size);
2231        let left = build_table(
2232            ("a1", &vec![1, 2, 3]),
2233            ("b1", &vec![4, 5, 7]),
2234            ("c1", &vec![7, 8, 9]),
2235        );
2236        let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
2237        let on = vec![(
2238            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2239            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2240        )];
2241        let schema = right.schema();
2242        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2243        let join = join(
2244            left,
2245            right,
2246            on,
2247            &JoinType::Full,
2248            NullEquality::NullEqualsNothing,
2249        )
2250        .unwrap();
2251
2252        let columns = columns(&join.schema());
2253        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2254
2255        let stream = join.execute(0, task_ctx).unwrap();
2256        let batches = common::collect(stream).await.unwrap();
2257
2258        allow_duplicates! {
2259            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2260            +----+----+----+----+----+----+
2261            | a1 | b1 | c1 | a2 | b2 | c2 |
2262            +----+----+----+----+----+----+
2263            | 1  | 4  | 7  |    |    |    |
2264            | 2  | 5  | 8  |    |    |    |
2265            | 3  | 7  | 9  |    |    |    |
2266            +----+----+----+----+----+----+
2267                "#);
2268        }
2269    }
2270
2271    #[apply(batch_sizes)]
2272    #[tokio::test]
2273    async fn join_left_one(batch_size: usize) -> Result<()> {
2274        let task_ctx = prepare_task_ctx(batch_size);
2275        let left = build_table(
2276            ("a1", &vec![1, 2, 3]),
2277            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2278            ("c1", &vec![7, 8, 9]),
2279        );
2280        let right = build_table(
2281            ("a2", &vec![10, 20, 30]),
2282            ("b1", &vec![4, 5, 6]),
2283            ("c2", &vec![70, 80, 90]),
2284        );
2285        let on = vec![(
2286            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2287            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2288        )];
2289
2290        let (columns, batches, metrics) = join_collect(
2291            Arc::clone(&left),
2292            Arc::clone(&right),
2293            on.clone(),
2294            &JoinType::Left,
2295            NullEquality::NullEqualsNothing,
2296            task_ctx,
2297        )
2298        .await?;
2299
2300        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2301
2302        allow_duplicates! {
2303            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2304            +----+----+----+----+----+----+
2305            | a1 | b1 | c1 | a2 | b1 | c2 |
2306            +----+----+----+----+----+----+
2307            | 1  | 4  | 7  | 10 | 4  | 70 |
2308            | 2  | 5  | 8  | 20 | 5  | 80 |
2309            | 3  | 7  | 9  |    |    |    |
2310            +----+----+----+----+----+----+
2311                "#);
2312        }
2313
2314        assert_join_metrics!(metrics, 3);
2315
2316        Ok(())
2317    }
2318
2319    #[apply(batch_sizes)]
2320    #[tokio::test]
2321    async fn partitioned_join_left_one(batch_size: usize) -> Result<()> {
2322        let task_ctx = prepare_task_ctx(batch_size);
2323        let left = build_table(
2324            ("a1", &vec![1, 2, 3]),
2325            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2326            ("c1", &vec![7, 8, 9]),
2327        );
2328        let right = build_table(
2329            ("a2", &vec![10, 20, 30]),
2330            ("b1", &vec![4, 5, 6]),
2331            ("c2", &vec![70, 80, 90]),
2332        );
2333        let on = vec![(
2334            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2335            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2336        )];
2337
2338        let (columns, batches, metrics) = partitioned_join_collect(
2339            Arc::clone(&left),
2340            Arc::clone(&right),
2341            on.clone(),
2342            &JoinType::Left,
2343            NullEquality::NullEqualsNothing,
2344            task_ctx,
2345        )
2346        .await?;
2347
2348        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2349
2350        allow_duplicates! {
2351            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2352            +----+----+----+----+----+----+
2353            | a1 | b1 | c1 | a2 | b1 | c2 |
2354            +----+----+----+----+----+----+
2355            | 1  | 4  | 7  | 10 | 4  | 70 |
2356            | 2  | 5  | 8  | 20 | 5  | 80 |
2357            | 3  | 7  | 9  |    |    |    |
2358            +----+----+----+----+----+----+
2359                "#);
2360        }
2361
2362        assert_join_metrics!(metrics, 3);
2363
2364        Ok(())
2365    }
2366
2367    fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
2368        // just two line match
2369        // b1 = 10
2370        build_table(
2371            ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
2372            ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
2373            ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
2374        )
2375    }
2376
2377    fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
2378        // just two line match
2379        // b2 = 10
2380        build_table(
2381            ("a2", &vec![8, 12, 6, 2, 10, 4]),
2382            ("b2", &vec![8, 10, 6, 2, 10, 4]),
2383            ("c2", &vec![20, 40, 60, 80, 100, 120]),
2384        )
2385    }
2386
2387    #[apply(batch_sizes)]
2388    #[tokio::test]
2389    async fn join_left_semi(batch_size: usize) -> Result<()> {
2390        let task_ctx = prepare_task_ctx(batch_size);
2391        let left = build_semi_anti_left_table();
2392        let right = build_semi_anti_right_table();
2393        // left_table left semi join right_table on left_table.b1 = right_table.b2
2394        let on = vec![(
2395            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2396            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2397        )];
2398
2399        let join = join(
2400            left,
2401            right,
2402            on,
2403            &JoinType::LeftSemi,
2404            NullEquality::NullEqualsNothing,
2405        )?;
2406
2407        let columns = columns(&join.schema());
2408        assert_eq!(columns, vec!["a1", "b1", "c1"]);
2409
2410        let stream = join.execute(0, task_ctx)?;
2411        let batches = common::collect(stream).await?;
2412
2413        // ignore the order
2414        allow_duplicates! {
2415            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2416            +----+----+-----+
2417            | a1 | b1 | c1  |
2418            +----+----+-----+
2419            | 11 | 8  | 110 |
2420            | 13 | 10 | 130 |
2421            | 9  | 8  | 90  |
2422            +----+----+-----+
2423                "#);
2424        }
2425
2426        Ok(())
2427    }
2428
2429    #[apply(batch_sizes)]
2430    #[tokio::test]
2431    async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> {
2432        let task_ctx = prepare_task_ctx(batch_size);
2433        let left = build_semi_anti_left_table();
2434        let right = build_semi_anti_right_table();
2435
2436        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 10
2437        let on = vec![(
2438            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2439            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2440        )];
2441
2442        let column_indices = vec![ColumnIndex {
2443            index: 0,
2444            side: JoinSide::Right,
2445        }];
2446        let intermediate_schema =
2447            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2448
2449        let filter_expression = Arc::new(BinaryExpr::new(
2450            Arc::new(Column::new("x", 0)),
2451            Operator::NotEq,
2452            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2453        )) as Arc<dyn PhysicalExpr>;
2454
2455        let filter = JoinFilter::new(
2456            filter_expression,
2457            column_indices.clone(),
2458            Arc::new(intermediate_schema.clone()),
2459        );
2460
2461        let join = join_with_filter(
2462            Arc::clone(&left),
2463            Arc::clone(&right),
2464            on.clone(),
2465            filter,
2466            &JoinType::LeftSemi,
2467            NullEquality::NullEqualsNothing,
2468        )?;
2469
2470        let columns_header = columns(&join.schema());
2471        assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
2472
2473        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2474        let batches = common::collect(stream).await?;
2475
2476        allow_duplicates! {
2477            assert_snapshot!(batches_to_sort_string(&batches), @r"
2478            +----+----+-----+
2479            | a1 | b1 | c1  |
2480            +----+----+-----+
2481            | 11 | 8  | 110 |
2482            | 13 | 10 | 130 |
2483            | 9  | 8  | 90  |
2484            +----+----+-----+
2485            ");
2486        }
2487
2488        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 > 10
2489        let filter_expression = Arc::new(BinaryExpr::new(
2490            Arc::new(Column::new("x", 0)),
2491            Operator::Gt,
2492            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2493        )) as Arc<dyn PhysicalExpr>;
2494        let filter = JoinFilter::new(
2495            filter_expression,
2496            column_indices,
2497            Arc::new(intermediate_schema),
2498        );
2499
2500        let join = join_with_filter(
2501            left,
2502            right,
2503            on,
2504            filter,
2505            &JoinType::LeftSemi,
2506            NullEquality::NullEqualsNothing,
2507        )?;
2508
2509        let columns_header = columns(&join.schema());
2510        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2511
2512        let stream = join.execute(0, task_ctx)?;
2513        let batches = common::collect(stream).await?;
2514
2515        allow_duplicates! {
2516            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2517            +----+----+-----+
2518            | a1 | b1 | c1  |
2519            +----+----+-----+
2520            | 13 | 10 | 130 |
2521            +----+----+-----+
2522                "#);
2523        }
2524
2525        Ok(())
2526    }
2527
2528    #[apply(batch_sizes)]
2529    #[tokio::test]
2530    async fn join_right_semi(batch_size: usize) -> Result<()> {
2531        let task_ctx = prepare_task_ctx(batch_size);
2532        let left = build_semi_anti_left_table();
2533        let right = build_semi_anti_right_table();
2534
2535        // left_table right semi join right_table on left_table.b1 = right_table.b2
2536        let on = vec![(
2537            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2538            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2539        )];
2540
2541        let join = join(
2542            left,
2543            right,
2544            on,
2545            &JoinType::RightSemi,
2546            NullEquality::NullEqualsNothing,
2547        )?;
2548
2549        let columns = columns(&join.schema());
2550        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2551
2552        let stream = join.execute(0, task_ctx)?;
2553        let batches = common::collect(stream).await?;
2554
2555        // RightSemi join output is expected to preserve right input order
2556        allow_duplicates! {
2557            assert_snapshot!(batches_to_string(&batches), @r#"
2558            +----+----+-----+
2559            | a2 | b2 | c2  |
2560            +----+----+-----+
2561            | 8  | 8  | 20  |
2562            | 12 | 10 | 40  |
2563            | 10 | 10 | 100 |
2564            +----+----+-----+
2565                "#);
2566        }
2567
2568        Ok(())
2569    }
2570
2571    #[apply(batch_sizes)]
2572    #[tokio::test]
2573    async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> {
2574        let task_ctx = prepare_task_ctx(batch_size);
2575        let left = build_semi_anti_left_table();
2576        let right = build_semi_anti_right_table();
2577
2578        // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9
2579        let on = vec![(
2580            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2581            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2582        )];
2583
2584        let column_indices = vec![ColumnIndex {
2585            index: 0,
2586            side: JoinSide::Left,
2587        }];
2588        let intermediate_schema =
2589            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2590
2591        let filter_expression = Arc::new(BinaryExpr::new(
2592            Arc::new(Column::new("x", 0)),
2593            Operator::NotEq,
2594            Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
2595        )) as Arc<dyn PhysicalExpr>;
2596
2597        let filter = JoinFilter::new(
2598            filter_expression,
2599            column_indices.clone(),
2600            Arc::new(intermediate_schema.clone()),
2601        );
2602
2603        let join = join_with_filter(
2604            Arc::clone(&left),
2605            Arc::clone(&right),
2606            on.clone(),
2607            filter,
2608            &JoinType::RightSemi,
2609            NullEquality::NullEqualsNothing,
2610        )?;
2611
2612        let columns = columns(&join.schema());
2613        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2614
2615        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2616        let batches = common::collect(stream).await?;
2617
2618        // RightSemi join output is expected to preserve right input order
2619        allow_duplicates! {
2620            assert_snapshot!(batches_to_string(&batches), @r#"
2621            +----+----+-----+
2622            | a2 | b2 | c2  |
2623            +----+----+-----+
2624            | 8  | 8  | 20  |
2625            | 12 | 10 | 40  |
2626            | 10 | 10 | 100 |
2627            +----+----+-----+
2628                "#);
2629        }
2630
2631        // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9
2632        let filter_expression = Arc::new(BinaryExpr::new(
2633            Arc::new(Column::new("x", 0)),
2634            Operator::Gt,
2635            Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
2636        )) as Arc<dyn PhysicalExpr>;
2637
2638        let filter = JoinFilter::new(
2639            filter_expression,
2640            column_indices,
2641            Arc::new(intermediate_schema.clone()),
2642        );
2643
2644        let join = join_with_filter(
2645            left,
2646            right,
2647            on,
2648            filter,
2649            &JoinType::RightSemi,
2650            NullEquality::NullEqualsNothing,
2651        )?;
2652        let stream = join.execute(0, task_ctx)?;
2653        let batches = common::collect(stream).await?;
2654
2655        // RightSemi join output is expected to preserve right input order
2656        allow_duplicates! {
2657            assert_snapshot!(batches_to_string(&batches), @r#"
2658            +----+----+-----+
2659            | a2 | b2 | c2  |
2660            +----+----+-----+
2661            | 12 | 10 | 40  |
2662            | 10 | 10 | 100 |
2663            +----+----+-----+
2664                "#);
2665        }
2666
2667        Ok(())
2668    }
2669
2670    #[apply(batch_sizes)]
2671    #[tokio::test]
2672    async fn join_left_anti(batch_size: usize) -> Result<()> {
2673        let task_ctx = prepare_task_ctx(batch_size);
2674        let left = build_semi_anti_left_table();
2675        let right = build_semi_anti_right_table();
2676        // left_table left anti join right_table on left_table.b1 = right_table.b2
2677        let on = vec![(
2678            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2679            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2680        )];
2681
2682        let join = join(
2683            left,
2684            right,
2685            on,
2686            &JoinType::LeftAnti,
2687            NullEquality::NullEqualsNothing,
2688        )?;
2689
2690        let columns = columns(&join.schema());
2691        assert_eq!(columns, vec!["a1", "b1", "c1"]);
2692
2693        let stream = join.execute(0, task_ctx)?;
2694        let batches = common::collect(stream).await?;
2695
2696        allow_duplicates! {
2697            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2698            +----+----+----+
2699            | a1 | b1 | c1 |
2700            +----+----+----+
2701            | 1  | 1  | 10 |
2702            | 3  | 3  | 30 |
2703            | 5  | 5  | 50 |
2704            | 7  | 7  | 70 |
2705            +----+----+----+
2706                "#);
2707        }
2708        Ok(())
2709    }
2710
2711    #[apply(batch_sizes)]
2712    #[tokio::test]
2713    async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> {
2714        let task_ctx = prepare_task_ctx(batch_size);
2715        let left = build_semi_anti_left_table();
2716        let right = build_semi_anti_right_table();
2717        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2!=8
2718        let on = vec![(
2719            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2720            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2721        )];
2722
2723        let column_indices = vec![ColumnIndex {
2724            index: 0,
2725            side: JoinSide::Right,
2726        }];
2727        let intermediate_schema =
2728            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2729        let filter_expression = Arc::new(BinaryExpr::new(
2730            Arc::new(Column::new("x", 0)),
2731            Operator::NotEq,
2732            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2733        )) as Arc<dyn PhysicalExpr>;
2734
2735        let filter = JoinFilter::new(
2736            filter_expression,
2737            column_indices.clone(),
2738            Arc::new(intermediate_schema.clone()),
2739        );
2740
2741        let join = join_with_filter(
2742            Arc::clone(&left),
2743            Arc::clone(&right),
2744            on.clone(),
2745            filter,
2746            &JoinType::LeftAnti,
2747            NullEquality::NullEqualsNothing,
2748        )?;
2749
2750        let columns_header = columns(&join.schema());
2751        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2752
2753        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2754        let batches = common::collect(stream).await?;
2755
2756        allow_duplicates! {
2757            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2758            +----+----+-----+
2759            | a1 | b1 | c1  |
2760            +----+----+-----+
2761            | 1  | 1  | 10  |
2762            | 11 | 8  | 110 |
2763            | 3  | 3  | 30  |
2764            | 5  | 5  | 50  |
2765            | 7  | 7  | 70  |
2766            | 9  | 8  | 90  |
2767            +----+----+-----+
2768                "#);
2769        }
2770
2771        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 13
2772        let filter_expression = Arc::new(BinaryExpr::new(
2773            Arc::new(Column::new("x", 0)),
2774            Operator::NotEq,
2775            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2776        )) as Arc<dyn PhysicalExpr>;
2777
2778        let filter = JoinFilter::new(
2779            filter_expression,
2780            column_indices,
2781            Arc::new(intermediate_schema),
2782        );
2783
2784        let join = join_with_filter(
2785            left,
2786            right,
2787            on,
2788            filter,
2789            &JoinType::LeftAnti,
2790            NullEquality::NullEqualsNothing,
2791        )?;
2792
2793        let columns_header = columns(&join.schema());
2794        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2795
2796        let stream = join.execute(0, task_ctx)?;
2797        let batches = common::collect(stream).await?;
2798
2799        allow_duplicates! {
2800            assert_snapshot!(batches_to_sort_string(&batches), @r#"
2801            +----+----+-----+
2802            | a1 | b1 | c1  |
2803            +----+----+-----+
2804            | 1  | 1  | 10  |
2805            | 11 | 8  | 110 |
2806            | 3  | 3  | 30  |
2807            | 5  | 5  | 50  |
2808            | 7  | 7  | 70  |
2809            | 9  | 8  | 90  |
2810            +----+----+-----+
2811                "#);
2812        }
2813
2814        Ok(())
2815    }
2816
2817    #[apply(batch_sizes)]
2818    #[tokio::test]
2819    async fn join_right_anti(batch_size: usize) -> Result<()> {
2820        let task_ctx = prepare_task_ctx(batch_size);
2821        let left = build_semi_anti_left_table();
2822        let right = build_semi_anti_right_table();
2823        let on = vec![(
2824            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2825            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2826        )];
2827
2828        let join = join(
2829            left,
2830            right,
2831            on,
2832            &JoinType::RightAnti,
2833            NullEquality::NullEqualsNothing,
2834        )?;
2835
2836        let columns = columns(&join.schema());
2837        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2838
2839        let stream = join.execute(0, task_ctx)?;
2840        let batches = common::collect(stream).await?;
2841
2842        // RightAnti join output is expected to preserve right input order
2843        allow_duplicates! {
2844            assert_snapshot!(batches_to_string(&batches), @r#"
2845            +----+----+-----+
2846            | a2 | b2 | c2  |
2847            +----+----+-----+
2848            | 6  | 6  | 60  |
2849            | 2  | 2  | 80  |
2850            | 4  | 4  | 120 |
2851            +----+----+-----+
2852                "#);
2853        }
2854        Ok(())
2855    }
2856
2857    #[apply(batch_sizes)]
2858    #[tokio::test]
2859    async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> {
2860        let task_ctx = prepare_task_ctx(batch_size);
2861        let left = build_semi_anti_left_table();
2862        let right = build_semi_anti_right_table();
2863        // left_table right anti join right_table on left_table.b1 = right_table.b2 and left_table.a1!=13
2864        let on = vec![(
2865            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2866            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2867        )];
2868
2869        let column_indices = vec![ColumnIndex {
2870            index: 0,
2871            side: JoinSide::Left,
2872        }];
2873        let intermediate_schema =
2874            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2875
2876        let filter_expression = Arc::new(BinaryExpr::new(
2877            Arc::new(Column::new("x", 0)),
2878            Operator::NotEq,
2879            Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
2880        )) as Arc<dyn PhysicalExpr>;
2881
2882        let filter = JoinFilter::new(
2883            filter_expression,
2884            column_indices,
2885            Arc::new(intermediate_schema.clone()),
2886        );
2887
2888        let join = join_with_filter(
2889            Arc::clone(&left),
2890            Arc::clone(&right),
2891            on.clone(),
2892            filter,
2893            &JoinType::RightAnti,
2894            NullEquality::NullEqualsNothing,
2895        )?;
2896
2897        let columns_header = columns(&join.schema());
2898        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
2899
2900        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2901        let batches = common::collect(stream).await?;
2902
2903        // RightAnti join output is expected to preserve right input order
2904        allow_duplicates! {
2905            assert_snapshot!(batches_to_string(&batches), @r#"
2906            +----+----+-----+
2907            | a2 | b2 | c2  |
2908            +----+----+-----+
2909            | 12 | 10 | 40  |
2910            | 6  | 6  | 60  |
2911            | 2  | 2  | 80  |
2912            | 10 | 10 | 100 |
2913            | 4  | 4  | 120 |
2914            +----+----+-----+
2915                "#);
2916        }
2917
2918        // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8
2919        let column_indices = vec![ColumnIndex {
2920            index: 1,
2921            side: JoinSide::Right,
2922        }];
2923        let filter_expression = Arc::new(BinaryExpr::new(
2924            Arc::new(Column::new("x", 0)),
2925            Operator::NotEq,
2926            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2927        )) as Arc<dyn PhysicalExpr>;
2928
2929        let filter = JoinFilter::new(
2930            filter_expression,
2931            column_indices,
2932            Arc::new(intermediate_schema),
2933        );
2934
2935        let join = join_with_filter(
2936            left,
2937            right,
2938            on,
2939            filter,
2940            &JoinType::RightAnti,
2941            NullEquality::NullEqualsNothing,
2942        )?;
2943
2944        let columns_header = columns(&join.schema());
2945        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
2946
2947        let stream = join.execute(0, task_ctx)?;
2948        let batches = common::collect(stream).await?;
2949
2950        // RightAnti join output is expected to preserve right input order
2951        allow_duplicates! {
2952            assert_snapshot!(batches_to_string(&batches), @r#"
2953            +----+----+-----+
2954            | a2 | b2 | c2  |
2955            +----+----+-----+
2956            | 8  | 8  | 20  |
2957            | 6  | 6  | 60  |
2958            | 2  | 2  | 80  |
2959            | 4  | 4  | 120 |
2960            +----+----+-----+
2961                "#);
2962        }
2963
2964        Ok(())
2965    }
2966
2967    #[apply(batch_sizes)]
2968    #[tokio::test]
2969    async fn join_right_one(batch_size: usize) -> Result<()> {
2970        let task_ctx = prepare_task_ctx(batch_size);
2971        let left = build_table(
2972            ("a1", &vec![1, 2, 3]),
2973            ("b1", &vec![4, 5, 7]),
2974            ("c1", &vec![7, 8, 9]),
2975        );
2976        let right = build_table(
2977            ("a2", &vec![10, 20, 30]),
2978            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
2979            ("c2", &vec![70, 80, 90]),
2980        );
2981        let on = vec![(
2982            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2983            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2984        )];
2985
2986        let (columns, batches, metrics) = join_collect(
2987            left,
2988            right,
2989            on,
2990            &JoinType::Right,
2991            NullEquality::NullEqualsNothing,
2992            task_ctx,
2993        )
2994        .await?;
2995
2996        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2997
2998        allow_duplicates! {
2999            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3000            +----+----+----+----+----+----+
3001            | a1 | b1 | c1 | a2 | b1 | c2 |
3002            +----+----+----+----+----+----+
3003            |    |    |    | 30 | 6  | 90 |
3004            | 1  | 4  | 7  | 10 | 4  | 70 |
3005            | 2  | 5  | 8  | 20 | 5  | 80 |
3006            +----+----+----+----+----+----+
3007                "#);
3008        }
3009
3010        assert_join_metrics!(metrics, 3);
3011
3012        Ok(())
3013    }
3014
3015    #[apply(batch_sizes)]
3016    #[tokio::test]
3017    async fn partitioned_join_right_one(batch_size: usize) -> Result<()> {
3018        let task_ctx = prepare_task_ctx(batch_size);
3019        let left = build_table(
3020            ("a1", &vec![1, 2, 3]),
3021            ("b1", &vec![4, 5, 7]),
3022            ("c1", &vec![7, 8, 9]),
3023        );
3024        let right = build_table(
3025            ("a2", &vec![10, 20, 30]),
3026            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
3027            ("c2", &vec![70, 80, 90]),
3028        );
3029        let on = vec![(
3030            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3031            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3032        )];
3033
3034        let (columns, batches, metrics) = partitioned_join_collect(
3035            left,
3036            right,
3037            on,
3038            &JoinType::Right,
3039            NullEquality::NullEqualsNothing,
3040            task_ctx,
3041        )
3042        .await?;
3043
3044        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3045
3046        allow_duplicates! {
3047            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3048            +----+----+----+----+----+----+
3049            | a1 | b1 | c1 | a2 | b1 | c2 |
3050            +----+----+----+----+----+----+
3051            |    |    |    | 30 | 6  | 90 |
3052            | 1  | 4  | 7  | 10 | 4  | 70 |
3053            | 2  | 5  | 8  | 20 | 5  | 80 |
3054            +----+----+----+----+----+----+
3055                "#);
3056        }
3057
3058        assert_join_metrics!(metrics, 3);
3059
3060        Ok(())
3061    }
3062
3063    #[apply(batch_sizes)]
3064    #[tokio::test]
3065    async fn join_full_one(batch_size: usize) -> Result<()> {
3066        let task_ctx = prepare_task_ctx(batch_size);
3067        let left = build_table(
3068            ("a1", &vec![1, 2, 3]),
3069            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3070            ("c1", &vec![7, 8, 9]),
3071        );
3072        let right = build_table(
3073            ("a2", &vec![10, 20, 30]),
3074            ("b2", &vec![4, 5, 6]),
3075            ("c2", &vec![70, 80, 90]),
3076        );
3077        let on = vec![(
3078            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3079            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3080        )];
3081
3082        let join = join(
3083            left,
3084            right,
3085            on,
3086            &JoinType::Full,
3087            NullEquality::NullEqualsNothing,
3088        )?;
3089
3090        let columns = columns(&join.schema());
3091        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3092
3093        let stream = join.execute(0, task_ctx)?;
3094        let batches = common::collect(stream).await?;
3095
3096        allow_duplicates! {
3097            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3098            +----+----+----+----+----+----+
3099            | a1 | b1 | c1 | a2 | b2 | c2 |
3100            +----+----+----+----+----+----+
3101            |    |    |    | 30 | 6  | 90 |
3102            | 1  | 4  | 7  | 10 | 4  | 70 |
3103            | 2  | 5  | 8  | 20 | 5  | 80 |
3104            | 3  | 7  | 9  |    |    |    |
3105            +----+----+----+----+----+----+
3106                "#);
3107        }
3108
3109        Ok(())
3110    }
3111
3112    #[apply(batch_sizes)]
3113    #[tokio::test]
3114    async fn join_left_mark(batch_size: usize) -> Result<()> {
3115        let task_ctx = prepare_task_ctx(batch_size);
3116        let left = build_table(
3117            ("a1", &vec![1, 2, 3]),
3118            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3119            ("c1", &vec![7, 8, 9]),
3120        );
3121        let right = build_table(
3122            ("a2", &vec![10, 20, 30]),
3123            ("b1", &vec![4, 5, 6]),
3124            ("c2", &vec![70, 80, 90]),
3125        );
3126        let on = vec![(
3127            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3128            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3129        )];
3130
3131        let (columns, batches, metrics) = join_collect(
3132            Arc::clone(&left),
3133            Arc::clone(&right),
3134            on.clone(),
3135            &JoinType::LeftMark,
3136            NullEquality::NullEqualsNothing,
3137            task_ctx,
3138        )
3139        .await?;
3140
3141        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3142
3143        allow_duplicates! {
3144            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3145            +----+----+----+-------+
3146            | a1 | b1 | c1 | mark  |
3147            +----+----+----+-------+
3148            | 1  | 4  | 7  | true  |
3149            | 2  | 5  | 8  | true  |
3150            | 3  | 7  | 9  | false |
3151            +----+----+----+-------+
3152                "#);
3153        }
3154
3155        assert_join_metrics!(metrics, 3);
3156
3157        Ok(())
3158    }
3159
3160    #[apply(batch_sizes)]
3161    #[tokio::test]
3162    async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> {
3163        let task_ctx = prepare_task_ctx(batch_size);
3164        let left = build_table(
3165            ("a1", &vec![1, 2, 3]),
3166            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3167            ("c1", &vec![7, 8, 9]),
3168        );
3169        let right = build_table(
3170            ("a2", &vec![10, 20, 30, 40]),
3171            ("b1", &vec![4, 4, 5, 6]),
3172            ("c2", &vec![60, 70, 80, 90]),
3173        );
3174        let on = vec![(
3175            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3176            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3177        )];
3178
3179        let (columns, batches, metrics) = partitioned_join_collect(
3180            Arc::clone(&left),
3181            Arc::clone(&right),
3182            on.clone(),
3183            &JoinType::LeftMark,
3184            NullEquality::NullEqualsNothing,
3185            task_ctx,
3186        )
3187        .await?;
3188
3189        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3190
3191        allow_duplicates! {
3192            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3193            +----+----+----+-------+
3194            | a1 | b1 | c1 | mark  |
3195            +----+----+----+-------+
3196            | 1  | 4  | 7  | true  |
3197            | 2  | 5  | 8  | true  |
3198            | 3  | 7  | 9  | false |
3199            +----+----+----+-------+
3200                "#);
3201        }
3202
3203        assert_join_metrics!(metrics, 3);
3204
3205        Ok(())
3206    }
3207
3208    #[apply(batch_sizes)]
3209    #[tokio::test]
3210    async fn join_right_mark(batch_size: usize) -> Result<()> {
3211        let task_ctx = prepare_task_ctx(batch_size);
3212        let left = build_table(
3213            ("a1", &vec![1, 2, 3]),
3214            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3215            ("c1", &vec![7, 8, 9]),
3216        );
3217        let right = build_table(
3218            ("a2", &vec![10, 20, 30]),
3219            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
3220            ("c2", &vec![70, 80, 90]),
3221        );
3222        let on = vec![(
3223            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3224            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3225        )];
3226
3227        let (columns, batches, metrics) = join_collect(
3228            Arc::clone(&left),
3229            Arc::clone(&right),
3230            on.clone(),
3231            &JoinType::RightMark,
3232            NullEquality::NullEqualsNothing,
3233            task_ctx,
3234        )
3235        .await?;
3236
3237        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3238
3239        let expected = [
3240            "+----+----+----+-------+",
3241            "| a2 | b1 | c2 | mark  |",
3242            "+----+----+----+-------+",
3243            "| 10 | 4  | 70 | true  |",
3244            "| 20 | 5  | 80 | true  |",
3245            "| 30 | 6  | 90 | false |",
3246            "+----+----+----+-------+",
3247        ];
3248        assert_batches_sorted_eq!(expected, &batches);
3249
3250        assert_join_metrics!(metrics, 3);
3251
3252        Ok(())
3253    }
3254
3255    #[apply(batch_sizes)]
3256    #[tokio::test]
3257    async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> {
3258        let task_ctx = prepare_task_ctx(batch_size);
3259        let left = build_table(
3260            ("a1", &vec![1, 2, 3]),
3261            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3262            ("c1", &vec![7, 8, 9]),
3263        );
3264        let right = build_table(
3265            ("a2", &vec![10, 20, 30, 40]),
3266            ("b1", &vec![4, 4, 5, 6]), // 6 does not exist on the left
3267            ("c2", &vec![60, 70, 80, 90]),
3268        );
3269        let on = vec![(
3270            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3271            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3272        )];
3273
3274        let (columns, batches, metrics) = partitioned_join_collect(
3275            Arc::clone(&left),
3276            Arc::clone(&right),
3277            on.clone(),
3278            &JoinType::RightMark,
3279            NullEquality::NullEqualsNothing,
3280            task_ctx,
3281        )
3282        .await?;
3283
3284        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3285
3286        let expected = [
3287            "+----+----+----+-------+",
3288            "| a2 | b1 | c2 | mark  |",
3289            "+----+----+----+-------+",
3290            "| 10 | 4  | 60 | true  |",
3291            "| 20 | 4  | 70 | true  |",
3292            "| 30 | 5  | 80 | true  |",
3293            "| 40 | 6  | 90 | false |",
3294            "+----+----+----+-------+",
3295        ];
3296        assert_batches_sorted_eq!(expected, &batches);
3297
3298        assert_join_metrics!(metrics, 4);
3299
3300        Ok(())
3301    }
3302
3303    #[test]
3304    fn join_with_hash_collisions_64() -> Result<()> {
3305        let mut hashmap_left = HashTable::with_capacity(4);
3306        let left = build_table_i32(
3307            ("a", &vec![10, 20]),
3308            ("x", &vec![100, 200]),
3309            ("y", &vec![200, 300]),
3310        );
3311
3312        let random_state = RandomState::with_seeds(0, 0, 0, 0);
3313        let hashes_buff = &mut vec![0; left.num_rows()];
3314        let hashes = create_hashes(
3315            &[Arc::clone(&left.columns()[0])],
3316            &random_state,
3317            hashes_buff,
3318        )?;
3319
3320        // Maps both values to both indices (1 and 2, representing input 0 and 1)
3321        // 0 -> (0, 1)
3322        // 1 -> (0, 2)
3323        // The equality check will make sure only hashes[0] maps to 0 and hashes[1] maps to 1
3324        hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
3325        hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
3326
3327        hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
3328        hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);
3329
3330        let next = vec![2, 0];
3331
3332        let right = build_table_i32(
3333            ("a", &vec![10, 20]),
3334            ("b", &vec![0, 0]),
3335            ("c", &vec![30, 40]),
3336        );
3337
3338        // Join key column for both join sides
3339        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
3340
3341        let join_hash_map = JoinHashMapU64::new(hashmap_left, next);
3342
3343        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
3344        let right_keys_values =
3345            key_column.evaluate(&right)?.into_array(right.num_rows())?;
3346        let mut hashes_buffer = vec![0; right.num_rows()];
3347        create_hashes(
3348            &[Arc::clone(&right_keys_values)],
3349            &random_state,
3350            &mut hashes_buffer,
3351        )?;
3352
3353        let (l, r, _) = lookup_join_hashmap(
3354            &join_hash_map,
3355            &[left_keys_values],
3356            &[right_keys_values],
3357            NullEquality::NullEqualsNothing,
3358            &hashes_buffer,
3359            8192,
3360            (0, None),
3361        )?;
3362
3363        let left_ids: UInt64Array = vec![0, 1].into();
3364
3365        let right_ids: UInt32Array = vec![0, 1].into();
3366
3367        assert_eq!(left_ids, l);
3368
3369        assert_eq!(right_ids, r);
3370
3371        Ok(())
3372    }
3373
3374    #[test]
3375    fn join_with_hash_collisions_u32() -> Result<()> {
3376        let mut hashmap_left = HashTable::with_capacity(4);
3377        let left = build_table_i32(
3378            ("a", &vec![10, 20]),
3379            ("x", &vec![100, 200]),
3380            ("y", &vec![200, 300]),
3381        );
3382
3383        let random_state = RandomState::with_seeds(0, 0, 0, 0);
3384        let hashes_buff = &mut vec![0; left.num_rows()];
3385        let hashes = create_hashes(
3386            &[Arc::clone(&left.columns()[0])],
3387            &random_state,
3388            hashes_buff,
3389        )?;
3390
3391        hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
3392        hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
3393        hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
3394        hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);
3395
3396        let next: Vec<u32> = vec![2, 0];
3397
3398        let right = build_table_i32(
3399            ("a", &vec![10, 20]),
3400            ("b", &vec![0, 0]),
3401            ("c", &vec![30, 40]),
3402        );
3403
3404        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
3405
3406        let join_hash_map = JoinHashMapU32::new(hashmap_left, next);
3407
3408        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
3409        let right_keys_values =
3410            key_column.evaluate(&right)?.into_array(right.num_rows())?;
3411        let mut hashes_buffer = vec![0; right.num_rows()];
3412        create_hashes(
3413            &[Arc::clone(&right_keys_values)],
3414            &random_state,
3415            &mut hashes_buffer,
3416        )?;
3417
3418        let (l, r, _) = lookup_join_hashmap(
3419            &join_hash_map,
3420            &[left_keys_values],
3421            &[right_keys_values],
3422            NullEquality::NullEqualsNothing,
3423            &hashes_buffer,
3424            8192,
3425            (0, None),
3426        )?;
3427
3428        // We still expect to match rows 0 and 1 on both sides
3429        let left_ids: UInt64Array = vec![0, 1].into();
3430        let right_ids: UInt32Array = vec![0, 1].into();
3431
3432        assert_eq!(left_ids, l);
3433        assert_eq!(right_ids, r);
3434
3435        Ok(())
3436    }
3437
3438    #[tokio::test]
3439    async fn join_with_duplicated_column_names() -> Result<()> {
3440        let task_ctx = Arc::new(TaskContext::default());
3441        let left = build_table(
3442            ("a", &vec![1, 2, 3]),
3443            ("b", &vec![4, 5, 7]),
3444            ("c", &vec![7, 8, 9]),
3445        );
3446        let right = build_table(
3447            ("a", &vec![10, 20, 30]),
3448            ("b", &vec![1, 2, 7]),
3449            ("c", &vec![70, 80, 90]),
3450        );
3451        let on = vec![(
3452            // join on a=b so there are duplicate column names on unjoined columns
3453            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3454            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3455        )];
3456
3457        let join = join(
3458            left,
3459            right,
3460            on,
3461            &JoinType::Inner,
3462            NullEquality::NullEqualsNothing,
3463        )?;
3464
3465        let columns = columns(&join.schema());
3466        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3467
3468        let stream = join.execute(0, task_ctx)?;
3469        let batches = common::collect(stream).await?;
3470
3471        allow_duplicates! {
3472            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3473            +---+---+---+----+---+----+
3474            | a | b | c | a  | b | c  |
3475            +---+---+---+----+---+----+
3476            | 1 | 4 | 7 | 10 | 1 | 70 |
3477            | 2 | 5 | 8 | 20 | 2 | 80 |
3478            +---+---+---+----+---+----+
3479                "#);
3480        }
3481
3482        Ok(())
3483    }
3484
3485    fn prepare_join_filter() -> JoinFilter {
3486        let column_indices = vec![
3487            ColumnIndex {
3488                index: 2,
3489                side: JoinSide::Left,
3490            },
3491            ColumnIndex {
3492                index: 2,
3493                side: JoinSide::Right,
3494            },
3495        ];
3496        let intermediate_schema = Schema::new(vec![
3497            Field::new("c", DataType::Int32, true),
3498            Field::new("c", DataType::Int32, true),
3499        ]);
3500        let filter_expression = Arc::new(BinaryExpr::new(
3501            Arc::new(Column::new("c", 0)),
3502            Operator::Gt,
3503            Arc::new(Column::new("c", 1)),
3504        )) as Arc<dyn PhysicalExpr>;
3505
3506        JoinFilter::new(
3507            filter_expression,
3508            column_indices,
3509            Arc::new(intermediate_schema),
3510        )
3511    }
3512
3513    #[apply(batch_sizes)]
3514    #[tokio::test]
3515    async fn join_inner_with_filter(batch_size: usize) -> Result<()> {
3516        let task_ctx = prepare_task_ctx(batch_size);
3517        let left = build_table(
3518            ("a", &vec![0, 1, 2, 2]),
3519            ("b", &vec![4, 5, 7, 8]),
3520            ("c", &vec![7, 8, 9, 1]),
3521        );
3522        let right = build_table(
3523            ("a", &vec![10, 20, 30, 40]),
3524            ("b", &vec![2, 2, 3, 4]),
3525            ("c", &vec![7, 5, 6, 4]),
3526        );
3527        let on = vec![(
3528            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3529            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3530        )];
3531        let filter = prepare_join_filter();
3532
3533        let join = join_with_filter(
3534            left,
3535            right,
3536            on,
3537            filter,
3538            &JoinType::Inner,
3539            NullEquality::NullEqualsNothing,
3540        )?;
3541
3542        let columns = columns(&join.schema());
3543        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3544
3545        let stream = join.execute(0, task_ctx)?;
3546        let batches = common::collect(stream).await?;
3547
3548        allow_duplicates! {
3549            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3550            +---+---+---+----+---+---+
3551            | a | b | c | a  | b | c |
3552            +---+---+---+----+---+---+
3553            | 2 | 7 | 9 | 10 | 2 | 7 |
3554            | 2 | 7 | 9 | 20 | 2 | 5 |
3555            +---+---+---+----+---+---+
3556                "#);
3557        }
3558
3559        Ok(())
3560    }
3561
3562    #[apply(batch_sizes)]
3563    #[tokio::test]
3564    async fn join_left_with_filter(batch_size: usize) -> Result<()> {
3565        let task_ctx = prepare_task_ctx(batch_size);
3566        let left = build_table(
3567            ("a", &vec![0, 1, 2, 2]),
3568            ("b", &vec![4, 5, 7, 8]),
3569            ("c", &vec![7, 8, 9, 1]),
3570        );
3571        let right = build_table(
3572            ("a", &vec![10, 20, 30, 40]),
3573            ("b", &vec![2, 2, 3, 4]),
3574            ("c", &vec![7, 5, 6, 4]),
3575        );
3576        let on = vec![(
3577            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3578            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3579        )];
3580        let filter = prepare_join_filter();
3581
3582        let join = join_with_filter(
3583            left,
3584            right,
3585            on,
3586            filter,
3587            &JoinType::Left,
3588            NullEquality::NullEqualsNothing,
3589        )?;
3590
3591        let columns = columns(&join.schema());
3592        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3593
3594        let stream = join.execute(0, task_ctx)?;
3595        let batches = common::collect(stream).await?;
3596
3597        allow_duplicates! {
3598            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3599            +---+---+---+----+---+---+
3600            | a | b | c | a  | b | c |
3601            +---+---+---+----+---+---+
3602            | 0 | 4 | 7 |    |   |   |
3603            | 1 | 5 | 8 |    |   |   |
3604            | 2 | 7 | 9 | 10 | 2 | 7 |
3605            | 2 | 7 | 9 | 20 | 2 | 5 |
3606            | 2 | 8 | 1 |    |   |   |
3607            +---+---+---+----+---+---+
3608                "#);
3609        }
3610
3611        Ok(())
3612    }
3613
3614    #[apply(batch_sizes)]
3615    #[tokio::test]
3616    async fn join_right_with_filter(batch_size: usize) -> Result<()> {
3617        let task_ctx = prepare_task_ctx(batch_size);
3618        let left = build_table(
3619            ("a", &vec![0, 1, 2, 2]),
3620            ("b", &vec![4, 5, 7, 8]),
3621            ("c", &vec![7, 8, 9, 1]),
3622        );
3623        let right = build_table(
3624            ("a", &vec![10, 20, 30, 40]),
3625            ("b", &vec![2, 2, 3, 4]),
3626            ("c", &vec![7, 5, 6, 4]),
3627        );
3628        let on = vec![(
3629            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3630            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3631        )];
3632        let filter = prepare_join_filter();
3633
3634        let join = join_with_filter(
3635            left,
3636            right,
3637            on,
3638            filter,
3639            &JoinType::Right,
3640            NullEquality::NullEqualsNothing,
3641        )?;
3642
3643        let columns = columns(&join.schema());
3644        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3645
3646        let stream = join.execute(0, task_ctx)?;
3647        let batches = common::collect(stream).await?;
3648
3649        allow_duplicates! {
3650            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3651            +---+---+---+----+---+---+
3652            | a | b | c | a  | b | c |
3653            +---+---+---+----+---+---+
3654            |   |   |   | 30 | 3 | 6 |
3655            |   |   |   | 40 | 4 | 4 |
3656            | 2 | 7 | 9 | 10 | 2 | 7 |
3657            | 2 | 7 | 9 | 20 | 2 | 5 |
3658            +---+---+---+----+---+---+
3659                "#);
3660        }
3661
3662        Ok(())
3663    }
3664
3665    #[apply(batch_sizes)]
3666    #[tokio::test]
3667    async fn join_full_with_filter(batch_size: usize) -> Result<()> {
3668        let task_ctx = prepare_task_ctx(batch_size);
3669        let left = build_table(
3670            ("a", &vec![0, 1, 2, 2]),
3671            ("b", &vec![4, 5, 7, 8]),
3672            ("c", &vec![7, 8, 9, 1]),
3673        );
3674        let right = build_table(
3675            ("a", &vec![10, 20, 30, 40]),
3676            ("b", &vec![2, 2, 3, 4]),
3677            ("c", &vec![7, 5, 6, 4]),
3678        );
3679        let on = vec![(
3680            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3681            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3682        )];
3683        let filter = prepare_join_filter();
3684
3685        let join = join_with_filter(
3686            left,
3687            right,
3688            on,
3689            filter,
3690            &JoinType::Full,
3691            NullEquality::NullEqualsNothing,
3692        )?;
3693
3694        let columns = columns(&join.schema());
3695        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3696
3697        let stream = join.execute(0, task_ctx)?;
3698        let batches = common::collect(stream).await?;
3699
3700        let expected = [
3701            "+---+---+---+----+---+---+",
3702            "| a | b | c | a  | b | c |",
3703            "+---+---+---+----+---+---+",
3704            "|   |   |   | 30 | 3 | 6 |",
3705            "|   |   |   | 40 | 4 | 4 |",
3706            "| 2 | 7 | 9 | 10 | 2 | 7 |",
3707            "| 2 | 7 | 9 | 20 | 2 | 5 |",
3708            "| 0 | 4 | 7 |    |   |   |",
3709            "| 1 | 5 | 8 |    |   |   |",
3710            "| 2 | 8 | 1 |    |   |   |",
3711            "+---+---+---+----+---+---+",
3712        ];
3713        assert_batches_sorted_eq!(expected, &batches);
3714
3715        // THIS MIGRATION HALTED DUE TO ISSUE #15312
3716        //allow_duplicates! {
3717        //    assert_snapshot!(batches_to_sort_string(&batches), @r#"
3718        //    +---+---+---+----+---+---+
3719        //    | a | b | c | a  | b | c |
3720        //    +---+---+---+----+---+---+
3721        //    |   |   |   | 30 | 3 | 6 |
3722        //    |   |   |   | 40 | 4 | 4 |
3723        //    | 2 | 7 | 9 | 10 | 2 | 7 |
3724        //    | 2 | 7 | 9 | 20 | 2 | 5 |
3725        //    | 0 | 4 | 7 |    |   |   |
3726        //    | 1 | 5 | 8 |    |   |   |
3727        //    | 2 | 8 | 1 |    |   |   |
3728        //    +---+---+---+----+---+---+
3729        //        "#)
3730        //}
3731
3732        Ok(())
3733    }
3734
3735    /// Test for parallelized HashJoinExec with PartitionMode::CollectLeft
3736    #[tokio::test]
3737    async fn test_collect_left_multiple_partitions_join() -> Result<()> {
3738        let task_ctx = Arc::new(TaskContext::default());
3739        let left = build_table(
3740            ("a1", &vec![1, 2, 3]),
3741            ("b1", &vec![4, 5, 7]),
3742            ("c1", &vec![7, 8, 9]),
3743        );
3744        let right = build_table(
3745            ("a2", &vec![10, 20, 30]),
3746            ("b2", &vec![4, 5, 6]),
3747            ("c2", &vec![70, 80, 90]),
3748        );
3749        let on = vec![(
3750            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3751            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3752        )];
3753
3754        let expected_inner = vec![
3755            "+----+----+----+----+----+----+",
3756            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3757            "+----+----+----+----+----+----+",
3758            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3759            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3760            "+----+----+----+----+----+----+",
3761        ];
3762        let expected_left = vec![
3763            "+----+----+----+----+----+----+",
3764            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3765            "+----+----+----+----+----+----+",
3766            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3767            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3768            "| 3  | 7  | 9  |    |    |    |",
3769            "+----+----+----+----+----+----+",
3770        ];
3771        let expected_right = vec![
3772            "+----+----+----+----+----+----+",
3773            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3774            "+----+----+----+----+----+----+",
3775            "|    |    |    | 30 | 6  | 90 |",
3776            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3777            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3778            "+----+----+----+----+----+----+",
3779        ];
3780        let expected_full = vec![
3781            "+----+----+----+----+----+----+",
3782            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3783            "+----+----+----+----+----+----+",
3784            "|    |    |    | 30 | 6  | 90 |",
3785            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3786            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3787            "| 3  | 7  | 9  |    |    |    |",
3788            "+----+----+----+----+----+----+",
3789        ];
3790        let expected_left_semi = vec![
3791            "+----+----+----+",
3792            "| a1 | b1 | c1 |",
3793            "+----+----+----+",
3794            "| 1  | 4  | 7  |",
3795            "| 2  | 5  | 8  |",
3796            "+----+----+----+",
3797        ];
3798        let expected_left_anti = vec![
3799            "+----+----+----+",
3800            "| a1 | b1 | c1 |",
3801            "+----+----+----+",
3802            "| 3  | 7  | 9  |",
3803            "+----+----+----+",
3804        ];
3805        let expected_right_semi = vec![
3806            "+----+----+----+",
3807            "| a2 | b2 | c2 |",
3808            "+----+----+----+",
3809            "| 10 | 4  | 70 |",
3810            "| 20 | 5  | 80 |",
3811            "+----+----+----+",
3812        ];
3813        let expected_right_anti = vec![
3814            "+----+----+----+",
3815            "| a2 | b2 | c2 |",
3816            "+----+----+----+",
3817            "| 30 | 6  | 90 |",
3818            "+----+----+----+",
3819        ];
3820        let expected_left_mark = vec![
3821            "+----+----+----+-------+",
3822            "| a1 | b1 | c1 | mark  |",
3823            "+----+----+----+-------+",
3824            "| 1  | 4  | 7  | true  |",
3825            "| 2  | 5  | 8  | true  |",
3826            "| 3  | 7  | 9  | false |",
3827            "+----+----+----+-------+",
3828        ];
3829        let expected_right_mark = vec![
3830            "+----+----+----+-------+",
3831            "| a2 | b2 | c2 | mark  |",
3832            "+----+----+----+-------+",
3833            "| 10 | 4  | 70 | true  |",
3834            "| 20 | 5  | 80 | true  |",
3835            "| 30 | 6  | 90 | false |",
3836            "+----+----+----+-------+",
3837        ];
3838
3839        let test_cases = vec![
3840            (JoinType::Inner, expected_inner),
3841            (JoinType::Left, expected_left),
3842            (JoinType::Right, expected_right),
3843            (JoinType::Full, expected_full),
3844            (JoinType::LeftSemi, expected_left_semi),
3845            (JoinType::LeftAnti, expected_left_anti),
3846            (JoinType::RightSemi, expected_right_semi),
3847            (JoinType::RightAnti, expected_right_anti),
3848            (JoinType::LeftMark, expected_left_mark),
3849            (JoinType::RightMark, expected_right_mark),
3850        ];
3851
3852        for (join_type, expected) in test_cases {
3853            let (_, batches, metrics) = join_collect_with_partition_mode(
3854                Arc::clone(&left),
3855                Arc::clone(&right),
3856                on.clone(),
3857                &join_type,
3858                PartitionMode::CollectLeft,
3859                NullEquality::NullEqualsNothing,
3860                Arc::clone(&task_ctx),
3861            )
3862            .await?;
3863            assert_batches_sorted_eq!(expected, &batches);
3864            assert_join_metrics!(metrics, expected.len() - 4);
3865        }
3866
3867        Ok(())
3868    }
3869
3870    #[tokio::test]
3871    async fn join_date32() -> Result<()> {
3872        let schema = Arc::new(Schema::new(vec![
3873            Field::new("date", DataType::Date32, false),
3874            Field::new("n", DataType::Int32, false),
3875        ]));
3876
3877        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
3878        let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
3879        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
3880        let left =
3881            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
3882                .unwrap();
3883        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
3884        let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
3885        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
3886        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
3887        let on = vec![(
3888            Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
3889            Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
3890        )];
3891
3892        let join = join(
3893            left,
3894            right,
3895            on,
3896            &JoinType::Inner,
3897            NullEquality::NullEqualsNothing,
3898        )?;
3899
3900        let task_ctx = Arc::new(TaskContext::default());
3901        let stream = join.execute(0, task_ctx)?;
3902        let batches = common::collect(stream).await?;
3903
3904        allow_duplicates! {
3905            assert_snapshot!(batches_to_sort_string(&batches), @r#"
3906            +------------+---+------------+---+
3907            | date       | n | date       | n |
3908            +------------+---+------------+---+
3909            | 2022-04-26 | 2 | 2022-04-26 | 4 |
3910            | 2022-04-26 | 2 | 2022-04-26 | 5 |
3911            | 2022-04-27 | 3 | 2022-04-27 | 6 |
3912            +------------+---+------------+---+
3913                "#);
3914        }
3915
3916        Ok(())
3917    }
3918
3919    #[tokio::test]
3920    async fn join_with_error_right() {
3921        let left = build_table(
3922            ("a1", &vec![1, 2, 3]),
3923            ("b1", &vec![4, 5, 7]),
3924            ("c1", &vec![7, 8, 9]),
3925        );
3926
3927        // right input stream returns one good batch and then one error.
3928        // The error should be returned.
3929        let err = exec_err!("bad data error");
3930        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
3931
3932        let on = vec![(
3933            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3934            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
3935        )];
3936        let schema = right.schema();
3937        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
3938        let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
3939
3940        let join_types = vec![
3941            JoinType::Inner,
3942            JoinType::Left,
3943            JoinType::Right,
3944            JoinType::Full,
3945            JoinType::LeftSemi,
3946            JoinType::LeftAnti,
3947            JoinType::RightSemi,
3948            JoinType::RightAnti,
3949        ];
3950
3951        for join_type in join_types {
3952            let join = join(
3953                Arc::clone(&left),
3954                Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
3955                on.clone(),
3956                &join_type,
3957                NullEquality::NullEqualsNothing,
3958            )
3959            .unwrap();
3960            let task_ctx = Arc::new(TaskContext::default());
3961
3962            let stream = join.execute(0, task_ctx).unwrap();
3963
3964            // Expect that an error is returned
3965            let result_string = common::collect(stream).await.unwrap_err().to_string();
3966            assert!(
3967                result_string.contains("bad data error"),
3968                "actual: {result_string}"
3969            );
3970        }
3971    }
3972
3973    #[tokio::test]
3974    async fn join_split_batch() {
3975        let left = build_table(
3976            ("a1", &vec![1, 2, 3, 4]),
3977            ("b1", &vec![1, 1, 1, 1]),
3978            ("c1", &vec![0, 0, 0, 0]),
3979        );
3980        let right = build_table(
3981            ("a2", &vec![10, 20, 30, 40, 50]),
3982            ("b2", &vec![1, 1, 1, 1, 1]),
3983            ("c2", &vec![0, 0, 0, 0, 0]),
3984        );
3985        let on = vec![(
3986            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3987            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3988        )];
3989
3990        let join_types = vec![
3991            JoinType::Inner,
3992            JoinType::Left,
3993            JoinType::Right,
3994            JoinType::Full,
3995            JoinType::RightSemi,
3996            JoinType::RightAnti,
3997            JoinType::LeftSemi,
3998            JoinType::LeftAnti,
3999        ];
4000        let expected_resultset_records = 20;
4001        let common_result = [
4002            "+----+----+----+----+----+----+",
4003            "| a1 | b1 | c1 | a2 | b2 | c2 |",
4004            "+----+----+----+----+----+----+",
4005            "| 1  | 1  | 0  | 10 | 1  | 0  |",
4006            "| 2  | 1  | 0  | 10 | 1  | 0  |",
4007            "| 3  | 1  | 0  | 10 | 1  | 0  |",
4008            "| 4  | 1  | 0  | 10 | 1  | 0  |",
4009            "| 1  | 1  | 0  | 20 | 1  | 0  |",
4010            "| 2  | 1  | 0  | 20 | 1  | 0  |",
4011            "| 3  | 1  | 0  | 20 | 1  | 0  |",
4012            "| 4  | 1  | 0  | 20 | 1  | 0  |",
4013            "| 1  | 1  | 0  | 30 | 1  | 0  |",
4014            "| 2  | 1  | 0  | 30 | 1  | 0  |",
4015            "| 3  | 1  | 0  | 30 | 1  | 0  |",
4016            "| 4  | 1  | 0  | 30 | 1  | 0  |",
4017            "| 1  | 1  | 0  | 40 | 1  | 0  |",
4018            "| 2  | 1  | 0  | 40 | 1  | 0  |",
4019            "| 3  | 1  | 0  | 40 | 1  | 0  |",
4020            "| 4  | 1  | 0  | 40 | 1  | 0  |",
4021            "| 1  | 1  | 0  | 50 | 1  | 0  |",
4022            "| 2  | 1  | 0  | 50 | 1  | 0  |",
4023            "| 3  | 1  | 0  | 50 | 1  | 0  |",
4024            "| 4  | 1  | 0  | 50 | 1  | 0  |",
4025            "+----+----+----+----+----+----+",
4026        ];
4027        let left_batch = [
4028            "+----+----+----+",
4029            "| a1 | b1 | c1 |",
4030            "+----+----+----+",
4031            "| 1  | 1  | 0  |",
4032            "| 2  | 1  | 0  |",
4033            "| 3  | 1  | 0  |",
4034            "| 4  | 1  | 0  |",
4035            "+----+----+----+",
4036        ];
4037        let right_batch = [
4038            "+----+----+----+",
4039            "| a2 | b2 | c2 |",
4040            "+----+----+----+",
4041            "| 10 | 1  | 0  |",
4042            "| 20 | 1  | 0  |",
4043            "| 30 | 1  | 0  |",
4044            "| 40 | 1  | 0  |",
4045            "| 50 | 1  | 0  |",
4046            "+----+----+----+",
4047        ];
4048        let right_empty = [
4049            "+----+----+----+",
4050            "| a2 | b2 | c2 |",
4051            "+----+----+----+",
4052            "+----+----+----+",
4053        ];
4054        let left_empty = [
4055            "+----+----+----+",
4056            "| a1 | b1 | c1 |",
4057            "+----+----+----+",
4058            "+----+----+----+",
4059        ];
4060
4061        // validation of partial join results output for different batch_size setting
4062        for join_type in join_types {
4063            for batch_size in (1..21).rev() {
4064                let task_ctx = prepare_task_ctx(batch_size);
4065
4066                let join = join(
4067                    Arc::clone(&left),
4068                    Arc::clone(&right),
4069                    on.clone(),
4070                    &join_type,
4071                    NullEquality::NullEqualsNothing,
4072                )
4073                .unwrap();
4074
4075                let stream = join.execute(0, task_ctx).unwrap();
4076                let batches = common::collect(stream).await.unwrap();
4077
                // For inner/right/right-semi/right-anti joins the expected batch count
                // equals the div_ceil result, as there is no need to append non-joined
                // build side data. For other join types it'll be div_ceil + 1 -- for an
                // additional batch containing not visited build side rows (empty in
                // this test case).
4082                let expected_batch_count = match join_type {
4083                    JoinType::Inner
4084                    | JoinType::Right
4085                    | JoinType::RightSemi
4086                    | JoinType::RightAnti => {
4087                        div_ceil(expected_resultset_records, batch_size)
4088                    }
4089                    _ => div_ceil(expected_resultset_records, batch_size) + 1,
4090                };
4091                assert_eq!(
4092                    batches.len(),
4093                    expected_batch_count,
4094                    "expected {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}"
4095                );
4096
4097                let expected = match join_type {
4098                    JoinType::RightSemi => right_batch.to_vec(),
4099                    JoinType::RightAnti => right_empty.to_vec(),
4100                    JoinType::LeftSemi => left_batch.to_vec(),
4101                    JoinType::LeftAnti => left_empty.to_vec(),
4102                    _ => common_result.to_vec(),
4103                };
4104                assert_batches_eq!(expected, &batches);
4105            }
4106        }
4107    }
4108
4109    #[tokio::test]
4110    async fn single_partition_join_overallocation() -> Result<()> {
4111        let left = build_table(
4112            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4113            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4114            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4115        );
4116        let right = build_table(
4117            ("a2", &vec![10, 11]),
4118            ("b2", &vec![12, 13]),
4119            ("c2", &vec![14, 15]),
4120        );
4121        let on = vec![(
4122            Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
4123            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4124        )];
4125
4126        let join_types = vec![
4127            JoinType::Inner,
4128            JoinType::Left,
4129            JoinType::Right,
4130            JoinType::Full,
4131            JoinType::LeftSemi,
4132            JoinType::LeftAnti,
4133            JoinType::RightSemi,
4134            JoinType::RightAnti,
4135            JoinType::LeftMark,
4136            JoinType::RightMark,
4137        ];
4138
4139        for join_type in join_types {
4140            let runtime = RuntimeEnvBuilder::new()
4141                .with_memory_limit(100, 1.0)
4142                .build_arc()?;
4143            let task_ctx = TaskContext::default().with_runtime(runtime);
4144            let task_ctx = Arc::new(task_ctx);
4145
4146            let join = join(
4147                Arc::clone(&left),
4148                Arc::clone(&right),
4149                on.clone(),
4150                &join_type,
4151                NullEquality::NullEqualsNothing,
4152            )?;
4153
4154            let stream = join.execute(0, task_ctx)?;
4155            let err = common::collect(stream).await.unwrap_err();
4156
4157            // Asserting that operator-level reservation attempting to overallocate
4158            assert_contains!(
4159                err.to_string(),
4160                "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n  HashJoinInput"
4161            );
4162
4163            assert_contains!(
4164                err.to_string(),
4165                "Failed to allocate additional 120.0 B for HashJoinInput"
4166            );
4167        }
4168
4169        Ok(())
4170    }
4171
4172    #[tokio::test]
4173    async fn partitioned_join_overallocation() -> Result<()> {
4174        // Prepare partitioned inputs for HashJoinExec
4175        // No need to adjust partitioning, as execution should fail with `Resources exhausted` error
4176        let left_batch = build_table_i32(
4177            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4178            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4179            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4180        );
4181        let left = TestMemoryExec::try_new_exec(
4182            &[vec![left_batch.clone()], vec![left_batch.clone()]],
4183            left_batch.schema(),
4184            None,
4185        )
4186        .unwrap();
4187        let right_batch = build_table_i32(
4188            ("a2", &vec![10, 11]),
4189            ("b2", &vec![12, 13]),
4190            ("c2", &vec![14, 15]),
4191        );
4192        let right = TestMemoryExec::try_new_exec(
4193            &[vec![right_batch.clone()], vec![right_batch.clone()]],
4194            right_batch.schema(),
4195            None,
4196        )
4197        .unwrap();
4198        let on = vec![(
4199            Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
4200            Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
4201        )];
4202
4203        let join_types = vec![
4204            JoinType::Inner,
4205            JoinType::Left,
4206            JoinType::Right,
4207            JoinType::Full,
4208            JoinType::LeftSemi,
4209            JoinType::LeftAnti,
4210            JoinType::RightSemi,
4211            JoinType::RightAnti,
4212        ];
4213
4214        for join_type in join_types {
4215            let runtime = RuntimeEnvBuilder::new()
4216                .with_memory_limit(100, 1.0)
4217                .build_arc()?;
4218            let session_config = SessionConfig::default().with_batch_size(50);
4219            let task_ctx = TaskContext::default()
4220                .with_session_config(session_config)
4221                .with_runtime(runtime);
4222            let task_ctx = Arc::new(task_ctx);
4223
4224            let join = HashJoinExec::try_new(
4225                Arc::clone(&left) as Arc<dyn ExecutionPlan>,
4226                Arc::clone(&right) as Arc<dyn ExecutionPlan>,
4227                on.clone(),
4228                None,
4229                &join_type,
4230                None,
4231                PartitionMode::Partitioned,
4232                NullEquality::NullEqualsNothing,
4233            )?;
4234
4235            let stream = join.execute(1, task_ctx)?;
4236            let err = common::collect(stream).await.unwrap_err();
4237
4238            // Asserting that stream-level reservation attempting to overallocate
4239            assert_contains!(
4240                err.to_string(),
4241                "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n  HashJoinInput[1]"
4242
4243            );
4244
4245            assert_contains!(
4246                err.to_string(),
4247                "Failed to allocate additional 120.0 B for HashJoinInput[1]"
4248            );
4249        }
4250
4251        Ok(())
4252    }
4253
4254    fn build_table_struct(
4255        struct_name: &str,
4256        field_name_and_values: (&str, &Vec<Option<i32>>),
4257        nulls: Option<NullBuffer>,
4258    ) -> Arc<dyn ExecutionPlan> {
4259        let (field_name, values) = field_name_and_values;
4260        let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
4261        let schema = Schema::new(vec![Field::new(
4262            struct_name,
4263            DataType::Struct(inner_fields.clone().into()),
4264            nulls.is_some(),
4265        )]);
4266
4267        let batch = RecordBatch::try_new(
4268            Arc::new(schema),
4269            vec![Arc::new(StructArray::new(
4270                inner_fields.into(),
4271                vec![Arc::new(Int32Array::from(values.clone()))],
4272                nulls,
4273            ))],
4274        )
4275        .unwrap();
4276        let schema_ref = batch.schema();
4277        TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
4278    }
4279
4280    #[tokio::test]
4281    async fn join_on_struct() -> Result<()> {
4282        let task_ctx = Arc::new(TaskContext::default());
4283        let left =
4284            build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
4285        let right =
4286            build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
4287        let on = vec![(
4288            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
4289            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
4290        )];
4291
4292        let (columns, batches, metrics) = join_collect(
4293            left,
4294            right,
4295            on,
4296            &JoinType::Inner,
4297            NullEquality::NullEqualsNothing,
4298            task_ctx,
4299        )
4300        .await?;
4301
4302        assert_eq!(columns, vec!["n1", "n2"]);
4303
4304        allow_duplicates! {
4305            assert_snapshot!(batches_to_string(&batches), @r#"
4306            +--------+--------+
4307            | n1     | n2     |
4308            +--------+--------+
4309            | {a: }  | {a: }  |
4310            | {a: 1} | {a: 1} |
4311            | {a: 2} | {a: 2} |
4312            +--------+--------+
4313                "#);
4314        }
4315
4316        assert_join_metrics!(metrics, 3);
4317
4318        Ok(())
4319    }
4320
4321    #[tokio::test]
4322    async fn join_on_struct_with_nulls() -> Result<()> {
4323        let task_ctx = Arc::new(TaskContext::default());
4324        let left =
4325            build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
4326        let right =
4327            build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
4328        let on = vec![(
4329            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
4330            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
4331        )];
4332
4333        let (_, batches_null_eq, metrics) = join_collect(
4334            Arc::clone(&left),
4335            Arc::clone(&right),
4336            on.clone(),
4337            &JoinType::Inner,
4338            NullEquality::NullEqualsNull,
4339            Arc::clone(&task_ctx),
4340        )
4341        .await?;
4342
4343        allow_duplicates! {
4344            assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r#"
4345            +----+----+
4346            | n1 | n2 |
4347            +----+----+
4348            |    |    |
4349            +----+----+
4350                "#);
4351        }
4352
4353        assert_join_metrics!(metrics, 1);
4354
4355        let (_, batches_null_neq, metrics) = join_collect(
4356            left,
4357            right,
4358            on,
4359            &JoinType::Inner,
4360            NullEquality::NullEqualsNothing,
4361            task_ctx,
4362        )
4363        .await?;
4364
4365        assert_join_metrics!(metrics, 0);
4366
4367        let expected_null_neq =
4368            ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
4369        assert_batches_eq!(expected_null_neq, &batches_null_neq);
4370
4371        Ok(())
4372    }
4373
4374    /// Returns the column names on the schema
4375    fn columns(schema: &Schema) -> Vec<String> {
4376        schema.fields().iter().map(|f| f.name().clone()).collect()
4377    }
4378}