// datafusion_physical_plan/joins/hash_join/exec.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18use std::collections::HashSet;
19use std::fmt;
20use std::mem::size_of;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::{Arc, OnceLock};
23use std::{any::Any, vec};
24
25use crate::ExecutionPlanProperties;
26use crate::execution_plan::{
27    EmissionType, boundedness_from_children, has_same_children_properties,
28    stub_properties,
29};
30use crate::filter_pushdown::{
31    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
32    FilterPushdownPropagation,
33};
34use crate::joins::Map;
35use crate::joins::array_map::ArrayMap;
36use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
37use crate::joins::hash_join::shared_bounds::{
38    ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
39};
40use crate::joins::hash_join::stream::{
41    BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
42};
43use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
44use crate::joins::utils::{
45    OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap,
46    swap_join_projection, update_hash,
47};
48use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
49use crate::metrics::{Count, MetricBuilder};
50use crate::projection::{
51    EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
52    try_pushdown_through_join,
53};
54use crate::repartition::REPARTITION_RANDOM_STATE;
55use crate::spill::get_record_batch_memory_size;
56use crate::{
57    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
58    PlanProperties, SendableRecordBatchStream, Statistics,
59    common::can_project,
60    joins::utils::{
61        BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
62        build_join_schema, check_join_is_valid, estimate_join_statistics,
63        need_produce_result_in_final, symmetric_join_output_partitioning,
64    },
65    metrics::{ExecutionPlanMetricsSet, MetricsSet},
66};
67
68use arrow::array::{ArrayRef, BooleanBufferBuilder};
69use arrow::compute::concat_batches;
70use arrow::datatypes::SchemaRef;
71use arrow::record_batch::RecordBatch;
72use arrow::util::bit_util;
73use arrow_schema::{DataType, Schema};
74use datafusion_common::config::ConfigOptions;
75use datafusion_common::utils::memory::estimate_memory_size;
76use datafusion_common::{
77    JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err,
78    plan_err, project_schema,
79};
80use datafusion_execution::TaskContext;
81use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
82use datafusion_expr::Accumulator;
83use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
84use datafusion_physical_expr::equivalence::{
85    ProjectionMapping, join_equivalence_properties,
86};
87use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
88use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
89use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
90
91use ahash::RandomState;
92use datafusion_physical_expr_common::physical_expr::fmt_sql;
93use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
94use futures::TryStreamExt;
95use parking_lot::Mutex;
96
97use super::partitioned_hash_eval::SeededRandomState;
98
/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
pub(crate) const HASH_JOIN_SEED: SeededRandomState =
    SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);

/// Metric name under which the number of [`ArrayMap`]s built (perfect hash
/// join fast path) is reported.
const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count";
104
105#[expect(clippy::too_many_arguments)]
106fn try_create_array_map(
107    bounds: &Option<PartitionBounds>,
108    schema: &SchemaRef,
109    batches: &[RecordBatch],
110    on_left: &[PhysicalExprRef],
111    reservation: &mut MemoryReservation,
112    perfect_hash_join_small_build_threshold: usize,
113    perfect_hash_join_min_key_density: f64,
114    null_equality: NullEquality,
115) -> Result<Option<(ArrayMap, RecordBatch, Vec<ArrayRef>)>> {
116    if on_left.len() != 1 {
117        return Ok(None);
118    }
119
120    if null_equality == NullEquality::NullEqualsNull {
121        for batch in batches.iter() {
122            let arrays = evaluate_expressions_to_arrays(on_left, batch)?;
123            if arrays[0].null_count() > 0 {
124                return Ok(None);
125            }
126        }
127    }
128
129    let (min_val, max_val) = if let Some(bounds) = bounds {
130        let (min_val, max_val) = if let Some(cb) = bounds.get_column_bounds(0) {
131            (cb.min.clone(), cb.max.clone())
132        } else {
133            return Ok(None);
134        };
135
136        if min_val.is_null() || max_val.is_null() {
137            return Ok(None);
138        }
139
140        if min_val > max_val {
141            return internal_err!("min_val>max_val");
142        }
143
144        if let Some((mi, ma)) =
145            ArrayMap::key_to_u64(&min_val).zip(ArrayMap::key_to_u64(&max_val))
146        {
147            (mi, ma)
148        } else {
149            return Ok(None);
150        }
151    } else {
152        return Ok(None);
153    };
154
155    let range = ArrayMap::calculate_range(min_val, max_val);
156    let num_row: usize = batches.iter().map(|x| x.num_rows()).sum();
157
158    // TODO: support create ArrayMap<u64>
159    if num_row >= u32::MAX as usize {
160        return Ok(None);
161    }
162
163    // When the key range spans the full integer domain (e.g. i64::MIN to i64::MAX),
164    // range is u64::MAX and `range + 1` below would overflow.
165    if range == usize::MAX as u64 {
166        return Ok(None);
167    }
168
169    let dense_ratio = (num_row as f64) / ((range + 1) as f64);
170
171    if range >= perfect_hash_join_small_build_threshold as u64
172        && dense_ratio <= perfect_hash_join_min_key_density
173    {
174        return Ok(None);
175    }
176
177    let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row);
178    reservation.try_grow(mem_size)?;
179
180    let batch = concat_batches(schema, batches)?;
181    let left_values = evaluate_expressions_to_arrays(on_left, &batch)?;
182
183    let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?;
184
185    Ok(Some((array_map, batch, left_values)))
186}
187
/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
    /// The hash table with indices into `batch`.
    /// `Arc` is used to allow sharing with `SharedBuildAccumulator` for hash map pushdown
    pub(super) map: Arc<Map>,
    /// The input rows for the build side (all build-side batches concatenated)
    batch: RecordBatch,
    /// The build side on expressions values (evaluated join-key arrays)
    values: Vec<ArrayRef>,
    /// Shared bitmap builder for visited left indices
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Counter of running probe-threads, potentially
    /// able to update `visited_indices_bitmap`
    probe_threads_counter: AtomicUsize,
    /// We need to keep this field to maintain accurate memory accounting, even though we don't directly use it.
    /// Without holding onto this reservation, the recorded memory usage would become inconsistent with actual usage.
    /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
    /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
    _reservation: MemoryReservation,
    /// Bounds computed from the build side for dynamic filter pushdown.
    /// If the partition is empty (no rows) this will be None.
    /// If the partition has some rows this will be Some with the bounds for each join key column.
    pub(super) bounds: Option<PartitionBounds>,
    /// Membership testing strategy for filter pushdown.
    /// Contains either InList values for small build sides or hash table reference for large build sides
    pub(super) membership: PushdownStrategy,
    /// Shared atomic flag indicating if any probe partition saw data (for null-aware anti joins).
    /// This is shared across all probe partitions to provide global knowledge
    pub(super) probe_side_non_empty: AtomicBool,
    /// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins)
    pub(super) probe_side_has_null: AtomicBool,
}
220
221impl JoinLeftData {
222    /// return a reference to the map
223    pub(super) fn map(&self) -> &Map {
224        &self.map
225    }
226
227    /// returns a reference to the build side batch
228    pub(super) fn batch(&self) -> &RecordBatch {
229        &self.batch
230    }
231
232    /// returns a reference to the build side expressions values
233    pub(super) fn values(&self) -> &[ArrayRef] {
234        &self.values
235    }
236
237    /// returns a reference to the visited indices bitmap
238    pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
239        &self.visited_indices_bitmap
240    }
241
242    /// returns a reference to the InList values for filter pushdown
243    pub(super) fn membership(&self) -> &PushdownStrategy {
244        &self.membership
245    }
246
247    /// Decrements the counter of running threads, and returns `true`
248    /// if caller is the last running thread
249    pub(super) fn report_probe_completed(&self) -> bool {
250        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
251    }
252}
253
/// Helps to build [`HashJoinExec`].
///
/// Builder can be created from an existing [`HashJoinExec`] using [`From::from`].
/// In this case, all its fields are inherited. If a field that affects the node's
/// properties is modified, they will be automatically recomputed during the build.
///
/// # Adding setters
///
/// When adding a new setter, it is necessary to ensure that the `preserve_properties`
/// flag is set to false if modifying the field requires a recomputation of the plan's
/// properties.
///
pub struct HashJoinExecBuilder {
    /// Plan under construction; may temporarily hold stub properties/schema
    /// that are recomputed in `build()`.
    exec: HashJoinExec,
    /// When `true`, `exec`'s cached properties/schema are still valid and
    /// `build()` skips recomputing them.
    preserve_properties: bool,
}
270
impl HashJoinExecBuilder {
    /// Make a new [`HashJoinExecBuilder`].
    ///
    /// All optional fields start at their defaults (no filter, `PartitionMode::Auto`,
    /// `NullEqualsNothing`, no projection, no fetch limit).
    pub fn new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
        join_type: JoinType,
    ) -> Self {
        Self {
            exec: HashJoinExec {
                left,
                right,
                on,
                filter: None,
                join_type,
                left_fut: Default::default(),
                random_state: HASH_JOIN_SEED,
                mode: PartitionMode::Auto,
                fetch: None,
                metrics: ExecutionPlanMetricsSet::new(),
                projection: None,
                column_indices: vec![],
                null_equality: NullEquality::NullEqualsNothing,
                null_aware: false,
                dynamic_filter: None,
                // Will be computed when the plan is built.
                cache: stub_properties(),
                join_schema: Arc::new(Schema::empty()),
            },
            // As `exec` is initialized with stub properties,
            // they will be properly computed when the plan is built.
            preserve_properties: false,
        }
    }

    /// Set join type. Invalidates cached plan properties.
    pub fn with_type(mut self, join_type: JoinType) -> Self {
        self.exec.join_type = join_type;
        self.preserve_properties = false;
        self
    }

    /// Set projection from the vector. Invalidates cached plan properties.
    pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self {
        self.with_projection_ref(projection.map(Into::into))
    }

    /// Set projection from the shared reference. Invalidates cached plan properties.
    pub fn with_projection_ref(mut self, projection: Option<ProjectionRef>) -> Self {
        self.exec.projection = projection;
        self.preserve_properties = false;
        self
    }

    /// Set optional filter. Does not affect cached plan properties.
    pub fn with_filter(mut self, filter: Option<JoinFilter>) -> Self {
        self.exec.filter = filter;
        self
    }

    /// Set expressions to join on. Invalidates cached plan properties.
    pub fn with_on(mut self, on: Vec<(PhysicalExprRef, PhysicalExprRef)>) -> Self {
        self.exec.on = on;
        self.preserve_properties = false;
        self
    }

    /// Set partition mode. Invalidates cached plan properties.
    pub fn with_partition_mode(mut self, mode: PartitionMode) -> Self {
        self.exec.mode = mode;
        self.preserve_properties = false;
        self
    }

    /// Set null equality property. Does not affect cached plan properties.
    pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self {
        self.exec.null_equality = null_equality;
        self
    }

    /// Set null aware property (validated against join type/keys in `build()`).
    pub fn with_null_aware(mut self, null_aware: bool) -> Self {
        self.exec.null_aware = null_aware;
        self
    }

    /// Set fetch property (maximum number of rows to return).
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.exec.fetch = fetch;
        self
    }

    /// Require to recompute plan properties.
    pub fn recompute_properties(mut self) -> Self {
        self.preserve_properties = false;
        self
    }

    /// Replace children.
    ///
    /// Expects exactly `[left, right]`; properties are preserved only if the
    /// new children have the same properties as the current ones.
    pub fn with_new_children(
        mut self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Self> {
        assert_or_internal_err!(
            children.len() == 2,
            "wrong number of children passed into `HashJoinExecBuilder`"
        );
        self.preserve_properties &= has_same_children_properties(&self.exec, &children)?;
        // Remove index 1 first so that index 0 is still valid.
        self.exec.right = children.swap_remove(1);
        self.exec.left = children.swap_remove(0);
        Ok(self)
    }

    /// Reset runtime state (build-side future, dynamic filter, metrics).
    pub fn reset_state(mut self) -> Self {
        self.exec.left_fut = Default::default();
        self.exec.dynamic_filter = None;
        self.exec.metrics = ExecutionPlanMetricsSet::new();
        self
    }

    /// Build result as a dyn execution plan.
    pub fn build_exec(self) -> Result<Arc<dyn ExecutionPlan>> {
        self.build().map(|p| Arc::new(p) as _)
    }

    /// Build resulting execution plan.
    ///
    /// Validates the `null_aware` configuration and, unless properties were
    /// preserved, recomputes the join schema, column indices and plan
    /// properties from the current fields.
    ///
    /// # Errors
    /// Returns a plan error if `null_aware` is set on a non-`LeftAnti` join or
    /// with more than one join key, if `on` is empty, or if the join/projection
    /// is otherwise invalid.
    pub fn build(self) -> Result<HashJoinExec> {
        let Self {
            exec,
            preserve_properties,
        } = self;

        // Validate null_aware flag
        if exec.null_aware {
            let join_type = exec.join_type();
            if !matches!(join_type, JoinType::LeftAnti) {
                return plan_err!(
                    "null_aware can only be true for LeftAnti joins, got {join_type}"
                );
            }
            let on = exec.on();
            if on.len() != 1 {
                return plan_err!(
                    "null_aware anti join only supports single column join key, got {} columns",
                    on.len()
                );
            }
        }

        // Fast path: nothing that affects properties was modified.
        if preserve_properties {
            return Ok(exec);
        }

        // Destructure so the compiler flags any field we forget to carry over.
        let HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type,
            left_fut,
            random_state,
            mode,
            metrics,
            projection,
            null_equality,
            null_aware,
            dynamic_filter,
            fetch,
            // Recomputed.
            join_schema: _,
            column_indices: _,
            cache: _,
        } = exec;

        let left_schema = left.schema();
        let right_schema = right.schema();
        if on.is_empty() {
            return plan_err!("On constraints in HashJoinExec should be non-empty");
        }

        check_join_is_valid(&left_schema, &right_schema, &on)?;
        let (join_schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, &join_type);

        let join_schema = Arc::new(join_schema);

        // Check if the projection is valid.
        can_project(&join_schema, projection.as_deref())?;

        let cache = HashJoinExec::compute_properties(
            &left,
            &right,
            &join_schema,
            join_type,
            &on,
            mode,
            projection.as_deref(),
        )?;

        Ok(HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type,
            join_schema,
            left_fut,
            random_state,
            mode,
            metrics,
            projection,
            column_indices,
            null_equality,
            null_aware,
            cache: Arc::new(cache),
            dynamic_filter,
            fetch,
        })
    }

    /// Set the dynamic filter (internal; used when restoring runtime state).
    fn with_dynamic_filter(mut self, filter: Option<HashJoinExecDynamicFilter>) -> Self {
        self.exec.dynamic_filter = filter;
        self
    }
}
497
impl From<&HashJoinExec> for HashJoinExecBuilder {
    /// Creates a builder inheriting every field from an existing plan node.
    ///
    /// Runtime state (`left_fut`, `metrics`, `dynamic_filter`) is carried over
    /// as-is; use `reset_state` on the builder to clear it.
    fn from(exec: &HashJoinExec) -> Self {
        Self {
            exec: HashJoinExec {
                left: Arc::clone(exec.left()),
                right: Arc::clone(exec.right()),
                on: exec.on.clone(),
                filter: exec.filter.clone(),
                join_type: exec.join_type,
                join_schema: Arc::clone(&exec.join_schema),
                left_fut: Arc::clone(&exec.left_fut),
                random_state: exec.random_state.clone(),
                mode: exec.mode,
                metrics: exec.metrics.clone(),
                projection: exec.projection.clone(),
                column_indices: exec.column_indices.clone(),
                null_equality: exec.null_equality,
                null_aware: exec.null_aware,
                cache: Arc::clone(&exec.cache),
                dynamic_filter: exec.dynamic_filter.clone(),
                fetch: exec.fetch,
            },
            // Copied properties are valid; setters that affect them will flip
            // this flag so `build()` recomputes.
            preserve_properties: true,
        }
    }
}
524
#[expect(rustdoc::private_intra_doc_links)]
/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
/// join.
///
/// # Join Expressions
///
/// This implementation is optimized for evaluating equijoin predicates (
/// `<col1> = <col2>`) expressions, which are represented as a list of `Columns`
/// in [`Self::on`].
///
/// Non-equality predicates, which can not be pushed down to the join inputs (e.g.
/// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
/// after the equijoin predicates.
///
/// # ArrayMap Optimization
///
/// For joins with a single integer-based join key, `HashJoinExec` may use an [`ArrayMap`]
/// (also known as a "perfect hash join") instead of a general-purpose hash map.
/// This optimization is used when:
/// 1. There is exactly one join key.
/// 2. The join key is an integer type up to 64 bits wide that can be losslessly converted
///    to `u64` (128-bit integer types such as `i128` and `u128` are not supported).
/// 3. The range of keys is small enough (controlled by `perfect_hash_join_small_build_threshold`)
///    OR the keys are sufficiently dense (controlled by `perfect_hash_join_min_key_density`).
/// 4. build_side.num_rows() < u32::MAX
/// 5. NullEqualsNothing || (NullEqualsNull && build side doesn't contain null)
///
/// See [`try_create_array_map`] for more details.
///
/// Note that when using [`PartitionMode::Partitioned`], the build side is split into multiple
/// partitions. This can cause a dense build side to become sparse within each partition,
/// potentially disabling this optimization.
///
/// For example, consider:
/// ```sql
/// SELECT t1.value, t2.value
/// FROM range(10000) AS t1
/// JOIN range(10000) AS t2
///   ON t1.value = t2.value;
/// ```
/// With 24 partitions, each partition will only receive a subset of the 10,000 rows.
/// The first partition might contain values like `3, 10, 18, 39, 43`, which are sparse
/// relative to the original range, even though the overall data set is dense.
///
/// # "Build Side" vs "Probe Side"
///
/// HashJoin takes two inputs, which are referred to as the "build" and the
/// "probe". The build side is the first child, and the probe side is the second
/// child.
///
/// The two inputs are treated differently and it is VERY important that the
/// *smaller* input is placed on the build side to minimize the work of creating
/// the hash table.
///
/// ```text
///          ┌───────────┐
///          │ HashJoin  │
///          │           │
///          └───────────┘
///              │   │
///        ┌─────┘   └─────┐
///        ▼               ▼
/// ┌────────────┐  ┌─────────────┐
/// │   Input    │  │    Input    │
/// │    [0]     │  │     [1]     │
/// └────────────┘  └─────────────┘
///
///  "build side"    "probe side"
/// ```
///
/// Execution proceeds in 2 stages:
///
/// 1. the **build phase** creates a hash table from the tuples of the build side,
///    and single concatenated batch containing data from all fetched record batches.
///    Resulting hash table stores hashed join-key fields for each row as a key, and
///    indices of corresponding rows in concatenated batch.
///
/// When using the standard `JoinHashMap`, hash join uses LIFO data structure as a hash table,
/// and in order to retain original build-side input order while obtaining data during probe phase,
/// hash table is updated by iterating batch sequence in reverse order -- it allows to
/// keep rows with smaller indices "on the top" of hash table, and still maintain
/// correct indexing for concatenated build-side data batch.
///
/// Example of build phase for 3 record batches:
///
///
/// ```text
///
///  Original build-side data   Inserting build-side values into hashmap    Concatenated build-side batch
///                                                                         ┌───────────────────────────┐
///                             hashmap.insert(row-hash, row-idx + offset)  │                      idx  │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 1 │        1) update_hash for batch 3 with offset 0    │          │ Row 6 │    0   │
///   Batch 1  │       │           - hashmap.insert(Row 7, idx 1)           │ Batch 3  │       │        │
///            │ Row 2 │           - hashmap.insert(Row 6, idx 0)           │          │ Row 7 │    1   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 3 │        2) update_hash for batch 2 with offset 2    │          │ Row 3 │    2   │
///            │       │           - hashmap.insert(Row 5, idx 4)           │          │       │        │
///   Batch 2  │ Row 4 │           - hashmap.insert(Row 4, idx 3)           │ Batch 2  │ Row 4 │    3   │
///            │       │           - hashmap.insert(Row 3, idx 2)           │          │       │        │
///            │ Row 5 │                                                    │          │ Row 5 │    4   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 6 │        3) update_hash for batch 1 with offset 5    │          │ Row 1 │    5   │
///   Batch 3  │       │           - hashmap.insert(Row 2, idx 6)           │ Batch 1  │       │        │
///            │ Row 7 │           - hashmap.insert(Row 1, idx 5)           │          │ Row 2 │    6   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///                                                                         └───────────────────────────┘
/// ```
///
/// 2. the **probe phase** where the tuples of the probe side are streamed
///    through, checking for matches of the join keys in the hash table.
///
/// ```text
///                 ┌────────────────┐          ┌────────────────┐
///                 │ ┌─────────┐    │          │ ┌─────────┐    │
///                 │ │  Hash   │    │          │ │  Hash   │    │
///                 │ │  Table  │    │          │ │  Table  │    │
///                 │ │(keys are│    │          │ │(keys are│    │
///                 │ │equi join│    │          │ │equi join│    │  Stage 2: batches from
///  Stage 1: the   │ │columns) │    │          │ │columns) │    │    the probe side are
/// *entire* build  │ │         │    │          │ │         │    │  streamed through, and
///  side is read   │ └─────────┘    │          │ └─────────┘    │   checked against the
/// into the hash   │      ▲         │          │          ▲     │   contents of the hash
///     table       │       HashJoin │          │  HashJoin      │          table
///                 └──────┼─────────┘          └──────────┼─────┘
///             ─ ─ ─ ─ ─ ─                                 ─ ─ ─ ─ ─ ─ ─
///            │                                                         │
///
///            │                                                         │
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///           ...                                                       ...
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///
///        build side                                                probe side
/// ```
///
/// # Example "Optimal" Plans
///
/// The differences in the inputs means that for classic "Star Schema Query",
/// the optimal plan will be a **"Right Deep Tree"** . A Star Schema Query is
/// one where there is one large table and several smaller "dimension" tables,
/// joined on `Foreign Key = Primary Key` predicates.
///
/// A "Right Deep Tree" looks like this large table as the probe side on the
/// lowest join:
///
/// ```text
///             ┌───────────┐
///             │ HashJoin  │
///             │           │
///             └───────────┘
///                 │   │
///         ┌───────┘   └──────────┐
///         ▼                      ▼
/// ┌───────────────┐        ┌───────────┐
/// │ small table 1 │        │ HashJoin  │
/// │  "dimension"  │        │           │
/// └───────────────┘        └───┬───┬───┘
///                   ┌──────────┘   └───────┐
///                   │                      │
///                   ▼                      ▼
///           ┌───────────────┐        ┌───────────┐
///           │ small table 2 │        │ HashJoin  │
///           │  "dimension"  │        │           │
///           └───────────────┘        └───┬───┬───┘
///                               ┌────────┘   └────────┐
///                               │                     │
///                               ▼                     ▼
///                       ┌───────────────┐     ┌───────────────┐
///                       │ small table 3 │     │  large table  │
///                       │  "dimension"  │     │    "fact"     │
///                       └───────────────┘     └───────────────┘
/// ```
///
/// # Clone / Shared State
///
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
/// loading of the left side with the processing in each output stream.
/// Therefore it can not be [`Clone`]
pub struct HashJoinExec {
    /// left (build) side which gets hashed
    pub left: Arc<dyn ExecutionPlan>,
    /// right (probe) side which are filtered by the hash table
    pub right: Arc<dyn ExecutionPlan>,
    /// Set of equijoin columns from the relations: `(left_col, right_col)`
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Filters which are applied while finding matching rows
    pub filter: Option<JoinFilter>,
    /// How the join is performed (`OUTER`, `INNER`, etc)
    pub join_type: JoinType,
    /// The schema after join. Please be careful when using this schema,
    /// if there is a projection, the schema isn't the same as the output schema.
    join_schema: SchemaRef,
    /// Future that consumes left input and builds the hash table
    ///
    /// For CollectLeft partition mode, this structure is *shared* across all output streams.
    ///
    /// Each output stream waits on the `OnceAsync` to signal the completion of
    /// the hash table creation.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Shared `SeededRandomState` for the hashing algorithm (seeds preserved for serialization)
    random_state: SeededRandomState,
    /// Partitioning mode to use
    pub mode: PartitionMode,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// The projection indices of the columns in the output schema of join
    pub projection: Option<ProjectionRef>,
    /// Information of index and left / right placement of columns
    column_indices: Vec<ColumnIndex>,
    /// The equality null-handling behavior of the join algorithm.
    pub null_equality: NullEquality,
    /// Flag to indicate if this is a null-aware anti join
    pub null_aware: bool,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: Arc<PlanProperties>,
    /// Dynamic filter for pushing down to the probe side.
    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
    /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
    /// Maximum number of rows to return
    fetch: Option<usize>,
}
761
/// Dynamic-filter state attached to a [`HashJoinExec`] when dynamic filter
/// pushdown is enabled for this join.
#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// Dynamic filter that we'll update with the results of the build side once that is done.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
    /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
770
771impl fmt::Debug for HashJoinExec {
772    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773        f.debug_struct("HashJoinExec")
774            .field("left", &self.left)
775            .field("right", &self.right)
776            .field("on", &self.on)
777            .field("filter", &self.filter)
778            .field("join_type", &self.join_type)
779            .field("join_schema", &self.join_schema)
780            .field("left_fut", &self.left_fut)
781            .field("random_state", &self.random_state)
782            .field("mode", &self.mode)
783            .field("metrics", &self.metrics)
784            .field("projection", &self.projection)
785            .field("column_indices", &self.column_indices)
786            .field("null_equality", &self.null_equality)
787            .field("cache", &self.cache)
788            // Explicitly exclude dynamic_filter to avoid runtime state differences in tests
789            .finish()
790    }
791}
792
impl EmbeddedProjection for HashJoinExec {
    /// Delegates to the inherent [`HashJoinExec::with_projection`] (inherent
    /// methods take precedence over trait methods in resolution, so this does
    /// not recurse).
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
798
impl HashJoinExec {
    /// Tries to create a new [`HashJoinExec`].
    ///
    /// # Error
    /// This function errors when it is not possible to join the left and right sides on keys `on`.
    #[expect(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        projection: Option<Vec<usize>>,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
        null_aware: bool,
    ) -> Result<Self> {
        // Delegate to the builder so node construction (schema / property
        // computation) happens in a single place.
        HashJoinExecBuilder::new(left, right, on, *join_type)
            .with_filter(filter)
            .with_projection(projection)
            .with_partition_mode(partition_mode)
            .with_null_equality(null_equality)
            .with_null_aware(null_aware)
            .build()
    }
824
    /// Create a builder based on the existing [`HashJoinExec`].
    ///
    /// Returned builder preserves all existing fields. If a field requiring properties
    /// recomputation is modified, this will be done automatically during the node build.
    pub fn builder(&self) -> HashJoinExecBuilder {
        self.into()
    }
833
834    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
835        // Extract the right-side keys (probe side keys) from the `on` clauses
836        // Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
837        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
838        // Initialize with a placeholder expression (true) that will be updated when the hash table is built
839        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
840    }
841
842    fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
843        if self.join_type != JoinType::Inner
844            || !config.optimizer.enable_join_dynamic_filter_pushdown
845        {
846            return false;
847        }
848
849        // `preserve_file_partitions` can report Hash partitioning for Hive-style
850        // file groups, but those partitions are not actually hash-distributed.
851        // Partitioned dynamic filters rely on hash routing, so disable them in
852        // this mode to avoid incorrect results. Follow-up work: enable dynamic
853        // filtering for preserve_file_partitioned scans (issue #20195).
854        // https://github.com/apache/datafusion/issues/20195
855        if config.optimizer.preserve_file_partitions > 0
856            && self.mode == PartitionMode::Partitioned
857        {
858            return false;
859        }
860
861        true
862    }
863
    /// left (build) side which gets hashed
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// right (probe) side which are filtered by the hash table
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// Set of common columns used to join on, as `(left_expr, right_expr)` pairs
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    /// Filters applied before join output
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// How the join is performed
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// The schema after join. Please be careful when using this schema:
    /// if there is a projection, the schema isn't the same as the output schema.
    pub fn join_schema(&self) -> &SchemaRef {
        &self.join_schema
    }

    /// The partitioning mode of this hash join
    pub fn partition_mode(&self) -> &PartitionMode {
        &self.mode
    }

    /// Get null_equality
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    /// Get the dynamic filter expression for testing purposes.
    /// Returns `None` if no dynamic filter has been set.
    ///
    /// This method is intended for testing only and should not be used in production code.
    #[doc(hidden)]
    pub fn dynamic_filter_for_test(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
        self.dynamic_filter.as_ref().map(|df| &df.filter)
    }
913
914    /// Calculate order preservation flags for this hash join.
915    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
916        vec![
917            false,
918            matches!(
919                join_type,
920                JoinType::Inner
921                    | JoinType::Right
922                    | JoinType::RightAnti
923                    | JoinType::RightSemi
924                    | JoinType::RightMark
925            ),
926        ]
927    }
928
    /// Get probe side information for the hash join.
    pub fn probe_side() -> JoinSide {
        // In current implementation right side is always probe side.
        JoinSide::Right
    }

    /// Return whether the join contains an embedded projection
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }
939
    /// Return new instance of [HashJoinExec] with the given projection.
    ///
    /// The new projection is validated against the current output schema and
    /// composed with any projection already embedded in this node.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        let projection = projection.map(Into::into);
        // check if the projection is valid
        can_project(&self.schema(), projection.as_deref())?;
        let projection =
            combine_projections(projection.as_ref(), self.projection.as_ref())?;
        self.builder().with_projection_ref(projection).build()
    }
949
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
        join_type: JoinType,
        on: JoinOnRef,
        mode: PartitionMode,
        projection: Option<&[usize]>,
    ) -> Result<PlanProperties> {
        // Calculate equivalence properties:
        let mut eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            Arc::clone(schema),
            &Self::maintains_input_order(join_type),
            Some(Self::probe_side()),
            on,
        )?;

        // Output partitioning depends on the partition mode.
        let mut output_partitioning = match mode {
            PartitionMode::CollectLeft => {
                asymmetric_join_output_partitioning(left, right, &join_type)?
            }
            PartitionMode::Auto => Partitioning::UnknownPartitioning(
                right.output_partitioning().partition_count(),
            ),
            PartitionMode::Partitioned => {
                symmetric_join_output_partitioning(left, right, &join_type)?
            }
        };

        // Determine how results are emitted: an unbounded build side means we
        // can only emit once it completes (Final); otherwise it follows the
        // probe side's behavior, refined by the join type below.
        let emission_type = if left.boundedness().is_unbounded() {
            EmissionType::Final
        } else if right.pipeline_behavior() == EmissionType::Incremental {
            match join_type {
                // If we only need to generate matched rows from the probe side,
                // we can emit rows incrementally.
                JoinType::Inner
                | JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::Right
                | JoinType::RightAnti
                | JoinType::RightMark => EmissionType::Incremental,
                // If we need to generate unmatched rows from the *build side*,
                // we need to emit them at the end.
                JoinType::Left
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::Full => EmissionType::Both,
            }
        } else {
            right.pipeline_behavior()
        };

        // If contains projection, update the PlanProperties.
        if let Some(projection) = projection {
            // construct a map from the input expressions to the output expression of the Projection
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(&projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness_from_children([left, right]),
        ))
    }
1023
    /// Returns a new `ExecutionPlan` that computes the same join as this one,
    /// with the left and right inputs swapped using the specified
    /// `partition_mode`.
    ///
    /// # Notes:
    ///
    /// This function is public so other downstream projects can use it to
    /// construct `HashJoinExec` with right side as the build side.
    ///
    /// For using this interface directly, please refer to below:
    ///
    /// Hash join execution may require specific input partitioning (for example,
    /// the left child may have a single partition while the right child has multiple).
    ///
    /// Calling this function on join nodes whose children have already been repartitioned
    /// (e.g., after a `RepartitionExec` has been inserted) may break the partitioning
    /// requirements of the hash join. Therefore, ensure you call this function
    /// before inserting any repartitioning operators on the join's children.
    ///
    /// In DataFusion's default SQL interface, this function is used by the `JoinSelection`
    /// physical optimizer rule to determine a good join order, which is
    /// executed before the `EnforceDistribution` rule (the rule that may
    /// insert `RepartitionExec` operators).
    pub fn swap_inputs(
        &self,
        partition_mode: PartitionMode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = self.left();
        let right = self.right();
        // Rebuild with children, join type, `on` pairs, filter and projection
        // all mirrored to the swapped orientation.
        let new_join = self
            .builder()
            .with_type(self.join_type.swap())
            .with_new_children(vec![Arc::clone(right), Arc::clone(left)])?
            .with_on(
                self.on()
                    .iter()
                    .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
                    .collect(),
            )
            .with_filter(self.filter().map(JoinFilter::swap))
            .with_projection(swap_join_projection(
                left.schema().fields().len(),
                right.schema().fields().len(),
                self.projection.as_deref(),
                self.join_type(),
            ))
            .with_partition_mode(partition_mode)
            .build()?;
        // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
        if matches!(
            self.join_type(),
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ) || self.projection.is_some()
        {
            Ok(Arc::new(new_join))
        } else {
            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
        }
    }
}
1089
impl DisplayAs for HashJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            // Single-line rendering used in EXPLAIN output; optional parts
            // (filter, projection, null equality, fetch) are appended only
            // when present so the default case stays compact.
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let display_filter = self.filter.as_ref().map_or_else(
                    || "".to_string(),
                    |f| format!(", filter={}", f.expression()),
                );
                let display_projections = if self.contains_projection() {
                    format!(
                        ", projection=[{}]",
                        self.projection
                            .as_ref()
                            .unwrap()
                            .iter()
                            .map(|index| format!(
                                "{}@{}",
                                self.join_schema.fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                let display_null_equality =
                    if self.null_equality() == NullEquality::NullEqualsNull {
                        ", NullsEqual: true"
                    } else {
                        ""
                    };
                let display_fetch = self
                    .fetch
                    .map_or_else(String::new, |f| format!(", fetch={f}"));
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| format!("({c1}, {c2})"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(
                    f,
                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
                    self.mode,
                    self.join_type,
                    on,
                    display_filter,
                    display_projections,
                    display_null_equality,
                    display_fetch,
                )
            }
            // Multi-line rendering for tree-style EXPLAIN; one attribute per
            // line, with inner joins (the common case) left implicit.
            DisplayFormatType::TreeRender => {
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| {
                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
                    })
                    .collect::<Vec<String>>()
                    .join(", ");

                if *self.join_type() != JoinType::Inner {
                    writeln!(f, "join_type={:?}", self.join_type)?;
                }

                writeln!(f, "on={on}")?;

                if self.null_equality() == NullEquality::NullEqualsNull {
                    writeln!(f, "NullsEqual: true")?;
                }

                if let Some(filter) = self.filter.as_ref() {
                    writeln!(f, "filter={filter}")?;
                }

                if let Some(fetch) = self.fetch {
                    writeln!(f, "fetch={fetch}")?;
                }

                Ok(())
            }
        }
    }
}
1176
1177impl ExecutionPlan for HashJoinExec {
    fn name(&self) -> &'static str {
        "HashJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    // Plan properties are precomputed at construction time and cached.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
1189
1190    fn required_input_distribution(&self) -> Vec<Distribution> {
1191        match self.mode {
1192            PartitionMode::CollectLeft => vec![
1193                Distribution::SinglePartition,
1194                Distribution::UnspecifiedDistribution,
1195            ],
1196            PartitionMode::Partitioned => {
1197                let (left_expr, right_expr) = self
1198                    .on
1199                    .iter()
1200                    .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1201                    .unzip();
1202                vec![
1203                    Distribution::HashPartitioned(left_expr),
1204                    Distribution::HashPartitioned(right_expr),
1205                ]
1206            }
1207            PartitionMode::Auto => vec![
1208                Distribution::UnspecifiedDistribution,
1209                Distribution::UnspecifiedDistribution,
1210            ],
1211        }
1212    }
1213
    // For [JoinType::Inner] and [JoinType::RightSemi] in hash joins, the probe phase initiates by
    // applying the hash function to convert the join key(s) in each row into a hash value from the
    // probe side table in the order they're arranged. The hash value is used to look up corresponding
    // entries in the hash table that was constructed from the build side table during the build phase.
    //
    // Because of the immediate generation of result rows once a match is found,
    // the output of the join tends to follow the order in which the rows were read from
    // the probe side table. This is simply due to the sequence in which the rows were processed.
    // Hence, it appears that the hash join is preserving the order of the probe side.
    //
    // Meanwhile, in the case of a [JoinType::RightAnti] hash join,
    // the unmatched rows from the probe side are also kept in order.
    // This is because the **`RightAnti`** join is designed to return rows from the right
    // (probe side) table that have no match in the left (build side) table. Because the rows
    // are processed sequentially in the probe phase, and unmatched rows are directly output
    // as results, these results tend to retain the order of the probe side table.
    //
    // The per-input flags are computed by the associated helper, which is also
    // used during plan-property computation.
    fn maintains_input_order(&self) -> Vec<bool> {
        Self::maintains_input_order(self.join_type)
    }
1233
    // Build (left) input first, probe (right) input second.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }
1237
    /// Creates a new HashJoinExec with different children while preserving configuration.
    ///
    /// This method is called during query optimization when the optimizer creates new
    /// plan nodes. The node is rebuilt through the builder, which recomputes cached
    /// plan properties for the new children (partitioning may have changed) rather
    /// than reusing the existing cache.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.builder().with_new_children(children)?.build_exec()
    }

    /// Returns a copy of this node rebuilt via the builder with its runtime
    /// state reset.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        self.builder().reset_state().build_exec()
    }
1253
    /// Executes one output partition of the join: kicks off (or joins onto)
    /// the build-side hash table future, then returns a stream that probes it
    /// with the right-side input.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Build-side (left) key expressions, one per equijoin pair.
        let on_left = self
            .on
            .iter()
            .map(|on| Arc::clone(&on.0))
            .collect::<Vec<_>>();
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();

        // Sanity checks: these invariants should have been established by the
        // EnforceDistribution rule; violating them is an internal error.
        assert_or_internal_err!(
            self.mode != PartitionMode::Partitioned
                || left_partitions == right_partitions,
            "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
             consider using RepartitionExec"
        );

        assert_or_internal_err!(
            self.mode != PartitionMode::CollectLeft || left_partitions == 1,
            "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
             consider using CoalescePartitionsExec or the EnforceDistribution rule"
        );

        // Only enable dynamic filter pushdown if:
        // - The session config enables dynamic filter pushdown
        // - A dynamic filter exists
        // - At least one consumer is holding a reference to it, this avoids expensive filter
        //   computation when disabled or when no consumer will use it.
        let enable_dynamic_filter_pushdown = self
            .allow_join_dynamic_filter_pushdown(context.session_config().options())
            && self
                .dynamic_filter
                .as_ref()
                .map(|df| df.filter.is_used())
                .unwrap_or(false);

        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);

        let array_map_created_count = MetricBuilder::new(&self.metrics)
            .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);

        let left_fut = match self.mode {
            // CollectLeft: all output partitions share one build-side future;
            // only the first caller actually starts collecting the left input.
            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
                let left_stream = self.left.execute(0, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

                Ok(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    self.right().output_partitioning().partition_count(),
                    enable_dynamic_filter_pushdown,
                    Arc::clone(context.session_config().options()),
                    self.null_equality,
                    array_map_created_count,
                ))
            })?,
            // Partitioned: each output partition builds its own hash table
            // from the matching build-side partition.
            PartitionMode::Partitioned => {
                let left_stream = self.left.execute(partition, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
                        .register(context.memory_pool());

                OnceFut::new(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    1,
                    enable_dynamic_filter_pushdown,
                    Arc::clone(context.session_config().options()),
                    self.null_equality,
                    array_map_created_count,
                ))
            }
            // Auto should have been resolved to a concrete mode before execution.
            PartitionMode::Auto => {
                return plan_err!(
                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
                    PartitionMode::Auto
                );
            }
        };

        let batch_size = context.session_config().batch_size();

        // Initialize build_accumulator lazily with runtime partition counts (only if enabled)
        // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
        let repartition_random_state = REPARTITION_RANDOM_STATE;
        let build_accumulator = enable_dynamic_filter_pushdown
            .then(|| {
                self.dynamic_filter.as_ref().map(|df| {
                    let filter = Arc::clone(&df.filter);
                    let on_right = self
                        .on
                        .iter()
                        .map(|(_, right_expr)| Arc::clone(right_expr))
                        .collect::<Vec<_>>();
                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
                            self.mode,
                            self.left.as_ref(),
                            self.right.as_ref(),
                            filter,
                            on_right,
                            repartition_random_state,
                        ))
                    })))
                })
            })
            .flatten()
            .flatten();

        // we have the batches and the hash map with their keys. We can now create a stream
        // over the right that uses this information to issue new batches.
        let right_stream = self.right.execute(partition, context)?;

        // update column indices to reflect the projection
        let column_indices_after_projection = match self.projection.as_ref() {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };

        // Probe-side (right) key expressions, one per equijoin pair.
        let on_right = self
            .on
            .iter()
            .map(|(_, right_expr)| Arc::clone(right_expr))
            .collect::<Vec<_>>();

        Ok(Box::pin(HashJoinStream::new(
            partition,
            self.schema(),
            on_right,
            self.filter.clone(),
            self.join_type,
            right_stream,
            self.random_state.random_state().clone(),
            join_metrics,
            column_indices_after_projection,
            self.null_equality,
            HashJoinStreamState::WaitBuildSide,
            BuildSide::Initial(BuildSideInitialState { left_fut }),
            batch_size,
            vec![],
            self.right.output_ordering().is_some(),
            build_accumulator,
            self.mode,
            self.null_aware,
            self.fetch,
        )))
    }
1418
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Estimated statistics for the join output; per-partition statistics are
    /// not computed (unknown is returned for any specific partition).
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema()));
        }
        // TODO stats: it is not possible in general to know the output size of joins
        // There are some special cases though, for example:
        // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
        let stats = estimate_join_statistics(
            self.left.partition_statistics(None)?,
            self.right.partition_statistics(None)?,
            &self.on,
            &self.join_type,
            &self.join_schema,
        )?;
        // Project statistics if there is a projection
        let stats = stats.project(self.projection.as_ref());
        // Apply fetch limit to statistics
        stats.with_fetch(self.fetch, 0, 1)
    }
1442
    /// Tries to push `projection` down through `hash_join`. If possible, performs the
    /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections
    /// as its children. Otherwise, returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // TODO: currently if there is projection in HashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
        if self.contains_projection() {
            return Ok(None);
        }

        let schema = self.schema();
        if let Some(JoinData {
            projected_left_child,
            projected_right_child,
            join_filter,
            join_on,
        }) = try_pushdown_through_join(
            projection,
            self.left(),
            self.right(),
            self.on(),
            &schema,
            self.filter(),
        )? {
            // The projection was split across both inputs: rebuild the join on
            // top of the projected children with remapped on/filter expressions.
            self.builder()
                .with_new_children(vec![
                    Arc::new(projected_left_child),
                    Arc::new(projected_right_child),
                ])?
                .with_on(join_on)
                .with_filter(join_filter)
                // Returned early if projection is not None
                .with_projection(None)
                .build_exec()
                .map(Some)
        } else {
            // Fall back to embedding the projection into the join node itself.
            try_embed_projection(projection, self)
        }
    }
1484
1485    fn gather_filters_for_pushdown(
1486        &self,
1487        phase: FilterPushdownPhase,
1488        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1489        config: &ConfigOptions,
1490    ) -> Result<FilterDescription> {
1491        // This is the physical-plan equivalent of `push_down_all_join` in
1492        // `datafusion/optimizer/src/push_down_filter.rs`. That function uses `lr_is_preserved`
1493        // to decide which parent predicates can be pushed past a logical join to its children,
1494        // then checks column references to route each predicate to the correct side.
1495        //
1496        // We apply the same two-level logic here:
1497        // 1. `lr_is_preserved` gates whether a side is eligible at all.
1498        // 2. For each filter, we check that all column references belong to the
1499        //    target child (using `column_indices` to map output column positions
1500        //    to join sides). This is critical for correctness: name-based matching
1501        //    alone (as done by `ChildFilterDescription::from_child`) can incorrectly
1502        //    push filters when different join sides have columns with the same name
1503        //    (e.g. nested mark joins both producing "mark" columns).
1504        let (left_preserved, right_preserved) = lr_is_preserved(self.join_type);
1505
1506        // Build the set of allowed column indices for each side
1507        let column_indices: Vec<ColumnIndex> = match self.projection.as_ref() {
1508            Some(projection) => projection
1509                .iter()
1510                .map(|i| self.column_indices[*i].clone())
1511                .collect(),
1512            None => self.column_indices.clone(),
1513        };
1514
1515        let (mut left_allowed, mut right_allowed) = (HashSet::new(), HashSet::new());
1516        column_indices
1517            .iter()
1518            .enumerate()
1519            .for_each(|(output_idx, ci)| {
1520                match ci.side {
1521                    JoinSide::Left => left_allowed.insert(output_idx),
1522                    JoinSide::Right => right_allowed.insert(output_idx),
1523                    // Mark columns - don't allow pushdown to either side
1524                    JoinSide::None => false,
1525                };
1526            });
1527
1528        // For semi/anti joins, the non-preserved side's columns are not in the
1529        // output, but filters on join key columns can still be pushed there.
1530        // We find output columns that are join keys on the preserved side and
1531        // add their output indices to the non-preserved side's allowed set.
1532        // The name-based remap in FilterRemapper will then match them to the
1533        // corresponding column in the non-preserved child's schema.
1534        match self.join_type {
1535            JoinType::LeftSemi | JoinType::LeftAnti => {
1536                let left_key_indices: HashSet<usize> = self
1537                    .on
1538                    .iter()
1539                    .filter_map(|(left_key, _)| {
1540                        left_key
1541                            .as_any()
1542                            .downcast_ref::<Column>()
1543                            .map(|c| c.index())
1544                    })
1545                    .collect();
1546                for (output_idx, ci) in column_indices.iter().enumerate() {
1547                    if ci.side == JoinSide::Left && left_key_indices.contains(&ci.index) {
1548                        right_allowed.insert(output_idx);
1549                    }
1550                }
1551            }
1552            JoinType::RightSemi | JoinType::RightAnti => {
1553                let right_key_indices: HashSet<usize> = self
1554                    .on
1555                    .iter()
1556                    .filter_map(|(_, right_key)| {
1557                        right_key
1558                            .as_any()
1559                            .downcast_ref::<Column>()
1560                            .map(|c| c.index())
1561                    })
1562                    .collect();
1563                for (output_idx, ci) in column_indices.iter().enumerate() {
1564                    if ci.side == JoinSide::Right && right_key_indices.contains(&ci.index)
1565                    {
1566                        left_allowed.insert(output_idx);
1567                    }
1568                }
1569            }
1570            _ => {}
1571        }
1572
1573        let left_child = if left_preserved {
1574            ChildFilterDescription::from_child_with_allowed_indices(
1575                &parent_filters,
1576                left_allowed,
1577                self.left(),
1578            )?
1579        } else {
1580            ChildFilterDescription::all_unsupported(&parent_filters)
1581        };
1582
1583        let mut right_child = if right_preserved {
1584            ChildFilterDescription::from_child_with_allowed_indices(
1585                &parent_filters,
1586                right_allowed,
1587                self.right(),
1588            )?
1589        } else {
1590            ChildFilterDescription::all_unsupported(&parent_filters)
1591        };
1592
1593        // Add dynamic filters in Post phase if enabled
1594        if phase == FilterPushdownPhase::Post
1595            && self.allow_join_dynamic_filter_pushdown(config)
1596        {
1597            // Add actual dynamic filter to right side (probe side)
1598            let dynamic_filter = Self::create_dynamic_filter(&self.on);
1599            right_child = right_child.with_self_filter(dynamic_filter);
1600        }
1601
1602        Ok(FilterDescription::new()
1603            .with_child(left_child)
1604            .with_child(right_child))
1605    }
1606
1607    fn handle_child_pushdown_result(
1608        &self,
1609        _phase: FilterPushdownPhase,
1610        child_pushdown_result: ChildPushdownResult,
1611        _config: &ConfigOptions,
1612    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1613        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1614        assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
1615        let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
1616        // We expect 0 or 1 self filters
1617        if let Some(filter) = right_child_self_filters.first() {
1618            // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said
1619            // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
1620            let predicate = Arc::clone(&filter.predicate);
1621            if let Ok(dynamic_filter) =
1622                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1623            {
1624                // We successfully pushed down our self filter - we need to make a new node with the dynamic filter
1625                let new_node = self
1626                    .builder()
1627                    .with_dynamic_filter(Some(HashJoinExecDynamicFilter {
1628                        filter: dynamic_filter,
1629                        build_accumulator: OnceLock::new(),
1630                    }))
1631                    .build_exec()?;
1632                result = result.with_updated_node(new_node);
1633            }
1634        }
1635        Ok(result)
1636    }
1637
1638    fn supports_limit_pushdown(&self) -> bool {
1639        // Hash join execution plan does not support pushing limit down through to children
1640        // because the children don't know about the join condition and can't
1641        // determine how many rows to produce
1642        false
1643    }
1644
    /// Returns the row limit (`fetch`) applied to this join's output, if any.
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
1648
1649    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1650        self.builder()
1651            .with_fetch(limit)
1652            .build()
1653            .ok()
1654            .map(|exec| Arc::new(exec) as _)
1655    }
1656}
1657
1658/// Determines which sides of a join are "preserved" for filter pushdown.
1659///
1660/// A preserved side means filters on that side's columns can be safely pushed
1661/// below the join. This mirrors the logic in the logical optimizer's
1662/// `lr_is_preserved` in `datafusion/optimizer/src/push_down_filter.rs`.
1663fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
1664    match join_type {
1665        JoinType::Inner => (true, true),
1666        JoinType::Left => (true, false),
1667        JoinType::Right => (false, true),
1668        JoinType::Full => (false, false),
1669        // Filters in semi/anti joins are either on the preserved side, or on join keys,
1670        // as all output columns come from the preserved side. Join key filters can be
1671        // safely pushed down into the other side.
1672        JoinType::LeftSemi | JoinType::LeftAnti => (true, true),
1673        JoinType::RightSemi | JoinType::RightAnti => (true, true),
1674        JoinType::LeftMark => (true, false),
1675        JoinType::RightMark => (false, true),
1676    }
1677}
1678
/// Accumulator for collecting min/max bounds from build-side data during hash join.
///
/// This struct encapsulates the logic for progressively computing column bounds
/// (minimum and maximum values) for a specific join key expression as batches
/// are processed during the build phase of a hash join.
///
/// The bounds are used for dynamic filter pushdown optimization, where filters
/// based on the actual data ranges can be pushed down to the probe side to
/// eliminate unnecessary data early.
///
/// See [`CollectLeftAccumulator::try_new`] for construction and
/// [`CollectLeftAccumulator::evaluate`] for extracting the final bounds.
struct CollectLeftAccumulator {
    /// The physical expression to evaluate for each batch
    expr: Arc<dyn PhysicalExpr>,
    /// Accumulator for tracking the minimum value across all batches
    min: MinAccumulator,
    /// Accumulator for tracking the maximum value across all batches
    max: MaxAccumulator,
}
1696
1697impl CollectLeftAccumulator {
1698    /// Creates a new accumulator for tracking bounds of a join key expression.
1699    ///
1700    /// # Arguments
1701    /// * `expr` - The physical expression to track bounds for
1702    /// * `schema` - The schema of the input data
1703    ///
1704    /// # Returns
1705    /// A new `CollectLeftAccumulator` instance configured for the expression's data type
1706    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
1707        /// Recursively unwraps dictionary types to get the underlying value type.
1708        fn dictionary_value_type(data_type: &DataType) -> DataType {
1709            match data_type {
1710                DataType::Dictionary(_, value_type) => {
1711                    dictionary_value_type(value_type.as_ref())
1712                }
1713                _ => data_type.clone(),
1714            }
1715        }
1716
1717        let data_type = expr
1718            .data_type(schema)
1719            // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
1720            .map(|dt| dictionary_value_type(&dt))?;
1721        Ok(Self {
1722            expr,
1723            min: MinAccumulator::try_new(&data_type)?,
1724            max: MaxAccumulator::try_new(&data_type)?,
1725        })
1726    }
1727
1728    /// Updates the accumulators with values from a new batch.
1729    ///
1730    /// Evaluates the expression on the batch and updates both min and max
1731    /// accumulators with the resulting values.
1732    ///
1733    /// # Arguments
1734    /// * `batch` - The record batch to process
1735    ///
1736    /// # Returns
1737    /// Ok(()) if the update succeeds, or an error if expression evaluation fails
1738    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
1739        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
1740        self.min.update_batch(std::slice::from_ref(&array))?;
1741        self.max.update_batch(std::slice::from_ref(&array))?;
1742        Ok(())
1743    }
1744
1745    /// Finalizes the accumulation and returns the computed bounds.
1746    ///
1747    /// Consumes self to extract the final min and max values from the accumulators.
1748    ///
1749    /// # Returns
1750    /// The `ColumnBounds` containing the minimum and maximum values observed
1751    fn evaluate(mut self) -> Result<ColumnBounds> {
1752        Ok(ColumnBounds::new(
1753            self.min.evaluate()?,
1754            self.max.evaluate()?,
1755        ))
1756    }
1757}
1758
/// State for collecting the build-side data during hash join
struct BuildSideState {
    /// All build-side batches collected so far
    batches: Vec<RecordBatch>,
    /// Running total of rows across `batches`
    num_rows: usize,
    /// Metrics updated as build-side batches are consumed
    metrics: BuildProbeJoinMetrics,
    /// Memory reservation grown as batches are buffered
    reservation: MemoryReservation,
    /// Per-join-key min/max accumulators; `None` when bounds are not requested
    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
}
1767
1768impl BuildSideState {
1769    /// Create a new BuildSideState with optional accumulators for bounds computation
1770    fn try_new(
1771        metrics: BuildProbeJoinMetrics,
1772        reservation: MemoryReservation,
1773        on_left: Vec<Arc<dyn PhysicalExpr>>,
1774        schema: &SchemaRef,
1775        should_compute_dynamic_filters: bool,
1776    ) -> Result<Self> {
1777        Ok(Self {
1778            batches: Vec::new(),
1779            num_rows: 0,
1780            metrics,
1781            reservation,
1782            bounds_accumulators: should_compute_dynamic_filters
1783                .then(|| {
1784                    on_left
1785                        .into_iter()
1786                        .map(|expr| CollectLeftAccumulator::try_new(expr, schema))
1787                        .collect::<Result<Vec<_>>>()
1788                })
1789                .transpose()?,
1790        })
1791    }
1792}
1793
1794fn should_collect_min_max_for_perfect_hash(
1795    on_left: &[PhysicalExprRef],
1796    schema: &SchemaRef,
1797) -> Result<bool> {
1798    if on_left.len() != 1 {
1799        return Ok(false);
1800    }
1801
1802    let expr = &on_left[0];
1803    let data_type = expr.data_type(schema)?;
1804    Ok(ArrayMap::is_supported_type(&data_type))
1805}
1806
/// Collects all batches from the left (build) side stream and creates a hash map for joining.
///
/// This function is responsible for:
/// 1. Consuming the entire left stream and collecting all batches into memory
/// 2. Building a hash map from the join key columns for efficient probe operations
/// 3. Computing bounds for dynamic filter pushdown (if enabled)
/// 4. Preparing visited indices bitmap for certain join types
///
/// # Parameters
/// * `random_state` - Random state for consistent hashing across partitions
/// * `left_stream` - Stream of record batches from the build side
/// * `on_left` - Physical expressions for the left side join keys
/// * `metrics` - Metrics collector for tracking memory usage and row counts
/// * `reservation` - Memory reservation tracker for the hash table and data
/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
/// * `probe_threads_count` - Number of threads that will probe this hash table
/// * `should_compute_dynamic_filters` - Whether to compute min/max bounds for dynamic filtering
/// * `config` - Session configuration; supplies perfect-hash-join and IN-list pushdown thresholds
/// * `null_equality` - Null-comparison semantics forwarded to the perfect-hash `ArrayMap` builder
/// * `array_map_created_count` - Metric incremented when the perfect-hash `ArrayMap` path is taken
///
/// # Dynamic Filter Coordination
/// When `should_compute_dynamic_filters` is true, this function computes the min/max bounds
/// for each join key column but does NOT update the dynamic filter. Instead, the
/// bounds are stored in the returned `JoinLeftData` and later coordinated by
/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
/// before updating the filter exactly once.
///
/// # Returns
/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
/// visited indices bitmap, and computed bounds (if requested).
#[expect(clippy::too_many_arguments)]
async fn collect_left_input(
    random_state: RandomState,
    left_stream: SendableRecordBatchStream,
    on_left: Vec<PhysicalExprRef>,
    metrics: BuildProbeJoinMetrics,
    reservation: MemoryReservation,
    with_visited_indices_bitmap: bool,
    probe_threads_count: usize,
    should_compute_dynamic_filters: bool,
    config: Arc<ConfigOptions>,
    null_equality: NullEquality,
    array_map_created_count: Count,
) -> Result<JoinLeftData> {
    let schema = left_stream.schema();

    // The perfect hash join path also needs min/max bounds (to size its
    // ArrayMap), even when dynamic filters are disabled.
    let should_collect_min_max_for_phj =
        should_collect_min_max_for_perfect_hash(&on_left, &schema)?;

    let initial = BuildSideState::try_new(
        metrics,
        reservation,
        on_left.clone(),
        &schema,
        should_compute_dynamic_filters || should_collect_min_max_for_phj,
    )?;

    // Drain the entire build-side stream, buffering batches and tracking
    // memory usage, row counts, and (optionally) join key bounds.
    let state = left_stream
        .try_fold(initial, |mut state, batch| async move {
            // Update accumulators if computing bounds
            if let Some(ref mut accumulators) = state.bounds_accumulators {
                for accumulator in accumulators {
                    accumulator.update_batch(&batch)?;
                }
            }

            // Decide if we spill or not
            let batch_size = get_record_batch_memory_size(&batch);
            // Reserve memory for incoming batch
            state.reservation.try_grow(batch_size)?;
            // Update metrics
            state.metrics.build_mem_used.add(batch_size);
            state.metrics.build_input_batches.add(1);
            state.metrics.build_input_rows.add(batch.num_rows());
            // Update row count
            state.num_rows += batch.num_rows();
            // Push batch to output
            state.batches.push(batch);
            Ok(state)
        })
        .await?;

    // Extract fields from state
    let BuildSideState {
        batches,
        num_rows,
        metrics,
        mut reservation,
        bounds_accumulators,
    } = state;

    // Compute bounds; an empty build side yields no bounds even when
    // accumulators were created.
    let mut bounds = match bounds_accumulators {
        Some(accumulators) if num_rows > 0 => {
            let bounds = accumulators
                .into_iter()
                .map(CollectLeftAccumulator::evaluate)
                .collect::<Result<Vec<_>>>()?;
            Some(PartitionBounds::new(bounds))
        }
        _ => None,
    };

    // Prefer the perfect-hash ArrayMap when the key bounds/density allow it;
    // otherwise fall back to a general-purpose join hash map.
    let (join_hash_map, batch, left_values) =
        if let Some((array_map, batch, left_value)) = try_create_array_map(
            &bounds,
            &schema,
            &batches,
            &on_left,
            &mut reservation,
            config.execution.perfect_hash_join_small_build_threshold,
            config.execution.perfect_hash_join_min_key_density,
            null_equality,
        )? {
            array_map_created_count.add(1);
            metrics.build_mem_used.add(array_map.size());

            (Map::ArrayMap(array_map), batch, left_value)
        } else {
            // Estimation of memory size, required for hashtable, prior to allocation.
            // Final result can be verified using `RawTable.allocation_info()`
            let fixed_size_u32 = size_of::<JoinHashMapU32>();
            let fixed_size_u64 = size_of::<JoinHashMapU64>();

            // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
            // `u64` index variant. The map is wrapped in an `Arc` further below
            // so it can be shared with SharedBuildAccumulator for hash map pushdown.
            let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
                let estimated_hashtable_size =
                    estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
                reservation.try_grow(estimated_hashtable_size)?;
                metrics.build_mem_used.add(estimated_hashtable_size);
                Box::new(JoinHashMapU64::with_capacity(num_rows))
            } else {
                let estimated_hashtable_size =
                    estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
                reservation.try_grow(estimated_hashtable_size)?;
                metrics.build_mem_used.add(estimated_hashtable_size);
                Box::new(JoinHashMapU32::with_capacity(num_rows))
            };

            let mut hashes_buffer = Vec::new();
            let mut offset = 0;

            let batches_iter = batches.iter().rev();

            // Updating hashmap starting from the last batch
            for batch in batches_iter.clone() {
                hashes_buffer.clear();
                hashes_buffer.resize(batch.num_rows(), 0);
                update_hash(
                    &on_left,
                    batch,
                    &mut *hashmap,
                    offset,
                    &random_state,
                    &mut hashes_buffer,
                    0,
                    true,
                )?;
                offset += batch.num_rows();
            }

            // Merge all batches into a single batch, so we can directly index into the arrays
            let batch = concat_batches(&schema, batches_iter.clone())?;

            let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;

            (Map::HashMap(hashmap), batch, left_values)
        };

    // Reserve additional memory for visited indices bitmap and create shared builder.
    // NOTE(review): bitmap capacity uses `batch.num_rows()` while `append_n` uses
    // `num_rows` — assumes the consolidated batch holds exactly all build rows on
    // both map paths; confirm this invariant holds for `try_create_array_map`.
    let visited_indices_bitmap = if with_visited_indices_bitmap {
        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
        reservation.try_grow(bitmap_size)?;
        metrics.build_mem_used.add(bitmap_size);

        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
        bitmap_buffer.append_n(num_rows, false);
        bitmap_buffer
    } else {
        BooleanBufferBuilder::new(0)
    };

    let map = Arc::new(join_hash_map);

    let membership = if num_rows == 0 {
        PushdownStrategy::Empty
    } else {
        // If the build side is small enough we can use IN list pushdown.
        // If it's too big we fall back to pushing down a reference to the hash table.
        // See `PushdownStrategy` for more details.
        let estimated_size = left_values
            .iter()
            .map(|arr| arr.get_array_memory_size())
            .sum::<usize>();
        if left_values.is_empty()
            || left_values[0].is_empty()
            || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size
            || map.num_of_distinct_key()
                > config
                    .optimizer
                    .hash_join_inlist_pushdown_max_distinct_values
        {
            PushdownStrategy::Map(Arc::clone(&map))
        } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
            PushdownStrategy::InList(in_list_values)
        } else {
            PushdownStrategy::Map(Arc::clone(&map))
        }
    };

    // If bounds were only collected to size the ArrayMap (not for dynamic
    // filtering), do not expose them to the dynamic filter machinery.
    if should_collect_min_max_for_phj && !should_compute_dynamic_filters {
        bounds = None;
    }

    let data = JoinLeftData {
        map,
        batch,
        values: left_values,
        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
        probe_threads_counter: AtomicUsize::new(probe_threads_count),
        _reservation: reservation,
        bounds,
        membership,
        probe_side_non_empty: AtomicBool::new(false),
        probe_side_has_null: AtomicBool::new(false),
    };

    Ok(data)
}
2036
2037#[cfg(test)]
2038mod tests {
2039    use super::*;
2040
2041    fn assert_phj_used(metrics: &MetricsSet, use_phj: bool) {
2042        if use_phj {
2043            assert!(
2044                metrics
2045                    .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
2046                    .expect("should have array_map_created_count metrics")
2047                    .as_usize()
2048                    >= 1
2049            );
2050        } else {
2051            assert_eq!(
2052                metrics
2053                    .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
2054                    .map(|v| v.as_usize())
2055                    .unwrap_or(0),
2056                0
2057            )
2058        }
2059    }
2060
2061    fn build_schema_and_on() -> Result<(SchemaRef, SchemaRef, JoinOn)> {
2062        let left_schema = Arc::new(Schema::new(vec![
2063            Field::new("a1", DataType::Int32, true),
2064            Field::new("b1", DataType::Int32, true),
2065        ]));
2066        let right_schema = Arc::new(Schema::new(vec![
2067            Field::new("a2", DataType::Int32, true),
2068            Field::new("b1", DataType::Int32, true),
2069        ]));
2070        let on = vec![(
2071            Arc::new(Column::new_with_schema("b1", &left_schema)?) as _,
2072            Arc::new(Column::new_with_schema("b1", &right_schema)?) as _,
2073        )];
2074        Ok((left_schema, right_schema, on))
2075    }
2076
2077    use crate::coalesce_partitions::CoalescePartitionsExec;
2078    use crate::joins::hash_join::stream::lookup_join_hashmap;
2079    use crate::test::{TestMemoryExec, assert_join_metrics};
2080    use crate::{
2081        common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
2082        test::exec::MockExec,
2083    };
2084
2085    use arrow::array::{
2086        Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, UInt64Array,
2087    };
2088    use arrow::buffer::NullBuffer;
2089    use arrow::datatypes::{DataType, Field};
2090    use arrow_schema::Schema;
2091    use datafusion_common::hash_utils::create_hashes;
2092    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
2093    use datafusion_common::{
2094        ScalarValue, assert_batches_eq, assert_batches_sorted_eq, assert_contains,
2095        exec_err, internal_err,
2096    };
2097    use datafusion_execution::config::SessionConfig;
2098    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
2099    use datafusion_expr::Operator;
2100    use datafusion_physical_expr::PhysicalExpr;
2101    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
2102    use hashbrown::HashTable;
2103    use insta::{allow_duplicates, assert_snapshot};
2104    use rstest::*;
2105    use rstest_reuse::*;
2106
2107    fn div_ceil(a: usize, b: usize) -> usize {
2108        a.div_ceil(b)
2109    }
2110
    // `rstest_reuse` template: tests applying this template are instantiated for
    // every combination of batch size and perfect-hash-join enablement.
    #[template]
    #[rstest]
    fn hash_join_exec_configs(
        #[values(8192, 10, 5, 2, 1)] batch_size: usize,
        #[values(true, false)] use_perfect_hash_join_as_possible: bool,
    ) {
    }
2118
2119    fn prepare_task_ctx(
2120        batch_size: usize,
2121        use_perfect_hash_join_as_possible: bool,
2122    ) -> Arc<TaskContext> {
2123        let mut session_config = SessionConfig::default().with_batch_size(batch_size);
2124
2125        if use_perfect_hash_join_as_possible {
2126            session_config
2127                .options_mut()
2128                .execution
2129                .perfect_hash_join_small_build_threshold = 819200;
2130            session_config
2131                .options_mut()
2132                .execution
2133                .perfect_hash_join_min_key_density = 0.0;
2134        } else {
2135            session_config
2136                .options_mut()
2137                .execution
2138                .perfect_hash_join_small_build_threshold = 0;
2139            session_config
2140                .options_mut()
2141                .execution
2142                .perfect_hash_join_min_key_density = f64::INFINITY;
2143        }
2144        Arc::new(TaskContext::default().with_session_config(session_config))
2145    }
2146
2147    fn build_table(
2148        a: (&str, &Vec<i32>),
2149        b: (&str, &Vec<i32>),
2150        c: (&str, &Vec<i32>),
2151    ) -> Arc<dyn ExecutionPlan> {
2152        let batch = build_table_i32(a, b, c);
2153        let schema = batch.schema();
2154        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
2155    }
2156
2157    /// Build a table with two columns supporting nullable values
2158    fn build_table_two_cols(
2159        a: (&str, &Vec<Option<i32>>),
2160        b: (&str, &Vec<Option<i32>>),
2161    ) -> Arc<dyn ExecutionPlan> {
2162        let schema = Arc::new(Schema::new(vec![
2163            Field::new(a.0, DataType::Int32, true),
2164            Field::new(b.0, DataType::Int32, true),
2165        ]));
2166        let batch = RecordBatch::try_new(
2167            Arc::clone(&schema),
2168            vec![
2169                Arc::new(Int32Array::from(a.1.clone())),
2170                Arc::new(Int32Array::from(b.1.clone())),
2171            ],
2172        )
2173        .unwrap();
2174        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
2175    }
2176
    /// Creates a `CollectLeft` hash join with no extra filter, no projection,
    /// and no dynamic filtering (trailing `false` flag — confirm its meaning
    /// against `HashJoinExec::try_new`).
    fn join(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        null_equality: NullEquality,
    ) -> Result<HashJoinExec> {
        HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            join_type,
            None,
            PartitionMode::CollectLeft,
            null_equality,
            false,
        )
    }
2196
    /// Like [`join`], but additionally attaches a non-equi `JoinFilter` to the
    /// `CollectLeft` hash join.
    fn join_with_filter(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: JoinFilter,
        join_type: &JoinType,
        null_equality: NullEquality,
    ) -> Result<HashJoinExec> {
        HashJoinExec::try_new(
            left,
            right,
            on,
            Some(filter),
            join_type,
            None,
            PartitionMode::CollectLeft,
            null_equality,
            false,
        )
    }
2217
2218    async fn join_collect(
2219        left: Arc<dyn ExecutionPlan>,
2220        right: Arc<dyn ExecutionPlan>,
2221        on: JoinOn,
2222        join_type: &JoinType,
2223        null_equality: NullEquality,
2224        context: Arc<TaskContext>,
2225    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2226        let join = join(left, right, on, join_type, null_equality)?;
2227        let columns_header = columns(&join.schema());
2228
2229        let stream = join.execute(0, context)?;
2230        let batches = common::collect(stream).await?;
2231        let metrics = join.metrics().unwrap();
2232
2233        Ok((columns_header, batches, metrics))
2234    }
2235
    /// Like [`join_collect`], but delegates to `join_collect_with_partition_mode`
    /// with `PartitionMode::Partitioned` so both inputs are hash-repartitioned.
    async fn partitioned_join_collect(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        null_equality: NullEquality,
        context: Arc<TaskContext>,
    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
        join_collect_with_partition_mode(
            left,
            right,
            on,
            join_type,
            PartitionMode::Partitioned,
            null_equality,
            context,
        )
        .await
    }
2255
2256    async fn join_collect_with_partition_mode(
2257        left: Arc<dyn ExecutionPlan>,
2258        right: Arc<dyn ExecutionPlan>,
2259        on: JoinOn,
2260        join_type: &JoinType,
2261        partition_mode: PartitionMode,
2262        null_equality: NullEquality,
2263        context: Arc<TaskContext>,
2264    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2265        let partition_count = 4;
2266
2267        let (left_expr, right_expr) = on
2268            .iter()
2269            .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
2270            .unzip();
2271
2272        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
2273            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
2274            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
2275                left,
2276                Partitioning::Hash(left_expr, partition_count),
2277            )?),
2278            PartitionMode::Auto => {
2279                return internal_err!("Unexpected PartitionMode::Auto in join tests");
2280            }
2281        };
2282
2283        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
2284            PartitionMode::CollectLeft => {
2285                let partition_column_name = right.schema().field(0).name().clone();
2286                let partition_expr = vec![Arc::new(Column::new_with_schema(
2287                    &partition_column_name,
2288                    &right.schema(),
2289                )?) as _];
2290                Arc::new(RepartitionExec::try_new(
2291                    right,
2292                    Partitioning::Hash(partition_expr, partition_count),
2293                )?) as _
2294            }
2295            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
2296                right,
2297                Partitioning::Hash(right_expr, partition_count),
2298            )?),
2299            PartitionMode::Auto => {
2300                return internal_err!("Unexpected PartitionMode::Auto in join tests");
2301            }
2302        };
2303
2304        let join = HashJoinExec::try_new(
2305            left_repartitioned,
2306            right_repartitioned,
2307            on,
2308            None,
2309            join_type,
2310            None,
2311            partition_mode,
2312            null_equality,
2313            false,
2314        )?;
2315
2316        let columns = columns(&join.schema());
2317
2318        let mut batches = vec![];
2319        for i in 0..partition_count {
2320            let stream = join.execute(i, Arc::clone(&context))?;
2321            let more_batches = common::collect(stream).await?;
2322            batches.extend(
2323                more_batches
2324                    .into_iter()
2325                    .filter(|b| b.num_rows() > 0)
2326                    .collect::<Vec<_>>(),
2327            );
2328        }
2329        let metrics = join.metrics().unwrap();
2330
2331        Ok((columns, batches, metrics))
2332    }
2333
    /// Inner join on a single key (`b1`) where the build side has a repeated
    /// key value, run in `CollectLeft` mode via [`join_collect`].
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        // Join on the "b1" column present on both sides.
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        // Both "b1" columns appear in the output schema.
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            // Inner join output is expected to preserve both inputs order
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2387
    /// Same data as `join_inner_one`, but run with
    /// `PartitionMode::Partitioned` across hash-repartitioned inputs; the
    /// output is sorted before snapshotting since partition order is not
    /// deterministic.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2439
    /// Inner join where the join keys have different names on each side
    /// (`b1` vs `b2`), so the output schema has no duplicated column names.
    #[tokio::test]
    async fn join_inner_one_no_shared_column_names() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2487
    /// Inner join where neither input is sorted on the join key; the snapshot
    /// pins the exact (probe-order-driven) output row order.
    #[tokio::test]
    async fn join_inner_one_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![0, 3, 2, 1]),
            ("b1", &vec![4, 5, 5, 4]),
            ("c1", &vec![6, 9, 8, 7]),
        );
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3  | 5  | 9  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 0  | 4  | 6  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
2536
    /// Inner join on two key columns (`a1` and `b2`); also bounds the number
    /// of output batches as a function of `batch_size`.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_two(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 2]),
            ("b2", &vec![1, 2, 2]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        // Composite join key: both `a1` and `b2` must match.
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches = 3
            // in case batch_size is 1 - additional empty batch for remaining 3-2 row
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            // With hash collisions enabled, all records will match each other
            // and filtered later.
            div_ceil(9, batch_size)
        };

        // With batch coalescing, we may have fewer batches than expected
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 1  | 7  | 1  | 1  | 70 |
            | 2  | 2  | 8  | 2  | 2  | 80 |
            | 2  | 2  | 9  | 2  | 2  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2615
    /// Test where the left has 2 parts, the right with 1 part => 1 part
    ///
    /// The two left partitions are coalesced into one before the join, so the
    /// result matches `join_inner_two` built from a single left partition.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one_two_parts_left(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        // Left input is split across two partitions (batch1 and batch2).
        let batch1 = build_table_i32(
            ("a1", &vec![1, 2]),
            ("b2", &vec![1, 2]),
            ("c1", &vec![7, 8]),
        );
        let batch2 =
            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
        let schema = batch1.schema();
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));

        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        // Composite join key on both `a1` and `b2`.
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches = 3
            // in case batch_size is 1 - additional empty batch for remaining 3-2 row
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            // With hash collisions enabled, all records will match each other
            // and filtered later.
            div_ceil(9, batch_size)
        };

        // With batch coalescing, we may have fewer batches than expected
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 1  | 7  | 1  | 1  | 70 |
            | 2  | 2  | 8  | 2  | 2  | 80 |
            | 2  | 2  | 9  | 2  | 2  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2703
    /// Inner join where the (coalesced) two-partition left input is not sorted
    /// on the join key; the snapshot pins the exact output row order.
    #[tokio::test]
    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        // Left input is split across two partitions (batch1 and batch2).
        let batch1 = build_table_i32(
            ("a1", &vec![0, 3]),
            ("b1", &vec![4, 5]),
            ("c1", &vec![6, 9]),
        );
        let batch2 = build_table_i32(
            ("a1", &vec![2, 1]),
            ("b1", &vec![5, 4]),
            ("c1", &vec![8, 7]),
        );
        let schema = batch1.schema();

        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3  | 5  | 9  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 0  | 4  | 6  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
2763
    /// Test where the left has 1 part, the right has 2 parts => 2 parts
    ///
    /// Each output partition is executed and checked independently: the first
    /// right batch matches one build row, the second matches two.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one_two_parts_right(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );

        // Right input is split across two partitions (batch1 and batch2).
        let batch1 = build_table_i32(
            ("a2", &vec![10, 20]),
            ("b1", &vec![4, 6]),
            ("c2", &vec![70, 80]),
        );
        let batch2 =
            build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
        let schema = batch1.schema();
        let right =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        // first part
        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches for first right batch = 1
            // and additional empty batch for non-joined 20-6-80
            let mut expected_batch_count = div_ceil(1, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            // With hash collisions enabled, all records will match each other
            // and filtered later.
            div_ceil(6, batch_size)
        };
        // With batch coalescing, we may have fewer batches than expected
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }

        // second part
        let stream = join.execute(1, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches for second right batch = 2
            div_ceil(2, batch_size)
        } else {
            // With hash collisions enabled, all records will match each other
            // and filtered later.
            div_ceil(3, batch_size)
        };
        // With batch coalescing, we may have fewer batches than expected
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 2  | 5  | 8  | 30 | 5  | 90 |
            | 3  | 5  | 9  | 30 | 5  | 90 |
            +----+----+----+----+----+----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2877
2878    fn build_table_two_batches(
2879        a: (&str, &Vec<i32>),
2880        b: (&str, &Vec<i32>),
2881        c: (&str, &Vec<i32>),
2882    ) -> Arc<dyn ExecutionPlan> {
2883        let batch = build_table_i32(a, b, c);
2884        let schema = batch.schema();
2885        TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2886    }
2887
2888    #[apply(hash_join_exec_configs)]
2889    #[tokio::test]
2890    async fn join_left_multi_batch(
2891        batch_size: usize,
2892        use_perfect_hash_join_as_possible: bool,
2893    ) -> Result<()> {
2894        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
2895        let left = build_table(
2896            ("a1", &vec![1, 2, 3]),
2897            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2898            ("c1", &vec![7, 8, 9]),
2899        );
2900        let right = build_table_two_batches(
2901            ("a2", &vec![10, 20, 30]),
2902            ("b1", &vec![4, 5, 6]),
2903            ("c2", &vec![70, 80, 90]),
2904        );
2905        let on = vec![(
2906            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2907            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2908        )];
2909
2910        let join = join(
2911            Arc::clone(&left),
2912            Arc::clone(&right),
2913            on.clone(),
2914            &JoinType::Left,
2915            NullEquality::NullEqualsNothing,
2916        )
2917        .unwrap();
2918
2919        let columns = columns(&join.schema());
2920        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2921
2922        let (_, batches, metrics) = join_collect(
2923            Arc::clone(&left),
2924            Arc::clone(&right),
2925            on.clone(),
2926            &JoinType::Left,
2927            NullEquality::NullEqualsNothing,
2928            task_ctx,
2929        )
2930        .await?;
2931
2932        allow_duplicates! {
2933            assert_snapshot!(batches_to_sort_string(&batches), @r"
2934            +----+----+----+----+----+----+
2935            | a1 | b1 | c1 | a2 | b1 | c2 |
2936            +----+----+----+----+----+----+
2937            | 1  | 4  | 7  | 10 | 4  | 70 |
2938            | 1  | 4  | 7  | 10 | 4  | 70 |
2939            | 2  | 5  | 8  | 20 | 5  | 80 |
2940            | 2  | 5  | 8  | 20 | 5  | 80 |
2941            | 3  | 7  | 9  |    |    |    |
2942            +----+----+----+----+----+----+
2943            ");
2944        }
2945
2946        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
2947        return Ok(());
2948    }
2949
    /// `Full` join with two identical probe-side batches: matched rows appear
    /// once per probe batch, the unmatched probe row (`b2 = 6`) is emitted
    /// with left-side nulls once per batch, and the unmatched build row
    /// (`b1 = 7`) with right-side nulls exactly once.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_full_multi_batch(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
            ("c1", &vec![7, 8, 9]),
        );
        // create two identical batches for the right side
        let right = build_table_two_batches(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();
        let metrics = join.metrics().unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            |    |    |    | 30 | 6  | 90 |
            |    |    |    | 30 | 6  | 90 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }

        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    }
3007
    /// `Left` join against an empty probe side: every build row is emitted
    /// once with nulls for the right-side columns.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_empty_right(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        // Empty right-side batch (zero rows, same three-column shape).
        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
        )];
        let schema = right.schema();
        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
        let join = join(
            left,
            right,
            on,
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();
        let metrics = join.metrics().unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  |    |    |    |
            | 2  | 5  | 8  |    |    |    |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }

        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    }
3057
    /// `Full` join against an empty probe side: with no right rows at all,
    /// the output is identical to the `Left` join case — every build row
    /// emitted once with right-side nulls.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_full_empty_right(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        // Empty right-side batch (zero rows, same three-column shape).
        let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];
        let schema = right.schema();
        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
        let join = join(
            left,
            right,
            on,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();
        let metrics = join.metrics().unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  |    |    |    |
            | 2  | 5  | 8  |    |    |    |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }

        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    }
3107
    // Left outer join on a single key: the unmatched left row (b1 = 7) is
    // emitted with nulls in the right-side columns.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        // Both sides deliberately name the join key column "b1".
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        // The output keeps a "b1" column from each side.
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3159
    // Same scenario as `join_left_one`, but executed through the partitioned
    // collect helper to exercise the partitioned join code path.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_left_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        // Both sides deliberately name the join key column "b1".
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        // The output keeps a "b1" column from each side.
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3211
    // Left input shared by the semi/anti join tests. The key column `b1`
    // contains 8 twice and 10 once; 8 and 10 are the only values that also
    // appear in `build_semi_anti_right_table`'s `b2` column, so exactly three
    // left rows (a1 = 9, 11, 13) have a join partner.
    fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
        build_table(
            ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
            ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
            ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
        )
    }
3221
    // Right input shared by the semi/anti join tests. The key column `b2`
    // contains 10 twice and 8 once; 8 and 10 are the only values that also
    // appear in `build_semi_anti_left_table`'s `b1` column, so exactly three
    // right rows (a2 = 8, 12, 10) have a join partner.
    fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
        build_table(
            ("a2", &vec![8, 12, 6, 2, 10, 4]),
            ("b2", &vec![8, 10, 6, 2, 10, 4]),
            ("c2", &vec![20, 40, 60, 80, 100, 120]),
        )
    }
3231
3232    #[apply(hash_join_exec_configs)]
3233    #[tokio::test]
3234    async fn join_left_semi(
3235        batch_size: usize,
3236        use_perfect_hash_join_as_possible: bool,
3237    ) -> Result<()> {
3238        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3239        let left = build_semi_anti_left_table();
3240        let right = build_semi_anti_right_table();
3241        // left_table left semi join right_table on left_table.b1 = right_table.b2
3242        let on = vec![(
3243            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3244            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3245        )];
3246
3247        let join = join(
3248            left,
3249            right,
3250            on,
3251            &JoinType::LeftSemi,
3252            NullEquality::NullEqualsNothing,
3253        )?;
3254
3255        let columns = columns(&join.schema());
3256        assert_eq!(columns, vec!["a1", "b1", "c1"]);
3257
3258        let stream = join.execute(0, task_ctx)?;
3259        let batches = common::collect(stream).await?;
3260
3261        // ignore the order
3262        allow_duplicates! {
3263            assert_snapshot!(batches_to_sort_string(&batches), @r"
3264            +----+----+-----+
3265            | a1 | b1 | c1  |
3266            +----+----+-----+
3267            | 11 | 8  | 110 |
3268            | 13 | 10 | 130 |
3269            | 9  | 8  | 90  |
3270            +----+----+-----+
3271            ");
3272        }
3273
3274        let metrics = join.metrics().unwrap();
3275        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3276
3277        Ok(())
3278    }
3279
    // LeftSemi join combined with an additional (non-equi) join filter that
    // references only the right side. Two filter variants are exercised:
    // `a2 != 10` (does not change the match set) and `a2 > 10` (narrows it).
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_semi_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();

        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 10
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // The filter's intermediate schema exposes one column "x", bound to
        // column 0 ("a2") of the right side.
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Right,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);

        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices.clone(),
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        // Same result as the unfiltered LeftSemi test: each matching key still
        // has at least one right row with a2 != 10.
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1  |
            +----+----+-----+
            | 11 | 8  | 110 |
            | 13 | 10 | 130 |
            | 9  | 8  | 90  |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 > 10
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;
        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        // Only a2 = 12 passes the filter, so only the b1 = 10 left row remains.
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1  |
            +----+----+-----+
            | 13 | 10 | 130 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3387
    // RightSemi join: emit each right row that has at least one key match on
    // the left, projecting only the right side's columns.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_semi(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();

        // left_table right semi join right_table on left_table.b1 = right_table.b2
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::RightSemi,
            NullEquality::NullEqualsNothing,
        )?;

        // Only the right side's columns appear in the output schema.
        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        // RightSemi join output is expected to preserve right input order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+-----+
            | a2 | b2 | c2  |
            +----+----+-----+
            | 8  | 8  | 20  |
            | 12 | 10 | 40  |
            | 10 | 10 | 100 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3436
3437    #[apply(hash_join_exec_configs)]
3438    #[tokio::test]
3439    async fn join_right_semi_with_filter(
3440        batch_size: usize,
3441        use_perfect_hash_join_as_possible: bool,
3442    ) -> Result<()> {
3443        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3444        let left = build_semi_anti_left_table();
3445        let right = build_semi_anti_right_table();
3446
3447        // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9
3448        let on = vec![(
3449            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3450            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3451        )];
3452
3453        let column_indices = vec![ColumnIndex {
3454            index: 0,
3455            side: JoinSide::Left,
3456        }];
3457        let intermediate_schema =
3458            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3459
3460        let filter_expression = Arc::new(BinaryExpr::new(
3461            Arc::new(Column::new("x", 0)),
3462            Operator::NotEq,
3463            Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
3464        )) as Arc<dyn PhysicalExpr>;
3465
3466        let filter = JoinFilter::new(
3467            filter_expression,
3468            column_indices.clone(),
3469            Arc::new(intermediate_schema.clone()),
3470        );
3471
3472        let join = join_with_filter(
3473            Arc::clone(&left),
3474            Arc::clone(&right),
3475            on.clone(),
3476            filter,
3477            &JoinType::RightSemi,
3478            NullEquality::NullEqualsNothing,
3479        )?;
3480
3481        let columns = columns(&join.schema());
3482        assert_eq!(columns, vec!["a2", "b2", "c2"]);
3483
3484        let stream = join.execute(0, Arc::clone(&task_ctx))?;
3485        let batches = common::collect(stream).await?;
3486
3487        // RightSemi join output is expected to preserve right input order
3488        allow_duplicates! {
3489            assert_snapshot!(batches_to_string(&batches), @r"
3490            +----+----+-----+
3491            | a2 | b2 | c2  |
3492            +----+----+-----+
3493            | 8  | 8  | 20  |
3494            | 12 | 10 | 40  |
3495            | 10 | 10 | 100 |
3496            +----+----+-----+
3497            ");
3498        }
3499
3500        let metrics = join.metrics().unwrap();
3501        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3502
3503        // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9
3504        let filter_expression = Arc::new(BinaryExpr::new(
3505            Arc::new(Column::new("x", 0)),
3506            Operator::Gt,
3507            Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
3508        )) as Arc<dyn PhysicalExpr>;
3509
3510        let filter = JoinFilter::new(
3511            filter_expression,
3512            column_indices,
3513            Arc::new(intermediate_schema.clone()),
3514        );
3515
3516        let join = join_with_filter(
3517            left,
3518            right,
3519            on,
3520            filter,
3521            &JoinType::RightSemi,
3522            NullEquality::NullEqualsNothing,
3523        )?;
3524        let stream = join.execute(0, task_ctx)?;
3525        let batches = common::collect(stream).await?;
3526
3527        // RightSemi join output is expected to preserve right input order
3528        allow_duplicates! {
3529            assert_snapshot!(batches_to_string(&batches), @r"
3530            +----+----+-----+
3531            | a2 | b2 | c2  |
3532            +----+----+-----+
3533            | 12 | 10 | 40  |
3534            | 10 | 10 | 100 |
3535            +----+----+-----+
3536            ");
3537        }
3538
3539        let metrics = join.metrics().unwrap();
3540        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3541
3542        Ok(())
3543    }
3544
3545    #[apply(hash_join_exec_configs)]
3546    #[tokio::test]
3547    async fn join_left_anti(
3548        batch_size: usize,
3549        use_perfect_hash_join_as_possible: bool,
3550    ) -> Result<()> {
3551        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3552        let left = build_semi_anti_left_table();
3553        let right = build_semi_anti_right_table();
3554        // left_table left anti join right_table on left_table.b1 = right_table.b2
3555        let on = vec![(
3556            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3557            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3558        )];
3559
3560        let join = join(
3561            left,
3562            right,
3563            on,
3564            &JoinType::LeftAnti,
3565            NullEquality::NullEqualsNothing,
3566        )?;
3567
3568        let columns = columns(&join.schema());
3569        assert_eq!(columns, vec!["a1", "b1", "c1"]);
3570
3571        let stream = join.execute(0, task_ctx)?;
3572        let batches = common::collect(stream).await?;
3573
3574        allow_duplicates! {
3575            assert_snapshot!(batches_to_sort_string(&batches), @r"
3576            +----+----+----+
3577            | a1 | b1 | c1 |
3578            +----+----+----+
3579            | 1  | 1  | 10 |
3580            | 3  | 3  | 30 |
3581            | 5  | 5  | 50 |
3582            | 7  | 7  | 70 |
3583            +----+----+----+
3584            ");
3585        }
3586
3587        let metrics = join.metrics().unwrap();
3588        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3589
3590        Ok(())
3591    }
3592
    // LeftAnti join combined with an additional (non-equi) join filter on the
    // right side (`a2 != 8`): the a2 = 8 right row no longer counts as a
    // match, so the b1 = 8 left rows flip into the anti-join output.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_anti_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2!=8
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // The filter's intermediate schema exposes one column "x", bound to
        // column 0 ("a2") of the right side.
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Right,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices.clone(),
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::LeftAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1  |
            +----+----+-----+
            | 1  | 1  | 10  |
            | 11 | 8  | 110 |
            | 3  | 3  | 30  |
            | 5  | 5  | 50  |
            | 7  | 7  | 70  |
            | 9  | 8  | 90  |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        // Second case: same join with the same filter (right_table.a2 != 8).
        // NOTE(review): the comment here previously said `a2 != 13`, which
        // does not match the `Some(8)` literal below — as written this case
        // duplicates the first one. If `!= 13` was intended, the expected
        // snapshot would change as well; confirm the original intent.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::LeftAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1  |
            +----+----+-----+
            | 1  | 1  | 10  |
            | 11 | 8  | 110 |
            | 3  | 3  | 30  |
            | 5  | 5  | 50  |
            | 7  | 7  | 70  |
            | 9  | 8  | 90  |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3707
3708    #[apply(hash_join_exec_configs)]
3709    #[tokio::test]
3710    async fn join_right_anti(
3711        batch_size: usize,
3712        use_perfect_hash_join_as_possible: bool,
3713    ) -> Result<()> {
3714        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3715        let left = build_semi_anti_left_table();
3716        let right = build_semi_anti_right_table();
3717        let on = vec![(
3718            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3719            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3720        )];
3721
3722        let join = join(
3723            left,
3724            right,
3725            on,
3726            &JoinType::RightAnti,
3727            NullEquality::NullEqualsNothing,
3728        )?;
3729
3730        let columns = columns(&join.schema());
3731        assert_eq!(columns, vec!["a2", "b2", "c2"]);
3732
3733        let stream = join.execute(0, task_ctx)?;
3734        let batches = common::collect(stream).await?;
3735
3736        // RightAnti join output is expected to preserve right input order
3737        allow_duplicates! {
3738            assert_snapshot!(batches_to_string(&batches), @r"
3739            +----+----+-----+
3740            | a2 | b2 | c2  |
3741            +----+----+-----+
3742            | 6  | 6  | 60  |
3743            | 2  | 2  | 80  |
3744            | 4  | 4  | 120 |
3745            +----+----+-----+
3746            ");
3747        }
3748
3749        let metrics = join.metrics().unwrap();
3750        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3751
3752        Ok(())
3753    }
3754
    // RightAnti join combined with an additional join filter, once on the
    // left side (`a1 != 13`) and once on the right side (`b2 != 8`).
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_anti_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        // left_table right anti join right_table on left_table.b1 = right_table.b2 and left_table.a1!=13
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // First filter: "x" is bound to column 0 ("a1") of the left side.
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Left,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);

        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::RightAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        // RightAnti join output is expected to preserve right input order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+-----+
            | a2 | b2 | c2  |
            +----+----+-----+
            | 12 | 10 | 40  |
            | 6  | 6  | 60  |
            | 2  | 2  | 80  |
            | 10 | 10 | 100 |
            | 4  | 4  | 120 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8
        // Second filter: "x" is re-bound to column 1 ("b2") of the right side.
        let column_indices = vec![ColumnIndex {
            index: 1,
            side: JoinSide::Right,
        }];
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::RightAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        // RightAnti join output is expected to preserve right input order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+-----+
            | a2 | b2 | c2  |
            +----+----+-----+
            | 8  | 8  | 20  |
            | 6  | 6  | 60  |
            | 2  | 2  | 80  |
            | 4  | 4  | 120 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3873
    // Right outer join on a single key: the unmatched right row (b1 = 6) is
    // emitted with nulls in the left-side columns.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
            ("c2", &vec![70, 80, 90]),
        );
        // Both sides deliberately name the join key column "b1".
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Right,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        // The output keeps a "b1" column from each side.
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            |    |    |    | 30 | 6  | 90 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3925
    // Same scenario as `join_right_one`, but executed through the partitioned
    // collect helper to exercise the partitioned join code path.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_right_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
            ("c2", &vec![70, 80, 90]),
        );
        // Both sides deliberately name the join key column "b1".
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            left,
            right,
            on,
            &JoinType::Right,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        // The output keeps a "b1" column from each side.
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            |    |    |    | 30 | 6  | 90 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3977
3978    #[apply(hash_join_exec_configs)]
3979    #[tokio::test]
3980    async fn join_full_one(
3981        batch_size: usize,
3982        use_perfect_hash_join_as_possible: bool,
3983    ) -> Result<()> {
3984        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3985        let left = build_table(
3986            ("a1", &vec![1, 2, 3]),
3987            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3988            ("c1", &vec![7, 8, 9]),
3989        );
3990        let right = build_table(
3991            ("a2", &vec![10, 20, 30]),
3992            ("b2", &vec![4, 5, 6]),
3993            ("c2", &vec![70, 80, 90]),
3994        );
3995        let on = vec![(
3996            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3997            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3998        )];
3999
4000        let join = join(
4001            left,
4002            right,
4003            on,
4004            &JoinType::Full,
4005            NullEquality::NullEqualsNothing,
4006        )?;
4007
4008        let columns = columns(&join.schema());
4009        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
4010
4011        let stream = join.execute(0, task_ctx)?;
4012        let batches = common::collect(stream).await?;
4013
4014        allow_duplicates! {
4015            assert_snapshot!(batches_to_sort_string(&batches), @r"
4016            +----+----+----+----+----+----+
4017            | a1 | b1 | c1 | a2 | b2 | c2 |
4018            +----+----+----+----+----+----+
4019            |    |    |    | 30 | 6  | 90 |
4020            | 1  | 4  | 7  | 10 | 4  | 70 |
4021            | 2  | 5  | 8  | 20 | 5  | 80 |
4022            | 3  | 7  | 9  |    |    |    |
4023            +----+----+----+----+----+----+
4024            ");
4025        }
4026
4027        let metrics = join.metrics().unwrap();
4028        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4029
4030        Ok(())
4031    }
4032
4033    #[apply(hash_join_exec_configs)]
4034    #[tokio::test]
4035    async fn join_left_mark(
4036        batch_size: usize,
4037        use_perfect_hash_join_as_possible: bool,
4038    ) -> Result<()> {
4039        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4040        let left = build_table(
4041            ("a1", &vec![1, 2, 3]),
4042            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
4043            ("c1", &vec![7, 8, 9]),
4044        );
4045        let right = build_table(
4046            ("a2", &vec![10, 20, 30]),
4047            ("b1", &vec![4, 5, 6]),
4048            ("c2", &vec![70, 80, 90]),
4049        );
4050        let on = vec![(
4051            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4052            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4053        )];
4054
4055        let (columns, batches, metrics) = join_collect(
4056            Arc::clone(&left),
4057            Arc::clone(&right),
4058            on.clone(),
4059            &JoinType::LeftMark,
4060            NullEquality::NullEqualsNothing,
4061            task_ctx,
4062        )
4063        .await?;
4064
4065        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
4066
4067        allow_duplicates! {
4068            assert_snapshot!(batches_to_sort_string(&batches), @r"
4069            +----+----+----+-------+
4070            | a1 | b1 | c1 | mark  |
4071            +----+----+----+-------+
4072            | 1  | 4  | 7  | true  |
4073            | 2  | 5  | 8  | true  |
4074            | 3  | 7  | 9  | false |
4075            +----+----+----+-------+
4076            ");
4077        }
4078
4079        assert_join_metrics!(metrics, 3);
4080        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4081
4082        Ok(())
4083    }
4084
    /// LeftMark join over partitioned inputs. The right side has duplicate
    /// key values (`b1 = 4` twice); the mark must still be a single `true`
    /// per matching left row, never a duplicated row.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_left_mark(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30, 40]),
            ("b1", &vec![4, 4, 5, 6]),
            ("c2", &vec![60, 70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        // NOTE(review): `left`, `right` and `on` are not reused after this
        // call, so the clones below are redundant and could be removed.
        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::LeftMark,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+-------+
            | a1 | b1 | c1 | mark  |
            +----+----+----+-------+
            | 1  | 4  | 7  | true  |
            | 2  | 5  | 8  | true  |
            | 3  | 7  | 9  | false |
            +----+----+----+-------+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
4136
    /// RightMark join: the output is the right side plus a boolean `mark`
    /// column that is true iff the right row has at least one left match.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_mark(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::RightMark,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);

        // NOTE(review): still uses the pre-insta `assert_batches_sorted_eq!`
        // style; most sibling tests were migrated to `assert_snapshot!`.
        let expected = [
            "+----+----+----+-------+",
            "| a2 | b1 | c2 | mark  |",
            "+----+----+----+-------+",
            "| 10 | 4  | 70 | true  |",
            "| 20 | 5  | 80 | true  |",
            "| 30 | 6  | 90 | false |",
            "+----+----+----+-------+",
        ];
        assert_batches_sorted_eq!(expected, &batches);

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
4187
    /// RightMark join over partitioned inputs. `b1 = 4` matches twice on the
    /// right, so two right rows are marked `true` for the same left key.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_right_mark(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30, 40]),
            ("b1", &vec![4, 4, 5, 6]), // 6 does not exist on the left
            ("c2", &vec![60, 70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::RightMark,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);

        let expected = [
            "+----+----+----+-------+",
            "| a2 | b1 | c2 | mark  |",
            "+----+----+----+-------+",
            "| 10 | 4  | 60 | true  |",
            "| 20 | 4  | 70 | true  |",
            "| 30 | 5  | 80 | true  |",
            "| 40 | 6  | 90 | false |",
            "+----+----+----+-------+",
        ];
        assert_batches_sorted_eq!(expected, &batches);

        // All four right rows are emitted (marked), hence output count 4.
        assert_join_metrics!(metrics, 4);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
4239
    /// Verifies that `lookup_join_hashmap` (u64 offset variant) resolves hash
    /// collisions via the key-equality check rather than trusting hash values.
    #[test]
    fn join_with_hash_collisions_64() -> Result<()> {
        let mut hashmap_left = HashTable::with_capacity(4);
        let left = build_table_i32(
            ("a", &vec![10, 20]),
            ("x", &vec![100, 200]),
            ("y", &vec![200, 300]),
        );

        // Fixed seeds keep the hash values deterministic across runs.
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let hashes_buff = &mut vec![0; left.num_rows()];
        let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;

        // Simulate collisions: register BOTH build rows (stored 1-based as
        // 1 and 2, i.e. rows 0 and 1) under EACH hash value, so every probe
        // hash initially matches both build rows. The subsequent key-equality
        // check must prune the false pairs, leaving only hashes[0] -> row 0
        // and hashes[1] -> row 1.
        hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
        hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);

        hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
        hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);

        // "next" chain array linking duplicate entries in the join hash map
        // (0 terminates a chain) — presumably matches JoinHashMap's layout;
        // see `JoinHashMapU64` for the exact encoding.
        let next = vec![2, 0];

        let right = build_table_i32(
            ("a", &vec![10, 20]),
            ("b", &vec![0, 0]),
            ("c", &vec![30, 40]),
        );

        // Join key column for both join sides
        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

        let join_hash_map = JoinHashMapU64::new(hashmap_left, next);

        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
        let right_keys_values =
            key_column.evaluate(&right)?.into_array(right.num_rows())?;
        let mut hashes_buffer = vec![0; right.num_rows()];
        create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

        let mut probe_indices_buffer = Vec::new();
        let mut build_indices_buffer = Vec::new();
        let (l, r, _) = lookup_join_hashmap(
            &join_hash_map,
            &[left_keys_values],
            &[right_keys_values],
            NullEquality::NullEqualsNothing,
            &hashes_buffer,
            8192,
            (0, None),
            &mut probe_indices_buffer,
            &mut build_indices_buffer,
        )?;

        // Only the genuinely key-equal pairs survive the equality check.
        let left_ids: UInt64Array = vec![0, 1].into();

        let right_ids: UInt32Array = vec![0, 1].into();

        assert_eq!(left_ids, l);

        assert_eq!(right_ids, r);

        Ok(())
    }
4306
4307    #[test]
4308    fn join_with_hash_collisions_u32() -> Result<()> {
4309        let mut hashmap_left = HashTable::with_capacity(4);
4310        let left = build_table_i32(
4311            ("a", &vec![10, 20]),
4312            ("x", &vec![100, 200]),
4313            ("y", &vec![200, 300]),
4314        );
4315
4316        let random_state = RandomState::with_seeds(0, 0, 0, 0);
4317        let hashes_buff = &mut vec![0; left.num_rows()];
4318        let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
4319
4320        hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
4321        hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
4322        hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
4323        hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);
4324
4325        let next: Vec<u32> = vec![2, 0];
4326
4327        let right = build_table_i32(
4328            ("a", &vec![10, 20]),
4329            ("b", &vec![0, 0]),
4330            ("c", &vec![30, 40]),
4331        );
4332
4333        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
4334
4335        let join_hash_map = JoinHashMapU32::new(hashmap_left, next);
4336
4337        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
4338        let right_keys_values =
4339            key_column.evaluate(&right)?.into_array(right.num_rows())?;
4340        let mut hashes_buffer = vec![0; right.num_rows()];
4341        create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
4342
4343        let mut probe_indices_buffer = Vec::new();
4344        let mut build_indices_buffer = Vec::new();
4345        let (l, r, _) = lookup_join_hashmap(
4346            &join_hash_map,
4347            &[left_keys_values],
4348            &[right_keys_values],
4349            NullEquality::NullEqualsNothing,
4350            &hashes_buffer,
4351            8192,
4352            (0, None),
4353            &mut probe_indices_buffer,
4354            &mut build_indices_buffer,
4355        )?;
4356
4357        // We still expect to match rows 0 and 1 on both sides
4358        let left_ids: UInt64Array = vec![0, 1].into();
4359        let right_ids: UInt32Array = vec![0, 1].into();
4360
4361        assert_eq!(left_ids, l);
4362        assert_eq!(right_ids, r);
4363
4364        Ok(())
4365    }
4366
4367    #[tokio::test]
4368    async fn join_with_duplicated_column_names() -> Result<()> {
4369        let task_ctx = Arc::new(TaskContext::default());
4370        let left = build_table(
4371            ("a", &vec![1, 2, 3]),
4372            ("b", &vec![4, 5, 7]),
4373            ("c", &vec![7, 8, 9]),
4374        );
4375        let right = build_table(
4376            ("a", &vec![10, 20, 30]),
4377            ("b", &vec![1, 2, 7]),
4378            ("c", &vec![70, 80, 90]),
4379        );
4380        let on = vec![(
4381            // join on a=b so there are duplicate column names on unjoined columns
4382            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4383            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4384        )];
4385
4386        let join = join(
4387            left,
4388            right,
4389            on,
4390            &JoinType::Inner,
4391            NullEquality::NullEqualsNothing,
4392        )?;
4393
4394        let columns = columns(&join.schema());
4395        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4396
4397        let stream = join.execute(0, task_ctx)?;
4398        let batches = common::collect(stream).await?;
4399
4400        allow_duplicates! {
4401            assert_snapshot!(batches_to_sort_string(&batches), @r"
4402            +---+---+---+----+---+----+
4403            | a | b | c | a  | b | c  |
4404            +---+---+---+----+---+----+
4405            | 1 | 4 | 7 | 10 | 1 | 70 |
4406            | 2 | 5 | 8 | 20 | 2 | 80 |
4407            +---+---+---+----+---+----+
4408            ");
4409        }
4410
4411        Ok(())
4412    }
4413
4414    fn prepare_join_filter() -> JoinFilter {
4415        let column_indices = vec![
4416            ColumnIndex {
4417                index: 2,
4418                side: JoinSide::Left,
4419            },
4420            ColumnIndex {
4421                index: 2,
4422                side: JoinSide::Right,
4423            },
4424        ];
4425        let intermediate_schema = Schema::new(vec![
4426            Field::new("c", DataType::Int32, true),
4427            Field::new("c", DataType::Int32, true),
4428        ]);
4429        let filter_expression = Arc::new(BinaryExpr::new(
4430            Arc::new(Column::new("c", 0)),
4431            Operator::Gt,
4432            Arc::new(Column::new("c", 1)),
4433        )) as Arc<dyn PhysicalExpr>;
4434
4435        JoinFilter::new(
4436            filter_expression,
4437            column_indices,
4438            Arc::new(intermediate_schema),
4439        )
4440    }
4441
4442    #[apply(hash_join_exec_configs)]
4443    #[tokio::test]
4444    async fn join_inner_with_filter(
4445        batch_size: usize,
4446        use_perfect_hash_join_as_possible: bool,
4447    ) -> Result<()> {
4448        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4449        let left = build_table(
4450            ("a", &vec![0, 1, 2, 2]),
4451            ("b", &vec![4, 5, 7, 8]),
4452            ("c", &vec![7, 8, 9, 1]),
4453        );
4454        let right = build_table(
4455            ("a", &vec![10, 20, 30, 40]),
4456            ("b", &vec![2, 2, 3, 4]),
4457            ("c", &vec![7, 5, 6, 4]),
4458        );
4459        let on = vec![(
4460            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4461            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4462        )];
4463        let filter = prepare_join_filter();
4464
4465        let join = join_with_filter(
4466            left,
4467            right,
4468            on,
4469            filter,
4470            &JoinType::Inner,
4471            NullEquality::NullEqualsNothing,
4472        )?;
4473
4474        let columns = columns(&join.schema());
4475        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4476
4477        let stream = join.execute(0, task_ctx)?;
4478        let batches = common::collect(stream).await?;
4479
4480        allow_duplicates! {
4481            assert_snapshot!(batches_to_sort_string(&batches), @r"
4482            +---+---+---+----+---+---+
4483            | a | b | c | a  | b | c |
4484            +---+---+---+----+---+---+
4485            | 2 | 7 | 9 | 10 | 2 | 7 |
4486            | 2 | 7 | 9 | 20 | 2 | 5 |
4487            +---+---+---+----+---+---+
4488            ");
4489        }
4490
4491        let metrics = join.metrics().unwrap();
4492        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4493
4494        Ok(())
4495    }
4496
    /// Left join with the residual filter `left.c > right.c`: left rows whose
    /// matches are all filtered out are still emitted, NULL-padded on the
    /// right.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a", &vec![0, 1, 2, 2]),
            ("b", &vec![4, 5, 7, 8]),
            ("c", &vec![7, 8, 9, 1]),
        );
        let right = build_table(
            ("a", &vec![10, 20, 30, 40]),
            ("b", &vec![2, 2, 3, 4]),
            ("c", &vec![7, 5, 6, 4]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
        )];
        let filter = prepare_join_filter();

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        // Row (2, 8, 1) has key matches but fails `c > c`, so it appears
        // NULL-padded, like the unmatched rows.
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +---+---+---+----+---+---+
            | a | b | c | a  | b | c |
            +---+---+---+----+---+---+
            | 0 | 4 | 7 |    |   |   |
            | 1 | 5 | 8 |    |   |   |
            | 2 | 7 | 9 | 10 | 2 | 7 |
            | 2 | 7 | 9 | 20 | 2 | 5 |
            | 2 | 8 | 1 |    |   |   |
            +---+---+---+----+---+---+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
4554
    /// Right join with the residual filter `left.c > right.c`: right rows
    /// whose matches are all filtered out are emitted NULL-padded on the
    /// left.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a", &vec![0, 1, 2, 2]),
            ("b", &vec![4, 5, 7, 8]),
            ("c", &vec![7, 8, 9, 1]),
        );
        let right = build_table(
            ("a", &vec![10, 20, 30, 40]),
            ("b", &vec![2, 2, 3, 4]),
            ("c", &vec![7, 5, 6, 4]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
        )];
        let filter = prepare_join_filter();

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::Right,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +---+---+---+----+---+---+
            | a | b | c | a  | b | c |
            +---+---+---+----+---+---+
            |   |   |   | 30 | 3 | 6 |
            |   |   |   | 40 | 4 | 4 |
            | 2 | 7 | 9 | 10 | 2 | 7 |
            | 2 | 7 | 9 | 20 | 2 | 5 |
            +---+---+---+----+---+---+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
4611
    /// Full outer join with the residual filter `left.c > right.c`:
    /// filtered-out or unmatched rows from BOTH sides are emitted
    /// NULL-padded on the opposite side.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_full_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a", &vec![0, 1, 2, 2]),
            ("b", &vec![4, 5, 7, 8]),
            ("c", &vec![7, 8, 9, 1]),
        );
        let right = build_table(
            ("a", &vec![10, 20, 30, 40]),
            ("b", &vec![2, 2, 3, 4]),
            ("c", &vec![7, 5, 6, 4]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
        )];
        let filter = prepare_join_filter();

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        let expected = [
            "+---+---+---+----+---+---+",
            "| a | b | c | a  | b | c |",
            "+---+---+---+----+---+---+",
            "|   |   |   | 30 | 3 | 6 |",
            "|   |   |   | 40 | 4 | 4 |",
            "| 2 | 7 | 9 | 10 | 2 | 7 |",
            "| 2 | 7 | 9 | 20 | 2 | 5 |",
            "| 0 | 4 | 7 |    |   |   |",
            "| 1 | 5 | 8 |    |   |   |",
            "| 2 | 8 | 1 |    |   |   |",
            "+---+---+---+----+---+---+",
        ];
        assert_batches_sorted_eq!(expected, &batches);

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        // THIS MIGRATION HALTED DUE TO ISSUE #15312
        //allow_duplicates! {
        //    assert_snapshot!(batches_to_sort_string(&batches), @r#"
        //    +---+---+---+----+---+---+
        //    | a | b | c | a  | b | c |
        //    +---+---+---+----+---+---+
        //    |   |   |   | 30 | 3 | 6 |
        //    |   |   |   | 40 | 4 | 4 |
        //    | 2 | 7 | 9 | 10 | 2 | 7 |
        //    | 2 | 7 | 9 | 20 | 2 | 5 |
        //    | 0 | 4 | 7 |    |   |   |
        //    | 1 | 5 | 8 |    |   |   |
        //    | 2 | 8 | 1 |    |   |   |
        //    +---+---+---+----+---+---+
        //        "#)
        //}

        Ok(())
    }
4687
    /// Test for parallelized HashJoinExec with PartitionMode::CollectLeft
    ///
    /// Runs every supported join type over the same pair of three-row inputs
    /// (`b1`/`b2` keys; `7` unmatched on the left, `6` unmatched on the right)
    /// and checks both the sorted output and the produced metrics.
    #[tokio::test]
    async fn test_collect_left_multiple_partitions_join() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];

        // One expected pretty-printed table per join type, checked below.
        let expected_inner = vec![
            "+----+----+----+----+----+----+",
            "| a1 | b1 | c1 | a2 | b2 | c2 |",
            "+----+----+----+----+----+----+",
            "| 1  | 4  | 7  | 10 | 4  | 70 |",
            "| 2  | 5  | 8  | 20 | 5  | 80 |",
            "+----+----+----+----+----+----+",
        ];
        let expected_left = vec![
            "+----+----+----+----+----+----+",
            "| a1 | b1 | c1 | a2 | b2 | c2 |",
            "+----+----+----+----+----+----+",
            "| 1  | 4  | 7  | 10 | 4  | 70 |",
            "| 2  | 5  | 8  | 20 | 5  | 80 |",
            "| 3  | 7  | 9  |    |    |    |",
            "+----+----+----+----+----+----+",
        ];
        let expected_right = vec![
            "+----+----+----+----+----+----+",
            "| a1 | b1 | c1 | a2 | b2 | c2 |",
            "+----+----+----+----+----+----+",
            "|    |    |    | 30 | 6  | 90 |",
            "| 1  | 4  | 7  | 10 | 4  | 70 |",
            "| 2  | 5  | 8  | 20 | 5  | 80 |",
            "+----+----+----+----+----+----+",
        ];
        let expected_full = vec![
            "+----+----+----+----+----+----+",
            "| a1 | b1 | c1 | a2 | b2 | c2 |",
            "+----+----+----+----+----+----+",
            "|    |    |    | 30 | 6  | 90 |",
            "| 1  | 4  | 7  | 10 | 4  | 70 |",
            "| 2  | 5  | 8  | 20 | 5  | 80 |",
            "| 3  | 7  | 9  |    |    |    |",
            "+----+----+----+----+----+----+",
        ];
        let expected_left_semi = vec![
            "+----+----+----+",
            "| a1 | b1 | c1 |",
            "+----+----+----+",
            "| 1  | 4  | 7  |",
            "| 2  | 5  | 8  |",
            "+----+----+----+",
        ];
        let expected_left_anti = vec![
            "+----+----+----+",
            "| a1 | b1 | c1 |",
            "+----+----+----+",
            "| 3  | 7  | 9  |",
            "+----+----+----+",
        ];
        let expected_right_semi = vec![
            "+----+----+----+",
            "| a2 | b2 | c2 |",
            "+----+----+----+",
            "| 10 | 4  | 70 |",
            "| 20 | 5  | 80 |",
            "+----+----+----+",
        ];
        let expected_right_anti = vec![
            "+----+----+----+",
            "| a2 | b2 | c2 |",
            "+----+----+----+",
            "| 30 | 6  | 90 |",
            "+----+----+----+",
        ];
        let expected_left_mark = vec![
            "+----+----+----+-------+",
            "| a1 | b1 | c1 | mark  |",
            "+----+----+----+-------+",
            "| 1  | 4  | 7  | true  |",
            "| 2  | 5  | 8  | true  |",
            "| 3  | 7  | 9  | false |",
            "+----+----+----+-------+",
        ];
        let expected_right_mark = vec![
            "+----+----+----+-------+",
            "| a2 | b2 | c2 | mark  |",
            "+----+----+----+-------+",
            "| 10 | 4  | 70 | true  |",
            "| 20 | 5  | 80 | true  |",
            "| 30 | 6  | 90 | false |",
            "+----+----+----+-------+",
        ];

        let test_cases = vec![
            (JoinType::Inner, expected_inner),
            (JoinType::Left, expected_left),
            (JoinType::Right, expected_right),
            (JoinType::Full, expected_full),
            (JoinType::LeftSemi, expected_left_semi),
            (JoinType::LeftAnti, expected_left_anti),
            (JoinType::RightSemi, expected_right_semi),
            (JoinType::RightAnti, expected_right_anti),
            (JoinType::LeftMark, expected_left_mark),
            (JoinType::RightMark, expected_right_mark),
        ];

        for (join_type, expected) in test_cases {
            let (_, batches, metrics) = join_collect_with_partition_mode(
                Arc::clone(&left),
                Arc::clone(&right),
                on.clone(),
                &join_type,
                PartitionMode::CollectLeft,
                NullEquality::NullEqualsNothing,
                Arc::clone(&task_ctx),
            )
            .await?;
            assert_batches_sorted_eq!(expected, &batches);
            // Expected output row count = table lines minus the 3 border
            // lines and 1 header line.
            assert_join_metrics!(metrics, expected.len() - 4);
        }

        Ok(())
    }
4822
4823    #[tokio::test]
4824    async fn join_date32() -> Result<()> {
4825        let schema = Arc::new(Schema::new(vec![
4826            Field::new("date", DataType::Date32, false),
4827            Field::new("n", DataType::Int32, false),
4828        ]));
4829
4830        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
4831        let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
4832        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4833        let left =
4834            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
4835                .unwrap();
4836        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
4837        let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
4838        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4839        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
4840        let on = vec![(
4841            Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
4842            Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
4843        )];
4844
4845        let join = join(
4846            left,
4847            right,
4848            on,
4849            &JoinType::Inner,
4850            NullEquality::NullEqualsNothing,
4851        )?;
4852
4853        let task_ctx = Arc::new(TaskContext::default());
4854        let stream = join.execute(0, task_ctx)?;
4855        let batches = common::collect(stream).await?;
4856
4857        allow_duplicates! {
4858            assert_snapshot!(batches_to_sort_string(&batches), @r"
4859            +------------+---+------------+---+
4860            | date       | n | date       | n |
4861            +------------+---+------------+---+
4862            | 2022-04-26 | 2 | 2022-04-26 | 4 |
4863            | 2022-04-26 | 2 | 2022-04-26 | 5 |
4864            | 2022-04-27 | 3 | 2022-04-27 | 6 |
4865            +------------+---+------------+---+
4866            ");
4867        }
4868
4869        Ok(())
4870    }
4871
4872    #[tokio::test]
4873    async fn join_with_error_right() {
4874        let left = build_table(
4875            ("a1", &vec![1, 2, 3]),
4876            ("b1", &vec![4, 5, 7]),
4877            ("c1", &vec![7, 8, 9]),
4878        );
4879
4880        // right input stream returns one good batch and then one error.
4881        // The error should be returned.
4882        let err = exec_err!("bad data error");
4883        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4884
4885        let on = vec![(
4886            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4887            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
4888        )];
4889        let schema = right.schema();
4890        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4891        let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
4892
4893        let join_types = vec![
4894            JoinType::Inner,
4895            JoinType::Left,
4896            JoinType::Right,
4897            JoinType::Full,
4898            JoinType::LeftSemi,
4899            JoinType::LeftAnti,
4900            JoinType::RightSemi,
4901            JoinType::RightAnti,
4902        ];
4903
4904        for join_type in join_types {
4905            let join = join(
4906                Arc::clone(&left),
4907                Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
4908                on.clone(),
4909                &join_type,
4910                NullEquality::NullEqualsNothing,
4911            )
4912            .unwrap();
4913            let task_ctx = Arc::new(TaskContext::default());
4914
4915            let stream = join.execute(0, task_ctx).unwrap();
4916
4917            // Expect that an error is returned
4918            let result_string = common::collect(stream).await.unwrap_err().to_string();
4919            assert!(
4920                result_string.contains("bad data error"),
4921                "actual: {result_string}"
4922            );
4923        }
4924    }
4925
    // Verifies output batch sizing: with forced batch_size settings from 20
    // down to 1, each join type must emit at most div_ceil(rows, batch_size)
    // (+1 for join types that append unmatched build-side rows) batches while
    // still producing the same overall result set.
    #[tokio::test]
    async fn join_split_batch() {
        // Build side: 4 rows, all sharing join key b1 = 1.
        let left = build_table(
            ("a1", &vec![1, 2, 3, 4]),
            ("b1", &vec![1, 1, 1, 1]),
            ("c1", &vec![0, 0, 0, 0]),
        );
        // Probe side: 5 rows, all sharing join key b2 = 1, so the inner join
        // produces the full 4 x 5 = 20-row cross product.
        let right = build_table(
            ("a2", &vec![10, 20, 30, 40, 50]),
            ("b2", &vec![1, 1, 1, 1, 1]),
            ("c2", &vec![0, 0, 0, 0, 0]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
        ];
        let expected_resultset_records = 20;
        let common_result = [
            "+----+----+----+----+----+----+",
            "| a1 | b1 | c1 | a2 | b2 | c2 |",
            "+----+----+----+----+----+----+",
            "| 1  | 1  | 0  | 10 | 1  | 0  |",
            "| 2  | 1  | 0  | 10 | 1  | 0  |",
            "| 3  | 1  | 0  | 10 | 1  | 0  |",
            "| 4  | 1  | 0  | 10 | 1  | 0  |",
            "| 1  | 1  | 0  | 20 | 1  | 0  |",
            "| 2  | 1  | 0  | 20 | 1  | 0  |",
            "| 3  | 1  | 0  | 20 | 1  | 0  |",
            "| 4  | 1  | 0  | 20 | 1  | 0  |",
            "| 1  | 1  | 0  | 30 | 1  | 0  |",
            "| 2  | 1  | 0  | 30 | 1  | 0  |",
            "| 3  | 1  | 0  | 30 | 1  | 0  |",
            "| 4  | 1  | 0  | 30 | 1  | 0  |",
            "| 1  | 1  | 0  | 40 | 1  | 0  |",
            "| 2  | 1  | 0  | 40 | 1  | 0  |",
            "| 3  | 1  | 0  | 40 | 1  | 0  |",
            "| 4  | 1  | 0  | 40 | 1  | 0  |",
            "| 1  | 1  | 0  | 50 | 1  | 0  |",
            "| 2  | 1  | 0  | 50 | 1  | 0  |",
            "| 3  | 1  | 0  | 50 | 1  | 0  |",
            "| 4  | 1  | 0  | 50 | 1  | 0  |",
            "+----+----+----+----+----+----+",
        ];
        let left_batch = [
            "+----+----+----+",
            "| a1 | b1 | c1 |",
            "+----+----+----+",
            "| 1  | 1  | 0  |",
            "| 2  | 1  | 0  |",
            "| 3  | 1  | 0  |",
            "| 4  | 1  | 0  |",
            "+----+----+----+",
        ];
        let right_batch = [
            "+----+----+----+",
            "| a2 | b2 | c2 |",
            "+----+----+----+",
            "| 10 | 1  | 0  |",
            "| 20 | 1  | 0  |",
            "| 30 | 1  | 0  |",
            "| 40 | 1  | 0  |",
            "| 50 | 1  | 0  |",
        "+----+----+----+",
        ];
        let right_empty = [
            "+----+----+----+",
            "| a2 | b2 | c2 |",
            "+----+----+----+",
            "+----+----+----+",
        ];
        let left_empty = [
            "+----+----+----+",
            "| a1 | b1 | c1 |",
            "+----+----+----+",
            "+----+----+----+",
        ];

        // validation of partial join results output for different batch_size setting
        for join_type in join_types {
            for batch_size in (1..21).rev() {
                let task_ctx = prepare_task_ctx(batch_size, true);

                let join = join(
                    Arc::clone(&left),
                    Arc::clone(&right),
                    on.clone(),
                    &join_type,
                    NullEquality::NullEqualsNothing,
                )
                .unwrap();

                let stream = join.execute(0, task_ctx).unwrap();
                let batches = common::collect(stream).await.unwrap();

                // For inner/right join expected batch count equals div_ceil result,
                // as there is no need to append non-joined build side data.
                // For other join types it'll be div_ceil + 1 -- for additional batch
                // containing not visited build side rows (empty in this test case).
                let expected_batch_count = match join_type {
                    JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightSemi
                    | JoinType::RightAnti => {
                        div_ceil(expected_resultset_records, batch_size)
                    }
                    _ => div_ceil(expected_resultset_records, batch_size) + 1,
                };
                // With batch coalescing, we may have fewer batches than expected
                assert!(
                    batches.len() <= expected_batch_count,
                    "expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
                    batches.len()
                );

                let expected = match join_type {
                    JoinType::RightSemi => right_batch.to_vec(),
                    JoinType::RightAnti => right_empty.to_vec(),
                    JoinType::LeftSemi => left_batch.to_vec(),
                    JoinType::LeftAnti => left_empty.to_vec(),
                    _ => common_result.to_vec(),
                };
                // For anti joins with empty results, we may get zero batches
                // (with coalescing) instead of one empty batch with schema
                if batches.is_empty() {
                    // Verify this is an expected empty result case
                    assert!(
                        matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
                        "Unexpected empty result for {join_type} join"
                    );
                } else {
                    assert_batches_eq!(expected, &batches);
                }
            }
        }
    }
5072
5073    #[tokio::test]
5074    async fn single_partition_join_overallocation() -> Result<()> {
5075        let left = build_table(
5076            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5077            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5078            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5079        );
5080        let right = build_table(
5081            ("a2", &vec![10, 11]),
5082            ("b2", &vec![12, 13]),
5083            ("c2", &vec![14, 15]),
5084        );
5085        let on = vec![(
5086            Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
5087            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
5088        )];
5089
5090        let join_types = vec![
5091            JoinType::Inner,
5092            JoinType::Left,
5093            JoinType::Right,
5094            JoinType::Full,
5095            JoinType::LeftSemi,
5096            JoinType::LeftAnti,
5097            JoinType::RightSemi,
5098            JoinType::RightAnti,
5099            JoinType::LeftMark,
5100            JoinType::RightMark,
5101        ];
5102
5103        for join_type in join_types {
5104            let runtime = RuntimeEnvBuilder::new()
5105                .with_memory_limit(100, 1.0)
5106                .build_arc()?;
5107            let task_ctx = TaskContext::default().with_runtime(runtime);
5108            let task_ctx = Arc::new(task_ctx);
5109
5110            let join = join(
5111                Arc::clone(&left),
5112                Arc::clone(&right),
5113                on.clone(),
5114                &join_type,
5115                NullEquality::NullEqualsNothing,
5116            )?;
5117
5118            let stream = join.execute(0, task_ctx)?;
5119            let err = common::collect(stream).await.unwrap_err();
5120
5121            // Asserting that operator-level reservation attempting to overallocate
5122            assert_contains!(
5123                err.to_string(),
5124                "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n  HashJoinInput"
5125            );
5126
5127            assert_contains!(
5128                err.to_string(),
5129                "Failed to allocate additional 120.0 B for HashJoinInput"
5130            );
5131        }
5132
5133        Ok(())
5134    }
5135
    // Like `single_partition_join_overallocation`, but with two-partition
    // inputs and `PartitionMode::Partitioned`: the overallocation must be
    // reported against the per-stream `HashJoinInput[1]` reservation.
    #[tokio::test]
    async fn partitioned_join_overallocation() -> Result<()> {
        // Prepare partitioned inputs for HashJoinExec
        // No need to adjust partitioning, as execution should fail with `Resources exhausted` error
        let left_batch = build_table_i32(
            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        );
        // Same batch in both partitions — contents don't matter, only size.
        let left = TestMemoryExec::try_new_exec(
            &[vec![left_batch.clone()], vec![left_batch.clone()]],
            left_batch.schema(),
            None,
        )
        .unwrap();
        let right_batch = build_table_i32(
            ("a2", &vec![10, 11]),
            ("b2", &vec![12, 13]),
            ("c2", &vec![14, 15]),
        );
        let right = TestMemoryExec::try_new_exec(
            &[vec![right_batch.clone()], vec![right_batch.clone()]],
            right_batch.schema(),
            None,
        )
        .unwrap();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
        ];

        for join_type in join_types {
            // 100-byte limit guarantees the build side cannot fit.
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(100, 1.0)
                .build_arc()?;
            let session_config = SessionConfig::default().with_batch_size(50);
            let task_ctx = TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime);
            let task_ctx = Arc::new(task_ctx);

            let join = HashJoinExec::try_new(
                Arc::clone(&left) as Arc<dyn ExecutionPlan>,
                Arc::clone(&right) as Arc<dyn ExecutionPlan>,
                on.clone(),
                None,
                &join_type,
                None,
                PartitionMode::Partitioned,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            // Execute partition 1 so the error names `HashJoinInput[1]`.
            let stream = join.execute(1, task_ctx)?;
            let err = common::collect(stream).await.unwrap_err();

            // Asserting that stream-level reservation attempting to overallocate
            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n  HashJoinInput[1]"
            );

            assert_contains!(
                err.to_string(),
                "Failed to allocate additional 120.0 B for HashJoinInput[1]"
            );
        }

        Ok(())
    }
5217
5218    fn build_table_struct(
5219        struct_name: &str,
5220        field_name_and_values: (&str, &Vec<Option<i32>>),
5221        nulls: Option<NullBuffer>,
5222    ) -> Arc<dyn ExecutionPlan> {
5223        let (field_name, values) = field_name_and_values;
5224        let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
5225        let schema = Schema::new(vec![Field::new(
5226            struct_name,
5227            DataType::Struct(inner_fields.clone().into()),
5228            nulls.is_some(),
5229        )]);
5230
5231        let batch = RecordBatch::try_new(
5232            Arc::new(schema),
5233            vec![Arc::new(StructArray::new(
5234                inner_fields.into(),
5235                vec![Arc::new(Int32Array::from(values.clone()))],
5236                nulls,
5237            ))],
5238        )
5239        .unwrap();
5240        let schema_ref = batch.schema();
5241        TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
5242    }
5243
5244    #[tokio::test]
5245    async fn join_on_struct() -> Result<()> {
5246        let task_ctx = Arc::new(TaskContext::default());
5247        let left =
5248            build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
5249        let right =
5250            build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
5251        let on = vec![(
5252            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
5253            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
5254        )];
5255
5256        let (columns, batches, metrics) = join_collect(
5257            left,
5258            right,
5259            on,
5260            &JoinType::Inner,
5261            NullEquality::NullEqualsNothing,
5262            task_ctx,
5263        )
5264        .await?;
5265
5266        assert_eq!(columns, vec!["n1", "n2"]);
5267
5268        allow_duplicates! {
5269            assert_snapshot!(batches_to_string(&batches), @r"
5270            +--------+--------+
5271            | n1     | n2     |
5272            +--------+--------+
5273            | {a: }  | {a: }  |
5274            | {a: 1} | {a: 1} |
5275            | {a: 2} | {a: 2} |
5276            +--------+--------+
5277            ");
5278        }
5279
5280        assert_join_metrics!(metrics, 3);
5281
5282        Ok(())
5283    }
5284
    // Joins two single-row tables whose struct values are top-level null.
    // Under NullEqualsNull the null structs match (1 output row); under
    // NullEqualsNothing they do not (0 output rows).
    #[tokio::test]
    async fn join_on_struct_with_nulls() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        // Each side: one row whose entire struct entry is null.
        let left =
            build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
        let right =
            build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
        let on = vec![(
            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
        )];

        // Case 1: nulls compare equal — the null structs join.
        let (_, batches_null_eq, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNull,
            Arc::clone(&task_ctx),
        )
        .await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r"
            +----+----+
            | n1 | n2 |
            +----+----+
            |    |    |
            +----+----+
            ");
        }

        assert_join_metrics!(metrics, 1);

        // Case 2: nulls compare unequal — no rows join.
        let (_, batches_null_neq, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_join_metrics!(metrics, 0);

        // With batch coalescing, empty results may not emit any batches
        // Check that either we have no batches, or an empty batch with proper schema
        if batches_null_neq.is_empty() {
            // This is fine - no output rows
        } else {
            let expected_null_neq =
                ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
            assert_batches_eq!(expected_null_neq, &batches_null_neq);
        }

        Ok(())
    }
5343
5344    /// Returns the column names on the schema
5345    fn columns(schema: &Schema) -> Vec<String> {
5346        schema.fields().iter().map(|f| f.name().clone()).collect()
5347    }
5348
    /// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table.
    #[tokio::test]
    async fn test_hash_join_marks_filter_complete() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 6]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        // Create a dynamic filter manually, keeping a second handle so we can
        // observe its completion state after the join runs.
        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
        let dynamic_filter_clone = Arc::clone(&dynamic_filter);

        // Create HashJoinExec with the dynamic filter
        let mut join = HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
            false,
        )?;
        // Inject the filter directly into the exec's field (test-only setup;
        // the build accumulator is left uninitialized).
        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
            filter: dynamic_filter,
            build_accumulator: OnceLock::new(),
        });

        // Execute the join and drain its output.
        let stream = join.execute(0, task_ctx)?;
        let _batches = common::collect(stream).await?;

        // After the join completes, the dynamic filter should be marked as complete
        // wait_complete() should return immediately
        dynamic_filter_clone.wait_complete().await;

        Ok(())
    }
5400
    /// This test verifies that the dynamic filter is marked as complete even when the build side is empty.
    #[tokio::test]
    async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        // Empty left side (build side)
        let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        // Create a dynamic filter manually, keeping a second handle so we can
        // observe its completion state after the join runs.
        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
        let dynamic_filter_clone = Arc::clone(&dynamic_filter);

        // Create HashJoinExec with the dynamic filter
        let mut join = HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
            false,
        )?;
        // Inject the filter directly into the exec's field (test-only setup;
        // the build accumulator is left uninitialized).
        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
            filter: dynamic_filter,
            build_accumulator: OnceLock::new(),
        });

        // Execute the join and drain its output.
        let stream = join.execute(0, task_ctx)?;
        let _batches = common::collect(stream).await?;

        // Even with empty build side, the dynamic filter should be marked as complete
        // wait_complete() should return immediately
        dynamic_filter_clone.wait_complete().await;

        Ok(())
    }
5449
    // Verifies the perfect-hash-join fast path is taken (asserted via
    // metrics) and produces correct matches when join keys include negative
    // values and zero.
    #[tokio::test]
    async fn test_perfect_hash_join_with_negative_numbers() -> Result<()> {
        // Second arg enables use of the perfect hash join where possible.
        let task_ctx = prepare_task_ctx(8192, true);
        let (left_schema, right_schema, on) = build_schema_and_on()?;

        // Build-side keys (b1): -1, 0, 1.
        let left_batch = RecordBatch::try_new(
            Arc::clone(&left_schema),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(Int32Array::from(vec![-1, 0, 1])) as ArrayRef,
            ],
        )?;
        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;

        // Probe-side keys: 1, -1, 0 match; 2 does not.
        let right_batch = RecordBatch::try_new(
            Arc::clone(&right_schema),
            vec![
                Arc::new(Int32Array::from(vec![10, 20, 30, 40])) as ArrayRef,
                Arc::new(Int32Array::from(vec![1, -1, 0, 2])) as ArrayRef,
            ],
        )?;
        let right =
            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);

        assert_batches_sorted_eq!(
            [
                "+----+----+----+----+",
                "| a1 | b1 | a2 | b1 |",
                "+----+----+----+----+",
                "| 1  | -1 | 20 | -1 |",
                "| 2  | 0  | 30 | 0  |",
                "| 3  | 1  | 10 | 1  |",
                "+----+----+----+----+",
            ],
            &batches
        );

        // Confirm the perfect-hash-join path was actually used.
        assert_phj_used(&metrics, true);

        Ok(())
    }
5503
5504    #[tokio::test]
5505    async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> {
5506        let task_ctx = prepare_task_ctx(8192, true);
5507        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
5508        let batch = RecordBatch::try_new(
5509            Arc::clone(&schema),
5510            vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))],
5511        )?;
5512        let left = TestMemoryExec::try_new_exec(
5513            &[vec![batch.clone()]],
5514            Arc::clone(&schema),
5515            None,
5516        )?;
5517        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?;
5518        let on: JoinOn = vec![(
5519            Arc::new(Column::new_with_schema("a", &left.schema())?) as _,
5520            Arc::new(Column::new_with_schema("a", &right.schema())?) as _,
5521        )];
5522        let (_columns, batches, _metrics) = join_collect(
5523            left,
5524            right,
5525            on,
5526            &JoinType::Inner,
5527            NullEquality::NullEqualsNothing,
5528            task_ctx,
5529        )
5530        .await?;
5531        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
5532        assert_eq!(total_rows, 2);
5533        Ok(())
5534    }
5535
    // Parameterized over batch sizes and PHJ on/off via the
    // `hash_join_exec_configs` fixture: with NullEqualsNull semantics, a
    // probe-side null key must not match any non-null build-side key.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let (left_schema, right_schema, on) = build_schema_and_on()?;

        // Build side: keys 10 and 20, no nulls.
        let left_batch = RecordBatch::try_new(
            Arc::clone(&left_schema),
            vec![
                Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
                Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef,
            ],
        )?;
        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;

        // Probe side: key 10 matches; the null key has no null partner.
        let right_batch = RecordBatch::try_new(
            Arc::clone(&right_schema),
            vec![
                Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
            ],
        )?;
        let right =
            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNull,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
        // Only the 10 == 10 pair joins.
        assert_batches_sorted_eq!(
            [
                "+----+----+----+----+",
                "| a1 | b1 | a2 | b1 |",
                "+----+----+----+----+",
                "| 1  | 10 | 3  | 10 |",
                "+----+----+----+----+",
            ],
            &batches
        );

        // PHJ usage must match what the config requested.
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
5590
    // Parameterized over batch sizes and PHJ on/off via the
    // `hash_join_exec_configs` fixture: with NullEqualsNothing semantics,
    // null keys present on BOTH sides must not join with each other.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn test_phj_null_equals_nothing_build_probe_all_have_nulls(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let (left_schema, right_schema, on) = build_schema_and_on()?;

        // Build side: key 10 and a null key.
        let left_batch = RecordBatch::try_new(
            Arc::clone(&left_schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
            ],
        )?;
        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;

        // Probe side: key 10 and a null key — nulls must not pair up.
        let right_batch = RecordBatch::try_new(
            Arc::clone(&right_schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
            ],
        )?;
        let right =
            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
        // Only the 10 == 10 pair joins; the null/null pair is excluded.
        assert_batches_sorted_eq!(
            [
                "+----+----+----+----+",
                "| a1 | b1 | a2 | b1 |",
                "+----+----+----+----+",
                "| 1  | 10 | 3  | 10 |",
                "+----+----+----+----+",
            ],
            &batches
        );

        // PHJ usage must match what the config requested.
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
5645
5646    #[tokio::test]
5647    async fn test_phj_null_equals_null_build_have_nulls() -> Result<()> {
5648        let task_ctx = prepare_task_ctx(8192, true);
5649        let (left_schema, right_schema, on) = build_schema_and_on()?;
5650
5651        let left_batch = RecordBatch::try_new(
5652            Arc::clone(&left_schema),
5653            vec![
5654                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef,
5655                Arc::new(Int32Array::from(vec![Some(10), Some(20), None])) as ArrayRef,
5656            ],
5657        )?;
5658        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
5659
5660        let right_batch = RecordBatch::try_new(
5661            Arc::clone(&right_schema),
5662            vec![
5663                Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
5664                Arc::new(Int32Array::from(vec![Some(10), Some(30)])) as ArrayRef,
5665            ],
5666        )?;
5667        let right =
5668            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
5669
5670        let (columns, batches, metrics) = join_collect(
5671            left,
5672            right,
5673            on,
5674            &JoinType::Inner,
5675            NullEquality::NullEqualsNull,
5676            task_ctx,
5677        )
5678        .await?;
5679
5680        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
5681        assert_batches_sorted_eq!(
5682            [
5683                "+----+----+----+----+",
5684                "| a1 | b1 | a2 | b1 |",
5685                "+----+----+----+----+",
5686                "| 1  | 10 | 3  | 10 |",
5687                "+----+----+----+----+",
5688            ],
5689            &batches
5690        );
5691
5692        assert_phj_used(&metrics, false);
5693
5694        Ok(())
5695    }
5696
5697    /// Test null-aware anti join when probe side (right) contains NULL
5698    /// Expected: no rows should be output (NULL in subquery means all results are unknown)
5699    #[apply(hash_join_exec_configs)]
5700    #[tokio::test]
5701    async fn test_null_aware_anti_join_probe_null(batch_size: usize) -> Result<()> {
5702        let task_ctx = prepare_task_ctx(batch_size, false);
5703
5704        // Build left table (rows to potentially output)
5705        let left = build_table_two_cols(
5706            ("c1", &vec![Some(1), Some(2), Some(3), Some(4)]),
5707            ("dummy", &vec![Some(10), Some(20), Some(30), Some(40)]),
5708        );
5709
5710        // Build right table (subquery with NULL)
5711        let right = build_table_two_cols(
5712            ("c2", &vec![Some(1), Some(2), Some(3), None]),
5713            ("dummy", &vec![Some(100), Some(200), Some(300), Some(400)]),
5714        );
5715
5716        let on = vec![(
5717            Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
5718            Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
5719        )];
5720
5721        // Create null-aware anti join
5722        let join = HashJoinExec::try_new(
5723            left,
5724            right,
5725            on,
5726            None,
5727            &JoinType::LeftAnti,
5728            None,
5729            PartitionMode::CollectLeft,
5730            NullEquality::NullEqualsNothing,
5731            true, // null_aware = true
5732        )?;
5733
5734        let stream = join.execute(0, task_ctx)?;
5735        let batches = common::collect(stream).await?;
5736
5737        // Expected: empty result (probe side has NULL, so no rows should be output)
5738        allow_duplicates! {
5739            assert_snapshot!(batches_to_sort_string(&batches), @r"
5740            ++
5741            ++
5742            ");
5743        }
5744        Ok(())
5745    }
5746
5747    /// Test null-aware anti join when build side (left) contains NULL keys
5748    /// Expected: rows with NULL keys should not be output
5749    #[apply(hash_join_exec_configs)]
5750    #[tokio::test]
5751    async fn test_null_aware_anti_join_build_null(batch_size: usize) -> Result<()> {
5752        let task_ctx = prepare_task_ctx(batch_size, false);
5753
5754        // Build left table with NULL key (this row should not be output)
5755        let left = build_table_two_cols(
5756            ("c1", &vec![Some(1), Some(4), None]),
5757            ("dummy", &vec![Some(10), Some(40), Some(0)]),
5758        );
5759
5760        // Build right table (no NULL, so probe-side check passes)
5761        let right = build_table_two_cols(
5762            ("c2", &vec![Some(1), Some(2), Some(3)]),
5763            ("dummy", &vec![Some(100), Some(200), Some(300)]),
5764        );
5765
5766        let on = vec![(
5767            Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
5768            Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
5769        )];
5770
5771        // Create null-aware anti join
5772        let join = HashJoinExec::try_new(
5773            left,
5774            right,
5775            on,
5776            None,
5777            &JoinType::LeftAnti,
5778            None,
5779            PartitionMode::CollectLeft,
5780            NullEquality::NullEqualsNothing,
5781            true, // null_aware = true
5782        )?;
5783
5784        let stream = join.execute(0, task_ctx)?;
5785        let batches = common::collect(stream).await?;
5786
5787        // Expected: only c1=4 (not c1=1 which matches, not c1=NULL)
5788        allow_duplicates! {
5789            assert_snapshot!(batches_to_sort_string(&batches), @r"
5790            +----+-------+
5791            | c1 | dummy |
5792            +----+-------+
5793            | 4  | 40    |
5794            +----+-------+
5795            ");
5796        }
5797        Ok(())
5798    }
5799
5800    /// Test null-aware anti join with no NULLs (should work like regular anti join)
5801    #[apply(hash_join_exec_configs)]
5802    #[tokio::test]
5803    async fn test_null_aware_anti_join_no_nulls(batch_size: usize) -> Result<()> {
5804        let task_ctx = prepare_task_ctx(batch_size, false);
5805
5806        // Build left table (no NULLs)
5807        let left = build_table_two_cols(
5808            ("c1", &vec![Some(1), Some(2), Some(4), Some(5)]),
5809            ("dummy", &vec![Some(10), Some(20), Some(40), Some(50)]),
5810        );
5811
5812        // Build right table (no NULLs)
5813        let right = build_table_two_cols(
5814            ("c2", &vec![Some(1), Some(2), Some(3)]),
5815            ("dummy", &vec![Some(100), Some(200), Some(300)]),
5816        );
5817
5818        let on = vec![(
5819            Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
5820            Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
5821        )];
5822
5823        // Create null-aware anti join
5824        let join = HashJoinExec::try_new(
5825            left,
5826            right,
5827            on,
5828            None,
5829            &JoinType::LeftAnti,
5830            None,
5831            PartitionMode::CollectLeft,
5832            NullEquality::NullEqualsNothing,
5833            true, // null_aware = true
5834        )?;
5835
5836        let stream = join.execute(0, task_ctx)?;
5837        let batches = common::collect(stream).await?;
5838
5839        // Expected: c1=4 and c1=5 (they don't match anything in right)
5840        allow_duplicates! {
5841            assert_snapshot!(batches_to_sort_string(&batches), @r"
5842            +----+-------+
5843            | c1 | dummy |
5844            +----+-------+
5845            | 4  | 40    |
5846            | 5  | 50    |
5847            +----+-------+
5848            ");
5849        }
5850        Ok(())
5851    }
5852
5853    /// Test that null_aware validation rejects non-LeftAnti join types
5854    #[tokio::test]
5855    async fn test_null_aware_validation_wrong_join_type() {
5856        let left =
5857            build_table_two_cols(("c1", &vec![Some(1)]), ("dummy", &vec![Some(10)]));
5858        let right =
5859            build_table_two_cols(("c2", &vec![Some(1)]), ("dummy", &vec![Some(100)]));
5860
5861        let on = vec![(
5862            Arc::new(Column::new_with_schema("c1", &left.schema()).unwrap()) as _,
5863            Arc::new(Column::new_with_schema("c2", &right.schema()).unwrap()) as _,
5864        )];
5865
5866        // Try to create null-aware Inner join (should fail)
5867        let result = HashJoinExec::try_new(
5868            left,
5869            right,
5870            on,
5871            None,
5872            &JoinType::Inner,
5873            None,
5874            PartitionMode::CollectLeft,
5875            NullEquality::NullEqualsNothing,
5876            true, // null_aware = true (invalid for Inner join)
5877        );
5878
5879        assert!(result.is_err());
5880        assert!(
5881            result
5882                .unwrap_err()
5883                .to_string()
5884                .contains("null_aware can only be true for LeftAnti joins")
5885        );
5886    }
5887
5888    /// Test that null_aware validation rejects multi-column joins
5889    #[tokio::test]
5890    async fn test_null_aware_validation_multi_column() {
5891        let left = build_table(("a", &vec![1]), ("b", &vec![2]), ("c", &vec![3]));
5892        let right = build_table(("x", &vec![1]), ("y", &vec![2]), ("z", &vec![3]));
5893
5894        // Try multi-column join
5895        let on = vec![
5896            (
5897                Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
5898                Arc::new(Column::new_with_schema("x", &right.schema()).unwrap()) as _,
5899            ),
5900            (
5901                Arc::new(Column::new_with_schema("b", &left.schema()).unwrap()) as _,
5902                Arc::new(Column::new_with_schema("y", &right.schema()).unwrap()) as _,
5903            ),
5904        ];
5905
5906        // Try to create null-aware anti join with 2 columns (should fail)
5907        let result = HashJoinExec::try_new(
5908            left,
5909            right,
5910            on,
5911            None,
5912            &JoinType::LeftAnti,
5913            None,
5914            PartitionMode::CollectLeft,
5915            NullEquality::NullEqualsNothing,
5916            true, // null_aware = true (invalid for multi-column)
5917        );
5918
5919        assert!(result.is_err());
5920        assert!(
5921            result
5922                .unwrap_err()
5923                .to_string()
5924                .contains("null_aware anti join only supports single column join key")
5925        );
5926    }
5927
5928    #[test]
5929    fn test_lr_is_preserved() {
5930        assert_eq!(lr_is_preserved(JoinType::Inner), (true, true));
5931        assert_eq!(lr_is_preserved(JoinType::Left), (true, false));
5932        assert_eq!(lr_is_preserved(JoinType::Right), (false, true));
5933        assert_eq!(lr_is_preserved(JoinType::Full), (false, false));
5934        assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, true));
5935        assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, true));
5936        assert_eq!(lr_is_preserved(JoinType::LeftMark), (true, false));
5937        assert_eq!(lr_is_preserved(JoinType::RightSemi), (true, true));
5938        assert_eq!(lr_is_preserved(JoinType::RightAnti), (true, true));
5939        assert_eq!(lr_is_preserved(JoinType::RightMark), (false, true));
5940    }
5941}