Skip to main content

datafusion_physical_plan/joins/hash_join/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashSet;
19use std::fmt;
20use std::mem::size_of;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::{Arc, OnceLock};
23use std::vec;
24
25use crate::ExecutionPlanProperties;
26use crate::execution_plan::{
27    EmissionType, boundedness_from_children, has_same_children_properties,
28    stub_properties,
29};
30use crate::filter_pushdown::{
31    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
32    FilterPushdownPropagation,
33};
34use crate::joins::Map;
35use crate::joins::array_map::ArrayMap;
36use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
37use crate::joins::hash_join::shared_bounds::{
38    ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
39};
40use crate::joins::hash_join::stream::{
41    BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
42};
43use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
44use crate::joins::utils::{
45    OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap,
46    swap_join_projection, update_hash,
47};
48use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
49use crate::metrics::{Count, MetricBuilder, MetricCategory};
50use crate::projection::{
51    EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
52    try_pushdown_through_join,
53};
54use crate::repartition::REPARTITION_RANDOM_STATE;
55use crate::spill::get_record_batch_memory_size;
56use crate::{
57    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
58    PlanProperties, SendableRecordBatchStream, Statistics,
59    common::can_project,
60    joins::utils::{
61        BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
62        build_join_schema, check_join_is_valid, estimate_join_statistics,
63        need_produce_result_in_final, symmetric_join_output_partitioning,
64    },
65    metrics::{ExecutionPlanMetricsSet, MetricsSet},
66};
67
68use arrow::array::{ArrayRef, BooleanBufferBuilder};
69use arrow::compute::concat_batches;
70use arrow::datatypes::SchemaRef;
71use arrow::record_batch::RecordBatch;
72use arrow::util::bit_util;
73use arrow_schema::{DataType, Schema};
74use datafusion_common::config::ConfigOptions;
75use datafusion_common::utils::memory::estimate_memory_size;
76use datafusion_common::{
77    JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err,
78    plan_err, project_schema,
79};
80use datafusion_execution::TaskContext;
81use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
82use datafusion_expr::Accumulator;
83use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
84use datafusion_physical_expr::equivalence::{
85    ProjectionMapping, join_equivalence_properties,
86};
87use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
88use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
89use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
90
91use datafusion_common::hash_utils::RandomState;
92use datafusion_physical_expr_common::physical_expr::fmt_sql;
93use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
94use futures::TryStreamExt;
95use parking_lot::Mutex;
96
97use super::partitioned_hash_eval::SeededRandomState;
98
99/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
100pub(crate) const HASH_JOIN_SEED: SeededRandomState =
101    SeededRandomState::with_seed(12210250226015887276);
102
/// Name of the count metric tracking `ArrayMap` creation (presumably recorded when the
/// build side uses an `ArrayMap` instead of a general hash map; usage is outside this view).
const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count";
104
105#[expect(clippy::too_many_arguments)]
106fn try_create_array_map(
107    bounds: &Option<PartitionBounds>,
108    schema: &SchemaRef,
109    batches: &[RecordBatch],
110    on_left: &[PhysicalExprRef],
111    reservation: &mut MemoryReservation,
112    perfect_hash_join_small_build_threshold: usize,
113    perfect_hash_join_min_key_density: f64,
114    null_equality: NullEquality,
115) -> Result<Option<(ArrayMap, RecordBatch, Vec<ArrayRef>)>> {
116    if on_left.len() != 1 {
117        return Ok(None);
118    }
119
120    if null_equality == NullEquality::NullEqualsNull {
121        for batch in batches.iter() {
122            let arrays = evaluate_expressions_to_arrays(on_left, batch)?;
123            if arrays[0].null_count() > 0 {
124                return Ok(None);
125            }
126        }
127    }
128
129    let (min_val, max_val) = if let Some(bounds) = bounds {
130        let (min_val, max_val) = if let Some(cb) = bounds.get_column_bounds(0) {
131            (cb.min.clone(), cb.max.clone())
132        } else {
133            return Ok(None);
134        };
135
136        if min_val.is_null() || max_val.is_null() {
137            return Ok(None);
138        }
139
140        if min_val > max_val {
141            return internal_err!("min_val>max_val");
142        }
143
144        if let Some((mi, ma)) =
145            ArrayMap::key_to_u64(&min_val).zip(ArrayMap::key_to_u64(&max_val))
146        {
147            (mi, ma)
148        } else {
149            return Ok(None);
150        }
151    } else {
152        return Ok(None);
153    };
154
155    let range = ArrayMap::calculate_range(min_val, max_val);
156    let num_row: usize = batches.iter().map(|x| x.num_rows()).sum();
157
158    // TODO: support create ArrayMap<u64>
159    if num_row >= u32::MAX as usize {
160        return Ok(None);
161    }
162
163    // When the key range spans the full integer domain (e.g. i64::MIN to i64::MAX),
164    // range is u64::MAX and `range + 1` below would overflow.
165    if range == usize::MAX as u64 {
166        return Ok(None);
167    }
168
169    let dense_ratio = (num_row as f64) / ((range + 1) as f64);
170
171    if range >= perfect_hash_join_small_build_threshold as u64
172        && dense_ratio <= perfect_hash_join_min_key_density
173    {
174        return Ok(None);
175    }
176
177    let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row);
178    reservation.try_grow(mem_size)?;
179
180    let batch = concat_batches(schema, batches)?;
181    let left_values = evaluate_expressions_to_arrays(on_left, &batch)?;
182
183    let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?;
184
185    Ok(Some((array_map, batch, left_values)))
186}
187
188/// HashTable and input data for the left (build side) of a join
189pub(super) struct JoinLeftData {
190    /// The hash table with indices into `batch`
191    /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown
192    pub(super) map: Arc<Map>,
193    /// The input rows for the build side
194    batch: RecordBatch,
195    /// The build side on expressions values
196    values: Vec<ArrayRef>,
197    /// Shared bitmap builder for visited left indices
198    visited_indices_bitmap: SharedBitmapBuilder,
199    /// Counter of running probe-threads, potentially
200    /// able to update `visited_indices_bitmap`
201    probe_threads_counter: AtomicUsize,
202    /// We need to keep this field to maintain accurate memory accounting, even though we don't directly use it.
203    /// Without holding onto this reservation, the recorded memory usage would become inconsistent with actual usage.
204    /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
205    /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
206    _reservation: MemoryReservation,
207    /// Bounds computed from the build side for dynamic filter pushdown.
208    /// If the partition is empty (no rows) this will be None.
209    /// If the partition has some rows this will be Some with the bounds for each join key column.
210    pub(super) bounds: Option<PartitionBounds>,
211    /// Membership testing strategy for filter pushdown
212    /// Contains either InList values for small build sides or hash table reference for large build sides
213    pub(super) membership: PushdownStrategy,
214    /// Shared atomic flag indicating if any probe partition saw data (for null-aware anti joins)
215    /// This is shared across all probe partitions to provide global knowledge
216    pub(super) probe_side_non_empty: AtomicBool,
217    /// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins)
218    pub(super) probe_side_has_null: AtomicBool,
219}
220
221impl JoinLeftData {
222    /// return a reference to the map
223    pub(super) fn map(&self) -> &Map {
224        &self.map
225    }
226
227    /// returns a reference to the build side batch
228    pub(super) fn batch(&self) -> &RecordBatch {
229        &self.batch
230    }
231
232    /// returns a reference to the build side expressions values
233    pub(super) fn values(&self) -> &[ArrayRef] {
234        &self.values
235    }
236
237    /// returns a reference to the visited indices bitmap
238    pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
239        &self.visited_indices_bitmap
240    }
241
242    /// returns a reference to the InList values for filter pushdown
243    pub(super) fn membership(&self) -> &PushdownStrategy {
244        &self.membership
245    }
246
247    /// Decrements the counter of running threads, and returns `true`
248    /// if caller is the last running thread
249    pub(super) fn report_probe_completed(&self) -> bool {
250        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
251    }
252}
253
254/// Helps to build [`HashJoinExec`].
255///
256/// Builder can be created from an existing [`HashJoinExec`] using [`From::from`].
257/// In this case, all its fields are inherited. If a field that affects the node's
258/// properties is modified, they will be automatically recomputed during the build.
259///
260/// # Adding setters
261///
262/// When adding a new setter, it is necessary to ensure that the `preserve_properties`
263/// flag is set to false if modifying the field requires a recomputation of the plan's
264/// properties.
265///
266pub struct HashJoinExecBuilder {
267    exec: HashJoinExec,
268    preserve_properties: bool,
269}
270
271impl HashJoinExecBuilder {
272    /// Make a new [`HashJoinExecBuilder`].
273    pub fn new(
274        left: Arc<dyn ExecutionPlan>,
275        right: Arc<dyn ExecutionPlan>,
276        on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
277        join_type: JoinType,
278    ) -> Self {
279        Self {
280            exec: HashJoinExec {
281                left,
282                right,
283                on,
284                filter: None,
285                join_type,
286                left_fut: Default::default(),
287                random_state: HASH_JOIN_SEED,
288                mode: PartitionMode::Auto,
289                fetch: None,
290                metrics: ExecutionPlanMetricsSet::new(),
291                projection: None,
292                column_indices: vec![],
293                null_equality: NullEquality::NullEqualsNothing,
294                null_aware: false,
295                dynamic_filter: None,
296                // Will be computed at when plan will be built.
297                cache: stub_properties(),
298                join_schema: Arc::new(Schema::empty()),
299            },
300            // As `exec` is initialized with stub properties,
301            // they will be properly computed when plan will be built.
302            preserve_properties: false,
303        }
304    }
305
306    /// Set join type.
307    pub fn with_type(mut self, join_type: JoinType) -> Self {
308        self.exec.join_type = join_type;
309        self.preserve_properties = false;
310        self
311    }
312
313    /// Set projection from the vector.
314    pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self {
315        self.with_projection_ref(projection.map(Into::into))
316    }
317
318    /// Set projection from the shared reference.
319    pub fn with_projection_ref(mut self, projection: Option<ProjectionRef>) -> Self {
320        self.exec.projection = projection;
321        self.preserve_properties = false;
322        self
323    }
324
325    /// Set optional filter.
326    pub fn with_filter(mut self, filter: Option<JoinFilter>) -> Self {
327        self.exec.filter = filter;
328        self
329    }
330
331    /// Set expressions to join on.
332    pub fn with_on(mut self, on: Vec<(PhysicalExprRef, PhysicalExprRef)>) -> Self {
333        self.exec.on = on;
334        self.preserve_properties = false;
335        self
336    }
337
338    /// Set partition mode.
339    pub fn with_partition_mode(mut self, mode: PartitionMode) -> Self {
340        self.exec.mode = mode;
341        self.preserve_properties = false;
342        self
343    }
344
345    /// Set null equality property.
346    pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self {
347        self.exec.null_equality = null_equality;
348        self
349    }
350
351    /// Set null aware property.
352    pub fn with_null_aware(mut self, null_aware: bool) -> Self {
353        self.exec.null_aware = null_aware;
354        self
355    }
356
357    /// Set fetch property.
358    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
359        self.exec.fetch = fetch;
360        self
361    }
362
363    /// Require to recompute plan properties.
364    pub fn recompute_properties(mut self) -> Self {
365        self.preserve_properties = false;
366        self
367    }
368
369    /// Replace children.
370    pub fn with_new_children(
371        mut self,
372        mut children: Vec<Arc<dyn ExecutionPlan>>,
373    ) -> Result<Self> {
374        assert_or_internal_err!(
375            children.len() == 2,
376            "wrong number of children passed into `HashJoinExecBuilder`"
377        );
378        self.preserve_properties &= has_same_children_properties(&self.exec, &children)?;
379        self.exec.right = children.swap_remove(1);
380        self.exec.left = children.swap_remove(0);
381        Ok(self)
382    }
383
384    /// Reset runtime state.
385    pub fn reset_state(mut self) -> Self {
386        self.exec.left_fut = Default::default();
387        self.exec.dynamic_filter = None;
388        self.exec.metrics = ExecutionPlanMetricsSet::new();
389        self
390    }
391
392    /// Build result as a dyn execution plan.
393    pub fn build_exec(self) -> Result<Arc<dyn ExecutionPlan>> {
394        self.build().map(|p| Arc::new(p) as _)
395    }
396
397    /// Build resulting execution plan.
398    pub fn build(self) -> Result<HashJoinExec> {
399        let Self {
400            exec,
401            preserve_properties,
402        } = self;
403
404        // Validate null_aware flag
405        if exec.null_aware {
406            let join_type = exec.join_type();
407            if !matches!(join_type, JoinType::LeftAnti) {
408                return plan_err!(
409                    "null_aware can only be true for LeftAnti joins, got {join_type}"
410                );
411            }
412            let on = exec.on();
413            if on.len() != 1 {
414                return plan_err!(
415                    "null_aware anti join only supports single column join key, got {} columns",
416                    on.len()
417                );
418            }
419        }
420
421        if preserve_properties {
422            return Ok(exec);
423        }
424
425        let HashJoinExec {
426            left,
427            right,
428            on,
429            filter,
430            join_type,
431            left_fut,
432            random_state,
433            mode,
434            metrics,
435            projection,
436            null_equality,
437            null_aware,
438            dynamic_filter,
439            fetch,
440            // Recomputed.
441            join_schema: _,
442            column_indices: _,
443            cache: _,
444        } = exec;
445
446        let left_schema = left.schema();
447        let right_schema = right.schema();
448        if on.is_empty() {
449            return plan_err!("On constraints in HashJoinExec should be non-empty");
450        }
451
452        check_join_is_valid(&left_schema, &right_schema, &on)?;
453        let (join_schema, column_indices) =
454            build_join_schema(&left_schema, &right_schema, &join_type);
455
456        let join_schema = Arc::new(join_schema);
457
458        // Check if the projection is valid.
459        can_project(&join_schema, projection.as_deref())?;
460
461        let cache = HashJoinExec::compute_properties(
462            &left,
463            &right,
464            &join_schema,
465            join_type,
466            &on,
467            mode,
468            projection.as_deref(),
469        )?;
470
471        Ok(HashJoinExec {
472            left,
473            right,
474            on,
475            filter,
476            join_type,
477            join_schema,
478            left_fut,
479            random_state,
480            mode,
481            metrics,
482            projection,
483            column_indices,
484            null_equality,
485            null_aware,
486            cache: Arc::new(cache),
487            dynamic_filter,
488            fetch,
489        })
490    }
491
492    fn with_dynamic_filter(mut self, filter: Option<HashJoinExecDynamicFilter>) -> Self {
493        self.exec.dynamic_filter = filter;
494        self
495    }
496}
497
498impl From<&HashJoinExec> for HashJoinExecBuilder {
499    fn from(exec: &HashJoinExec) -> Self {
500        Self {
501            exec: HashJoinExec {
502                left: Arc::clone(exec.left()),
503                right: Arc::clone(exec.right()),
504                on: exec.on.clone(),
505                filter: exec.filter.clone(),
506                join_type: exec.join_type,
507                join_schema: Arc::clone(&exec.join_schema),
508                left_fut: Arc::clone(&exec.left_fut),
509                random_state: exec.random_state.clone(),
510                mode: exec.mode,
511                metrics: exec.metrics.clone(),
512                projection: exec.projection.clone(),
513                column_indices: exec.column_indices.clone(),
514                null_equality: exec.null_equality,
515                null_aware: exec.null_aware,
516                cache: Arc::clone(&exec.cache),
517                dynamic_filter: exec.dynamic_filter.clone(),
518                fetch: exec.fetch,
519            },
520            preserve_properties: true,
521        }
522    }
523}
524
525#[expect(rustdoc::private_intra_doc_links)]
526/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
527/// partitions using a hash table and an optional filter list to apply post
528/// join.
529///
530/// # Join Expressions
531///
532/// This implementation is optimized for evaluating equijoin predicates  (
533/// `<col1> = <col2>`) expressions, which are represented as a list of `Columns`
534/// in [`Self::on`].
535///
536/// Non-equality predicates, which can not pushed down to a join inputs (e.g.
537/// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
538/// after the equijoin predicates.
539///
540/// # ArrayMap Optimization
541///
542/// For joins with a single integer-based join key, `HashJoinExec` may use an [`ArrayMap`]
543/// (also known as a "perfect hash join") instead of a general-purpose hash map.
544/// This optimization is used when:
545/// 1. There is exactly one join key.
546/// 2. The join key is an integer type up to 64 bits wide that can be losslessly converted
547///    to `u64` (128-bit integer types such as `i128` and `u128` are not supported).
548/// 3. The range of keys is small enough (controlled by `perfect_hash_join_small_build_threshold`)
549///    OR the keys are sufficiently dense (controlled by `perfect_hash_join_min_key_density`).
550/// 4. build_side.num_rows() < u32::MAX
551/// 5. NullEqualsNothing || (NullEqualsNull && build side doesn't contain null)
552///
553/// See [`try_create_array_map`] for more details.
554///
555/// Note that when using [`PartitionMode::Partitioned`], the build side is split into multiple
556/// partitions. This can cause a dense build side to become sparse within each partition,
557/// potentially disabling this optimization.
558///
559/// For example, consider:
560/// ```sql
561/// SELECT t1.value, t2.value
562/// FROM range(10000) AS t1
563/// JOIN range(10000) AS t2
564///   ON t1.value = t2.value;
565/// ```
566/// With 24 partitions, each partition will only receive a subset of the 10,000 rows.
567/// The first partition might contain values like `3, 10, 18, 39, 43`, which are sparse
568/// relative to the original range, even though the overall data set is dense.
569///
570/// # "Build Side" vs "Probe Side"
571///
572/// HashJoin takes two inputs, which are referred to as the "build" and the
573/// "probe". The build side is the first child, and the probe side is the second
574/// child.
575///
576/// The two inputs are treated differently and it is VERY important that the
577/// *smaller* input is placed on the build side to minimize the work of creating
578/// the hash table.
579///
580/// ```text
581///          ┌───────────┐
582///          │ HashJoin  │
583///          │           │
584///          └───────────┘
585///              │   │
586///        ┌─────┘   └─────┐
587///        ▼               ▼
588/// ┌────────────┐  ┌─────────────┐
589/// │   Input    │  │    Input    │
590/// │    [0]     │  │     [1]     │
591/// └────────────┘  └─────────────┘
592///
593///  "build side"    "probe side"
594/// ```
595///
596/// Execution proceeds in 2 stages:
597///
598/// 1. the **build phase** creates a hash table from the tuples of the build side,
599///    and single concatenated batch containing data from all fetched record batches.
600///    Resulting hash table stores hashed join-key fields for each row as a key, and
601///    indices of corresponding rows in concatenated batch.
602///
603/// When using the standard `JoinHashMap`, hash join uses LIFO data structure as a hash table,
604/// and in order to retain original build-side input order while obtaining data during probe phase,
605/// hash table is updated by iterating batch sequence in reverse order -- it allows to
606/// keep rows with smaller indices "on the top" of hash table, and still maintain
607/// correct indexing for concatenated build-side data batch.
608///
609/// Example of build phase for 3 record batches:
610///
611///
612/// ```text
613///
614///  Original build-side data   Inserting build-side values into hashmap    Concatenated build-side batch
615///                                                                         ┌───────────────────────────┐
616///                             hashmap.insert(row-hash, row-idx + offset)  │                      idx  │
617///            ┌───────┐                                                    │          ┌───────┐        │
618///            │ Row 1 │        1) update_hash for batch 3 with offset 0    │          │ Row 6 │    0   │
619///   Batch 1  │       │           - hashmap.insert(Row 7, idx 1)           │ Batch 3  │       │        │
620///            │ Row 2 │           - hashmap.insert(Row 6, idx 0)           │          │ Row 7 │    1   │
621///            └───────┘                                                    │          └───────┘        │
622///                                                                         │                           │
623///            ┌───────┐                                                    │          ┌───────┐        │
624///            │ Row 3 │        2) update_hash for batch 2 with offset 2    │          │ Row 3 │    2   │
625///            │       │           - hashmap.insert(Row 5, idx 4)           │          │       │        │
626///   Batch 2  │ Row 4 │           - hashmap.insert(Row 4, idx 3)           │ Batch 2  │ Row 4 │    3   │
627///            │       │           - hashmap.insert(Row 3, idx 2)           │          │       │        │
628///            │ Row 5 │                                                    │          │ Row 5 │    4   │
629///            └───────┘                                                    │          └───────┘        │
630///                                                                         │                           │
631///            ┌───────┐                                                    │          ┌───────┐        │
632///            │ Row 6 │        3) update_hash for batch 1 with offset 5    │          │ Row 1 │    5   │
633///   Batch 3  │       │           - hashmap.insert(Row 2, idx 6)           │ Batch 1  │       │        │
634///            │ Row 7 │           - hashmap.insert(Row 1, idx 5)           │          │ Row 2 │    6   │
635///            └───────┘                                                    │          └───────┘        │
636///                                                                         │                           │
637///                                                                         └───────────────────────────┘
638/// ```
639///
640/// 2. the **probe phase** where the tuples of the probe side are streamed
641///    through, checking for matches of the join keys in the hash table.
642///
643/// ```text
644///                 ┌────────────────┐          ┌────────────────┐
645///                 │ ┌─────────┐    │          │ ┌─────────┐    │
646///                 │ │  Hash   │    │          │ │  Hash   │    │
647///                 │ │  Table  │    │          │ │  Table  │    │
648///                 │ │(keys are│    │          │ │(keys are│    │
649///                 │ │equi join│    │          │ │equi join│    │  Stage 2: batches from
650///  Stage 1: the   │ │columns) │    │          │ │columns) │    │    the probe side are
651/// *entire* build  │ │         │    │          │ │         │    │  streamed through, and
652///  side is read   │ └─────────┘    │          │ └─────────┘    │   checked against the
653/// into the hash   │      ▲         │          │          ▲     │   contents of the hash
654///     table       │       HashJoin │          │  HashJoin      │          table
655///                 └──────┼─────────┘          └──────────┼─────┘
656///             ─ ─ ─ ─ ─ ─                                 ─ ─ ─ ─ ─ ─ ─
657///            │                                                         │
658///
659///            │                                                         │
660///     ┌────────────┐                                            ┌────────────┐
661///     │RecordBatch │                                            │RecordBatch │
662///     └────────────┘                                            └────────────┘
663///     ┌────────────┐                                            ┌────────────┐
664///     │RecordBatch │                                            │RecordBatch │
665///     └────────────┘                                            └────────────┘
666///           ...                                                       ...
667///     ┌────────────┐                                            ┌────────────┐
668///     │RecordBatch │                                            │RecordBatch │
669///     └────────────┘                                            └────────────┘
670///
671///        build side                                                probe side
672/// ```
673///
674/// # Example "Optimal" Plans
675///
676/// The differences in the inputs means that for classic "Star Schema Query",
677/// the optimal plan will be a **"Right Deep Tree"** . A Star Schema Query is
678/// one where there is one large table and several smaller "dimension" tables,
679/// joined on `Foreign Key = Primary Key` predicates.
680///
681/// A "Right Deep Tree" looks like this large table as the probe side on the
682/// lowest join:
683///
684/// ```text
685///             ┌───────────┐
686///             │ HashJoin  │
687///             │           │
688///             └───────────┘
689///                 │   │
690///         ┌───────┘   └──────────┐
691///         ▼                      ▼
692/// ┌───────────────┐        ┌───────────┐
693/// │ small table 1 │        │ HashJoin  │
694/// │  "dimension"  │        │           │
695/// └───────────────┘        └───┬───┬───┘
696///                   ┌──────────┘   └───────┐
697///                   │                      │
698///                   ▼                      ▼
699///           ┌───────────────┐        ┌───────────┐
700///           │ small table 2 │        │ HashJoin  │
701///           │  "dimension"  │        │           │
702///           └───────────────┘        └───┬───┬───┘
703///                               ┌────────┘   └────────┐
704///                               │                     │
705///                               ▼                     ▼
706///                       ┌───────────────┐     ┌───────────────┐
707///                       │ small table 3 │     │  large table  │
708///                       │  "dimension"  │     │    "fact"     │
709///                       └───────────────┘     └───────────────┘
710/// ```
711///
712/// # Clone / Shared State
713///
714/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
715/// loading of the left side with the processing in each output stream.
716/// Therefore it can not be [`Clone`]
717pub struct HashJoinExec {
718    /// left (build) side which gets hashed
719    pub left: Arc<dyn ExecutionPlan>,
720    /// right (probe) side which are filtered by the hash table
721    pub right: Arc<dyn ExecutionPlan>,
722    /// Set of equijoin columns from the relations: `(left_col, right_col)`
723    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
724    /// Filters which are applied while finding matching rows
725    pub filter: Option<JoinFilter>,
726    /// How the join is performed (`OUTER`, `INNER`, etc)
727    pub join_type: JoinType,
728    /// The schema after join. Please be careful when using this schema,
729    /// if there is a projection, the schema isn't the same as the output schema.
730    join_schema: SchemaRef,
731    /// Future that consumes left input and builds the hash table
732    ///
733    /// For CollectLeft partition mode, this structure is *shared* across all output streams.
734    ///
735    /// Each output stream waits on the `OnceAsync` to signal the completion of
736    /// the hash table creation.
737    left_fut: Arc<OnceAsync<JoinLeftData>>,
738    /// Shared the `SeededRandomState` for the hashing algorithm (seeds preserved for serialization)
739    random_state: SeededRandomState,
740    /// Partitioning mode to use
741    pub mode: PartitionMode,
742    /// Execution metrics
743    metrics: ExecutionPlanMetricsSet,
744    /// The projection indices of the columns in the output schema of join
745    pub projection: Option<ProjectionRef>,
746    /// Information of index and left / right placement of columns
747    column_indices: Vec<ColumnIndex>,
748    /// The equality null-handling behavior of the join algorithm.
749    pub null_equality: NullEquality,
750    /// Flag to indicate if this is a null-aware anti join
751    pub null_aware: bool,
752    /// Cache holding plan properties like equivalences, output partitioning etc.
753    cache: Arc<PlanProperties>,
754    /// Dynamic filter for pushing down to the probe side
755    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
756    /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
757    dynamic_filter: Option<HashJoinExecDynamicFilter>,
758    /// Maximum number of rows to return
759    fetch: Option<usize>,
760}
761
762#[derive(Clone)]
763struct HashJoinExecDynamicFilter {
764    /// Dynamic filter that we'll update with the results of the build side once that is done.
765    filter: Arc<DynamicFilterPhysicalExpr>,
766    /// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
767    /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
768    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
769}
770
771impl fmt::Debug for HashJoinExec {
772    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773        f.debug_struct("HashJoinExec")
774            .field("left", &self.left)
775            .field("right", &self.right)
776            .field("on", &self.on)
777            .field("filter", &self.filter)
778            .field("join_type", &self.join_type)
779            .field("join_schema", &self.join_schema)
780            .field("left_fut", &self.left_fut)
781            .field("random_state", &self.random_state)
782            .field("mode", &self.mode)
783            .field("metrics", &self.metrics)
784            .field("projection", &self.projection)
785            .field("column_indices", &self.column_indices)
786            .field("null_equality", &self.null_equality)
787            .field("cache", &self.cache)
788            // Explicitly exclude dynamic_filter to avoid runtime state differences in tests
789            .finish()
790    }
791}
792
793impl EmbeddedProjection for HashJoinExec {
794    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
795        self.with_projection(projection)
796    }
797}
798
799impl HashJoinExec {
800    /// Tries to create a new [`HashJoinExec`].
801    ///
802    /// # Error
803    /// This function errors when it is not possible to join the left and right sides on keys `on`.
804    #[expect(clippy::too_many_arguments)]
805    pub fn try_new(
806        left: Arc<dyn ExecutionPlan>,
807        right: Arc<dyn ExecutionPlan>,
808        on: JoinOn,
809        filter: Option<JoinFilter>,
810        join_type: &JoinType,
811        projection: Option<Vec<usize>>,
812        partition_mode: PartitionMode,
813        null_equality: NullEquality,
814        null_aware: bool,
815    ) -> Result<Self> {
816        HashJoinExecBuilder::new(left, right, on, *join_type)
817            .with_filter(filter)
818            .with_projection(projection)
819            .with_partition_mode(partition_mode)
820            .with_null_equality(null_equality)
821            .with_null_aware(null_aware)
822            .build()
823    }
824
825    /// Create a builder based on the existing [`HashJoinExec`].
826    ///
827    /// Returned builder preserves all existing fields. If a field requiring properties
828    /// recomputation is modified, this will be done automatically during the node build.
829    ///
830    pub fn builder(&self) -> HashJoinExecBuilder {
831        self.into()
832    }
833
834    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
835        // Extract the right-side keys (probe side keys) from the `on` clauses
836        // Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
837        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
838        // Initialize with a placeholder expression (true) that will be updated when the hash table is built
839        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
840    }
841
842    fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
843        let (_, probe_preserved) = self.join_type.on_lr_is_preserved();
844        if !probe_preserved || !config.optimizer.enable_join_dynamic_filter_pushdown {
845            return false;
846        }
847
848        // `preserve_file_partitions` can report Hash partitioning for Hive-style
849        // file groups, but those partitions are not actually hash-distributed.
850        // Partitioned dynamic filters rely on hash routing, so disable them in
851        // this mode to avoid incorrect results. Follow-up work: enable dynamic
852        // filtering for preserve_file_partitioned scans (issue #20195).
853        // https://github.com/apache/datafusion/issues/20195
854        if config.optimizer.preserve_file_partitions > 0
855            && self.mode == PartitionMode::Partitioned
856        {
857            return false;
858        }
859
860        true
861    }
862
863    /// left (build) side which gets hashed
864    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
865        &self.left
866    }
867
868    /// right (probe) side which are filtered by the hash table
869    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
870        &self.right
871    }
872
873    /// Set of common columns used to join on
874    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
875        &self.on
876    }
877
878    /// Filters applied before join output
879    pub fn filter(&self) -> Option<&JoinFilter> {
880        self.filter.as_ref()
881    }
882
883    /// How the join is performed
884    pub fn join_type(&self) -> &JoinType {
885        &self.join_type
886    }
887
888    /// The schema after join. Please be careful when using this schema,
889    /// if there is a projection, the schema isn't the same as the output schema.
890    pub fn join_schema(&self) -> &SchemaRef {
891        &self.join_schema
892    }
893
894    /// The partitioning mode of this hash join
895    pub fn partition_mode(&self) -> &PartitionMode {
896        &self.mode
897    }
898
899    /// Get null_equality
900    pub fn null_equality(&self) -> NullEquality {
901        self.null_equality
902    }
903
904    /// Get the dynamic filter expression for testing purposes.
905    /// Returns the dynamic filter expression for this hash join, if set.
906    pub fn dynamic_filter_expr(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
907        self.dynamic_filter.as_ref().map(|df| &df.filter)
908    }
909
910    /// Set the dynamic filter on this hash join.
911    ///
912    /// Resets any internal state that depends on any existing dynamic filter.
913    ///
914    /// Validates that the filter's children reference valid columns in
915    /// the probe (right) side's schema.
916    pub fn with_dynamic_filter_expr(
917        mut self,
918        filter: Arc<DynamicFilterPhysicalExpr>,
919    ) -> Result<Self> {
920        let probe_schema = self.right.schema();
921        for child in filter.children() {
922            child.data_type(&probe_schema)?;
923        }
924        self.dynamic_filter = Some(HashJoinExecDynamicFilter {
925            filter,
926            // Initialize with an empty accumulator which will be lazily populated
927            // during execution.
928            build_accumulator: OnceLock::new(),
929        });
930        Ok(self)
931    }
932
933    /// Calculate order preservation flags for this hash join.
934    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
935        vec![
936            false,
937            matches!(
938                join_type,
939                JoinType::Inner
940                    | JoinType::Right
941                    | JoinType::RightAnti
942                    | JoinType::RightSemi
943                    | JoinType::RightMark
944            ),
945        ]
946    }
947
948    /// Get probe side information for the hash join.
949    pub fn probe_side() -> JoinSide {
950        // In current implementation right side is always probe side.
951        JoinSide::Right
952    }
953
954    /// Return whether the join contains a projection
955    pub fn contains_projection(&self) -> bool {
956        self.projection.is_some()
957    }
958
959    /// Return new instance of [HashJoinExec] with the given projection.
960    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
961        let projection = projection.map(Into::into);
962        //  check if the projection is valid
963        can_project(&self.schema(), projection.as_deref())?;
964        let projection =
965            combine_projections(projection.as_ref(), self.projection.as_ref())?;
966        self.builder().with_projection_ref(projection).build()
967    }
968
969    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
970    fn compute_properties(
971        left: &Arc<dyn ExecutionPlan>,
972        right: &Arc<dyn ExecutionPlan>,
973        schema: &SchemaRef,
974        join_type: JoinType,
975        on: JoinOnRef,
976        mode: PartitionMode,
977        projection: Option<&[usize]>,
978    ) -> Result<PlanProperties> {
979        // Calculate equivalence properties:
980        let mut eq_properties = join_equivalence_properties(
981            left.equivalence_properties().clone(),
982            right.equivalence_properties().clone(),
983            &join_type,
984            Arc::clone(schema),
985            &Self::maintains_input_order(join_type),
986            Some(Self::probe_side()),
987            on,
988        )?;
989
990        let mut output_partitioning = match mode {
991            PartitionMode::CollectLeft => {
992                asymmetric_join_output_partitioning(left, right, &join_type)?
993            }
994            PartitionMode::Auto => Partitioning::UnknownPartitioning(
995                right.output_partitioning().partition_count(),
996            ),
997            PartitionMode::Partitioned => {
998                symmetric_join_output_partitioning(left, right, &join_type)?
999            }
1000        };
1001
1002        let emission_type = if left.boundedness().is_unbounded() {
1003            EmissionType::Final
1004        } else if right.pipeline_behavior() == EmissionType::Incremental {
1005            match join_type {
1006                // If we only need to generate matched rows from the probe side,
1007                // we can emit rows incrementally.
1008                JoinType::Inner
1009                | JoinType::LeftSemi
1010                | JoinType::RightSemi
1011                | JoinType::Right
1012                | JoinType::RightAnti
1013                | JoinType::RightMark => EmissionType::Incremental,
1014                // If we need to generate unmatched rows from the *build side*,
1015                // we need to emit them at the end.
1016                JoinType::Left
1017                | JoinType::LeftAnti
1018                | JoinType::LeftMark
1019                | JoinType::Full => EmissionType::Both,
1020            }
1021        } else {
1022            right.pipeline_behavior()
1023        };
1024
1025        // If contains projection, update the PlanProperties.
1026        if let Some(projection) = projection {
1027            // construct a map from the input expressions to the output expression of the Projection
1028            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
1029            let out_schema = project_schema(schema, Some(&projection))?;
1030            output_partitioning =
1031                output_partitioning.project(&projection_mapping, &eq_properties);
1032            eq_properties = eq_properties.project(&projection_mapping, out_schema);
1033        }
1034
1035        Ok(PlanProperties::new(
1036            eq_properties,
1037            output_partitioning,
1038            emission_type,
1039            boundedness_from_children([left, right]),
1040        ))
1041    }
1042
1043    /// Returns a new `ExecutionPlan` that computes the same join as this one,
1044    /// with the left and right inputs swapped using the  specified
1045    /// `partition_mode`.
1046    ///
1047    /// # Notes:
1048    ///
1049    /// This function is public so other downstream projects can use it to
1050    /// construct `HashJoinExec` with right side as the build side.
1051    ///
1052    /// For using this interface directly, please refer to below:
1053    ///
1054    /// Hash join execution may require specific input partitioning (for example,
1055    /// the left child may have a single partition while the right child has multiple).
1056    ///
1057    /// Calling this function on join nodes whose children have already been repartitioned
1058    /// (e.g., after a `RepartitionExec` has been inserted) may break the partitioning
1059    /// requirements of the hash join. Therefore, ensure you call this function
1060    /// before inserting any repartitioning operators on the join's children.
1061    ///
1062    /// In DataFusion's default SQL interface, this function is used by the `JoinSelection`
1063    /// physical optimizer rule to determine a good join order, which is
1064    /// executed before the `EnforceDistribution` rule (the rule that may
1065    /// insert `RepartitionExec` operators).
1066    pub fn swap_inputs(
1067        &self,
1068        partition_mode: PartitionMode,
1069    ) -> Result<Arc<dyn ExecutionPlan>> {
1070        let left = self.left();
1071        let right = self.right();
1072        let new_join = self
1073            .builder()
1074            .with_type(self.join_type.swap())
1075            .with_new_children(vec![Arc::clone(right), Arc::clone(left)])?
1076            .with_on(
1077                self.on()
1078                    .iter()
1079                    .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
1080                    .collect(),
1081            )
1082            .with_filter(self.filter().map(JoinFilter::swap))
1083            .with_projection(swap_join_projection(
1084                left.schema().fields().len(),
1085                right.schema().fields().len(),
1086                self.projection.as_deref(),
1087                self.join_type(),
1088            ))
1089            .with_partition_mode(partition_mode)
1090            .build()?;
1091        // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
1092        if matches!(
1093            self.join_type(),
1094            JoinType::LeftSemi
1095                | JoinType::RightSemi
1096                | JoinType::LeftAnti
1097                | JoinType::RightAnti
1098                | JoinType::LeftMark
1099                | JoinType::RightMark
1100        ) || self.projection.is_some()
1101        {
1102            Ok(Arc::new(new_join))
1103        } else {
1104            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
1105        }
1106    }
1107}
1108
1109impl DisplayAs for HashJoinExec {
1110    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
1111        match t {
1112            DisplayFormatType::Default | DisplayFormatType::Verbose => {
1113                let display_filter = self.filter.as_ref().map_or_else(
1114                    || "".to_string(),
1115                    |f| format!(", filter={}", f.expression()),
1116                );
1117                let display_projections = if self.contains_projection() {
1118                    format!(
1119                        ", projection=[{}]",
1120                        self.projection
1121                            .as_ref()
1122                            .unwrap()
1123                            .iter()
1124                            .map(|index| format!(
1125                                "{}@{}",
1126                                self.join_schema.fields().get(*index).unwrap().name(),
1127                                index
1128                            ))
1129                            .collect::<Vec<_>>()
1130                            .join(", ")
1131                    )
1132                } else {
1133                    "".to_string()
1134                };
1135                let display_null_equality =
1136                    if self.null_equality() == NullEquality::NullEqualsNull {
1137                        ", NullsEqual: true"
1138                    } else {
1139                        ""
1140                    };
1141                let display_fetch = self
1142                    .fetch
1143                    .map_or_else(String::new, |f| format!(", fetch={f}"));
1144                let on = self
1145                    .on
1146                    .iter()
1147                    .map(|(c1, c2)| format!("({c1}, {c2})"))
1148                    .collect::<Vec<String>>()
1149                    .join(", ");
1150                write!(
1151                    f,
1152                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
1153                    self.mode,
1154                    self.join_type,
1155                    on,
1156                    display_filter,
1157                    display_projections,
1158                    display_null_equality,
1159                    display_fetch,
1160                )
1161            }
1162            DisplayFormatType::TreeRender => {
1163                let on = self
1164                    .on
1165                    .iter()
1166                    .map(|(c1, c2)| {
1167                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
1168                    })
1169                    .collect::<Vec<String>>()
1170                    .join(", ");
1171
1172                if *self.join_type() != JoinType::Inner {
1173                    writeln!(f, "join_type={:?}", self.join_type)?;
1174                }
1175
1176                writeln!(f, "on={on}")?;
1177
1178                if self.null_equality() == NullEquality::NullEqualsNull {
1179                    writeln!(f, "NullsEqual: true")?;
1180                }
1181
1182                if let Some(filter) = self.filter.as_ref() {
1183                    writeln!(f, "filter={filter}")?;
1184                }
1185
1186                if let Some(fetch) = self.fetch {
1187                    writeln!(f, "fetch={fetch}")?;
1188                }
1189
1190                Ok(())
1191            }
1192        }
1193    }
1194}
1195
1196impl ExecutionPlan for HashJoinExec {
1197    fn name(&self) -> &'static str {
1198        "HashJoinExec"
1199    }
1200
1201    fn properties(&self) -> &Arc<PlanProperties> {
1202        &self.cache
1203    }
1204
1205    fn required_input_distribution(&self) -> Vec<Distribution> {
1206        match self.mode {
1207            PartitionMode::CollectLeft => vec![
1208                Distribution::SinglePartition,
1209                Distribution::UnspecifiedDistribution,
1210            ],
1211            PartitionMode::Partitioned => {
1212                let (left_expr, right_expr) = self
1213                    .on
1214                    .iter()
1215                    .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1216                    .unzip();
1217                vec![
1218                    Distribution::HashPartitioned(left_expr),
1219                    Distribution::HashPartitioned(right_expr),
1220                ]
1221            }
1222            PartitionMode::Auto => vec![
1223                Distribution::UnspecifiedDistribution,
1224                Distribution::UnspecifiedDistribution,
1225            ],
1226        }
1227    }
1228
1229    // For [JoinType::Inner] and [JoinType::RightSemi] in hash joins, the probe phase initiates by
1230    // applying the hash function to convert the join key(s) in each row into a hash value from the
1231    // probe side table in the order they're arranged. The hash value is used to look up corresponding
1232    // entries in the hash table that was constructed from the build side table during the build phase.
1233    //
1234    // Because of the immediate generation of result rows once a match is found,
1235    // the output of the join tends to follow the order in which the rows were read from
1236    // the probe side table. This is simply due to the sequence in which the rows were processed.
1237    // Hence, it appears that the hash join is preserving the order of the probe side.
1238    //
1239    // Meanwhile, in the case of a [JoinType::RightAnti] hash join,
1240    // the unmatched rows from the probe side are also kept in order.
1241    // This is because the **`RightAnti`** join is designed to return rows from the right
1242    // (probe side) table that have no match in the left (build side) table. Because the rows
1243    // are processed sequentially in the probe phase, and unmatched rows are directly output
1244    // as results, these results tend to retain the order of the probe side table.
1245    fn maintains_input_order(&self) -> Vec<bool> {
1246        Self::maintains_input_order(self.join_type)
1247    }
1248
1249    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1250        vec![&self.left, &self.right]
1251    }
1252
1253    /// Creates a new HashJoinExec with different children while preserving configuration.
1254    ///
1255    /// This method is called during query optimization when the optimizer creates new
1256    /// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new`
1257    /// rather than cloning the existing one because partitioning may have changed.
1258    fn with_new_children(
1259        self: Arc<Self>,
1260        children: Vec<Arc<dyn ExecutionPlan>>,
1261    ) -> Result<Arc<dyn ExecutionPlan>> {
1262        self.builder().with_new_children(children)?.build_exec()
1263    }
1264
1265    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1266        self.builder().reset_state().build_exec()
1267    }
1268
1269    fn execute(
1270        &self,
1271        partition: usize,
1272        context: Arc<TaskContext>,
1273    ) -> Result<SendableRecordBatchStream> {
1274        let on_left = self
1275            .on
1276            .iter()
1277            .map(|on| Arc::clone(&on.0))
1278            .collect::<Vec<_>>();
1279        let left_partitions = self.left.output_partitioning().partition_count();
1280        let right_partitions = self.right.output_partitioning().partition_count();
1281
1282        assert_or_internal_err!(
1283            self.mode != PartitionMode::Partitioned
1284                || left_partitions == right_partitions,
1285            "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
1286             consider using RepartitionExec"
1287        );
1288
1289        assert_or_internal_err!(
1290            self.mode != PartitionMode::CollectLeft || left_partitions == 1,
1291            "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
1292             consider using CoalescePartitionsExec or the EnforceDistribution rule"
1293        );
1294
1295        // Only enable dynamic filter pushdown if:
1296        // - The session config enables dynamic filter pushdown
1297        // - A dynamic filter exists
1298        // - At least one consumer is holding a reference to it, this avoids expensive filter
1299        //   computation when disabled or when no consumer will use it.
1300        let enable_dynamic_filter_pushdown = self
1301            .allow_join_dynamic_filter_pushdown(context.session_config().options())
1302            && self
1303                .dynamic_filter
1304                .as_ref()
1305                .map(|df| df.filter.is_used())
1306                .unwrap_or(false);
1307
1308        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
1309
1310        let array_map_created_count = MetricBuilder::new(&self.metrics)
1311            .with_category(MetricCategory::Rows)
1312            .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);
1313
1314        // Initialize build_accumulator lazily with runtime partition counts (only if enabled)
1315        // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
1316        let repartition_random_state = REPARTITION_RANDOM_STATE;
1317        let build_accumulator = enable_dynamic_filter_pushdown
1318            .then(|| {
1319                self.dynamic_filter.as_ref().map(|df| {
1320                    let filter = Arc::clone(&df.filter);
1321                    let on_right = self
1322                        .on
1323                        .iter()
1324                        .map(|(_, right_expr)| Arc::clone(right_expr))
1325                        .collect::<Vec<_>>();
1326                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
1327                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
1328                            self.mode,
1329                            self.left.as_ref(),
1330                            self.right.as_ref(),
1331                            filter,
1332                            on_right,
1333                            repartition_random_state,
1334                        ))
1335                    })))
1336                })
1337            })
1338            .flatten()
1339            .flatten();
1340
1341        let left_fut = match self.mode {
1342            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
1343                let left_stream = self.left.execute(0, Arc::clone(&context))?;
1344
1345                let reservation =
1346                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
1347
1348                Ok(collect_left_input(
1349                    self.random_state.random_state().clone(),
1350                    left_stream,
1351                    on_left.clone(),
1352                    join_metrics.clone(),
1353                    reservation,
1354                    need_produce_result_in_final(self.join_type),
1355                    self.right().output_partitioning().partition_count(),
1356                    enable_dynamic_filter_pushdown,
1357                    Arc::clone(context.session_config().options()),
1358                    self.null_equality,
1359                    array_map_created_count,
1360                ))
1361            })?,
1362            PartitionMode::Partitioned => {
1363                let left_stream = self.left.execute(partition, Arc::clone(&context))?;
1364
1365                let reservation =
1366                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
1367                        .register(context.memory_pool());
1368                OnceFut::new(collect_left_input(
1369                    self.random_state.random_state().clone(),
1370                    left_stream,
1371                    on_left.clone(),
1372                    join_metrics.clone(),
1373                    reservation,
1374                    need_produce_result_in_final(self.join_type),
1375                    1,
1376                    enable_dynamic_filter_pushdown,
1377                    Arc::clone(context.session_config().options()),
1378                    self.null_equality,
1379                    array_map_created_count,
1380                ))
1381            }
1382            PartitionMode::Auto => {
1383                return plan_err!(
1384                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
1385                    PartitionMode::Auto
1386                );
1387            }
1388        };
1389
1390        let batch_size = context.session_config().batch_size();
1391
1392        // we have the batches and the hash map with their keys. We can how create a stream
1393        // over the right that uses this information to issue new batches.
1394        let right_stream = self.right.execute(partition, context)?;
1395
1396        // update column indices to reflect the projection
1397        let column_indices_after_projection = match self.projection.as_ref() {
1398            Some(projection) => projection
1399                .iter()
1400                .map(|i| self.column_indices[*i].clone())
1401                .collect(),
1402            None => self.column_indices.clone(),
1403        };
1404
1405        let on_right = self
1406            .on
1407            .iter()
1408            .map(|(_, right_expr)| Arc::clone(right_expr))
1409            .collect::<Vec<_>>();
1410
1411        Ok(Box::pin(HashJoinStream::new(
1412            partition,
1413            self.schema(),
1414            on_right,
1415            self.filter.clone(),
1416            self.join_type,
1417            right_stream,
1418            self.random_state.random_state().clone(),
1419            join_metrics,
1420            column_indices_after_projection,
1421            self.null_equality,
1422            HashJoinStreamState::WaitBuildSide,
1423            BuildSide::Initial(BuildSideInitialState { left_fut }),
1424            batch_size,
1425            vec![],
1426            self.right.output_ordering().is_some(),
1427            build_accumulator,
1428            self.mode,
1429            self.null_aware,
1430            self.fetch,
1431        )))
1432    }
1433
1434    fn metrics(&self) -> Option<MetricsSet> {
1435        Some(self.metrics.clone_inner())
1436    }
1437
1438    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
1439        let stats = match (partition, self.mode) {
1440            // For CollectLeft mode, the left side is collected into a single partition,
1441            // so all left partitions are available to each output partition.
1442            // For the right side, we need the specific partition statistics.
1443            (Some(partition), PartitionMode::CollectLeft) => {
1444                let left_stats = self.left.partition_statistics(None)?;
1445                let right_stats = self.right.partition_statistics(Some(partition))?;
1446
1447                estimate_join_statistics(
1448                    Arc::unwrap_or_clone(left_stats),
1449                    Arc::unwrap_or_clone(right_stats),
1450                    &self.on,
1451                    &self.join_type,
1452                    &self.join_schema,
1453                )?
1454            }
1455
1456            // For Partitioned mode, both sides are partitioned, so each output partition
1457            // only has access to the corresponding partition from both sides.
1458            (Some(partition), PartitionMode::Partitioned) => {
1459                let left_stats = self.left.partition_statistics(Some(partition))?;
1460                let right_stats = self.right.partition_statistics(Some(partition))?;
1461
1462                estimate_join_statistics(
1463                    Arc::unwrap_or_clone(left_stats),
1464                    Arc::unwrap_or_clone(right_stats),
1465                    &self.on,
1466                    &self.join_type,
1467                    &self.join_schema,
1468                )?
1469            }
1470
1471            // For Auto mode or when no specific partition is requested, fall back to
1472            // the current behavior of getting all partition statistics.
1473            (None, _) | (Some(_), PartitionMode::Auto) => {
1474                // TODO stats: it is not possible in general to know the output size of joins
1475                // There are some special cases though, for example:
1476                // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
1477                let left_stats = self.left.partition_statistics(None)?;
1478                let right_stats = self.right.partition_statistics(None)?;
1479                estimate_join_statistics(
1480                    Arc::unwrap_or_clone(left_stats),
1481                    Arc::unwrap_or_clone(right_stats),
1482                    &self.on,
1483                    &self.join_type,
1484                    &self.join_schema,
1485                )?
1486            }
1487        };
1488        // Project statistics if there is a projection
1489        let stats = stats.project(self.projection.as_ref());
1490        // Apply fetch limit to statistics
1491        Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
1492    }
1493
1494    /// Tries to push `projection` down through `hash_join`. If possible, performs the
1495    /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections
1496    /// as its children. Otherwise, returns `None`.
1497    fn try_swapping_with_projection(
1498        &self,
1499        projection: &ProjectionExec,
1500    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1501        // TODO: currently if there is projection in HashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
1502        if self.contains_projection() {
1503            return Ok(None);
1504        }
1505
1506        let schema = self.schema();
1507        if let Some(JoinData {
1508            projected_left_child,
1509            projected_right_child,
1510            join_filter,
1511            join_on,
1512        }) = try_pushdown_through_join(
1513            projection,
1514            self.left(),
1515            self.right(),
1516            self.on(),
1517            &schema,
1518            self.filter(),
1519        )? {
1520            self.builder()
1521                .with_new_children(vec![
1522                    Arc::new(projected_left_child),
1523                    Arc::new(projected_right_child),
1524                ])?
1525                .with_on(join_on)
1526                .with_filter(join_filter)
1527                // Returned early if projection is not None
1528                .with_projection(None)
1529                .build_exec()
1530                .map(Some)
1531        } else {
1532            try_embed_projection(projection, self)
1533        }
1534    }
1535
1536    fn gather_filters_for_pushdown(
1537        &self,
1538        phase: FilterPushdownPhase,
1539        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1540        config: &ConfigOptions,
1541    ) -> Result<FilterDescription> {
1542        // This is the physical-plan equivalent of `push_down_all_join` in
1543        // `datafusion/optimizer/src/push_down_filter.rs`. That function uses `lr_is_preserved`
1544        // to decide which parent predicates can be pushed past a logical join to its children,
1545        // then checks column references to route each predicate to the correct side.
1546        //
1547        // We apply the same two-level logic here:
1548        // 1. `lr_is_preserved` gates whether a side is eligible at all.
1549        // 2. For each filter, we check that all column references belong to the
1550        //    target child (using `column_indices` to map output column positions
1551        //    to join sides). This is critical for correctness: name-based matching
1552        //    alone (as done by `ChildFilterDescription::from_child`) can incorrectly
1553        //    push filters when different join sides have columns with the same name
1554        //    (e.g. nested mark joins both producing "mark" columns).
1555        let (left_preserved, right_preserved) = lr_is_preserved(self.join_type);
1556
1557        // Build the set of allowed column indices for each side
1558        let column_indices: Vec<ColumnIndex> = match self.projection.as_ref() {
1559            Some(projection) => projection
1560                .iter()
1561                .map(|i| self.column_indices[*i].clone())
1562                .collect(),
1563            None => self.column_indices.clone(),
1564        };
1565
1566        let (mut left_allowed, mut right_allowed) = (HashSet::new(), HashSet::new());
1567        column_indices
1568            .iter()
1569            .enumerate()
1570            .for_each(|(output_idx, ci)| {
1571                match ci.side {
1572                    JoinSide::Left => left_allowed.insert(output_idx),
1573                    JoinSide::Right => right_allowed.insert(output_idx),
1574                    // Mark columns - don't allow pushdown to either side
1575                    JoinSide::None => false,
1576                };
1577            });
1578
1579        // For semi/anti joins, the non-preserved side's columns are not in the
1580        // output, but filters on join key columns can still be pushed there.
1581        // We find output columns that are join keys on the preserved side and
1582        // add their output indices to the non-preserved side's allowed set.
1583        // The name-based remap in FilterRemapper will then match them to the
1584        // corresponding column in the non-preserved child's schema.
1585        match self.join_type {
1586            JoinType::LeftSemi | JoinType::LeftAnti => {
1587                let left_key_indices: HashSet<usize> = self
1588                    .on
1589                    .iter()
1590                    .filter_map(|(left_key, _)| {
1591                        left_key.downcast_ref::<Column>().map(|c| c.index())
1592                    })
1593                    .collect();
1594                for (output_idx, ci) in column_indices.iter().enumerate() {
1595                    if ci.side == JoinSide::Left && left_key_indices.contains(&ci.index) {
1596                        right_allowed.insert(output_idx);
1597                    }
1598                }
1599            }
1600            JoinType::RightSemi | JoinType::RightAnti => {
1601                let right_key_indices: HashSet<usize> = self
1602                    .on
1603                    .iter()
1604                    .filter_map(|(_, right_key)| {
1605                        right_key.downcast_ref::<Column>().map(|c| c.index())
1606                    })
1607                    .collect();
1608                for (output_idx, ci) in column_indices.iter().enumerate() {
1609                    if ci.side == JoinSide::Right && right_key_indices.contains(&ci.index)
1610                    {
1611                        left_allowed.insert(output_idx);
1612                    }
1613                }
1614            }
1615            _ => {}
1616        }
1617
1618        let left_child = if left_preserved {
1619            ChildFilterDescription::from_child_with_allowed_indices(
1620                &parent_filters,
1621                left_allowed,
1622                self.left(),
1623            )?
1624        } else {
1625            ChildFilterDescription::all_unsupported(&parent_filters)
1626        };
1627
1628        let mut right_child = if right_preserved {
1629            ChildFilterDescription::from_child_with_allowed_indices(
1630                &parent_filters,
1631                right_allowed,
1632                self.right(),
1633            )?
1634        } else {
1635            ChildFilterDescription::all_unsupported(&parent_filters)
1636        };
1637
1638        // Add dynamic filters in Post phase if enabled
1639        if phase == FilterPushdownPhase::Post
1640            && self.allow_join_dynamic_filter_pushdown(config)
1641        {
1642            // Add actual dynamic filter to right side (probe side)
1643            let dynamic_filter = Self::create_dynamic_filter(&self.on);
1644            right_child = right_child.with_self_filter(dynamic_filter);
1645        }
1646
1647        Ok(FilterDescription::new()
1648            .with_child(left_child)
1649            .with_child(right_child))
1650    }
1651
1652    fn handle_child_pushdown_result(
1653        &self,
1654        _phase: FilterPushdownPhase,
1655        child_pushdown_result: ChildPushdownResult,
1656        _config: &ConfigOptions,
1657    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1658        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1659        assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
1660        let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
1661        // We expect 0 or 1 self filters
1662        if let Some(filter) = right_child_self_filters.first() {
1663            // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said
1664            // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
1665            let predicate = Arc::clone(&filter.predicate);
1666            if let Ok(dynamic_filter) =
1667                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1668            {
1669                // We successfully pushed down our self filter - we need to make a new node with the dynamic filter
1670                let new_node = self
1671                    .builder()
1672                    .with_dynamic_filter(Some(HashJoinExecDynamicFilter {
1673                        filter: dynamic_filter,
1674                        build_accumulator: OnceLock::new(),
1675                    }))
1676                    .build_exec()?;
1677                result = result.with_updated_node(new_node);
1678            }
1679        }
1680        Ok(result)
1681    }
1682
1683    fn supports_limit_pushdown(&self) -> bool {
1684        // Hash join execution plan does not support pushing limit down through to children
1685        // because the children don't know about the join condition and can't
1686        // determine how many rows to produce
1687        false
1688    }
1689
1690    fn fetch(&self) -> Option<usize> {
1691        self.fetch
1692    }
1693
1694    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1695        self.builder()
1696            .with_fetch(limit)
1697            .build()
1698            .ok()
1699            .map(|exec| Arc::new(exec) as _)
1700    }
1701}
1702
1703/// Determines which sides of a join are "preserved" for filter pushdown.
1704///
1705/// A preserved side means filters on that side's columns can be safely pushed
1706/// below the join. This mirrors the logic in the logical optimizer's
1707/// `lr_is_preserved` in `datafusion/optimizer/src/push_down_filter.rs`.
1708fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
1709    match join_type {
1710        JoinType::Inner => (true, true),
1711        JoinType::Left => (true, false),
1712        JoinType::Right => (false, true),
1713        JoinType::Full => (false, false),
1714        // Filters in semi/anti joins are either on the preserved side, or on join keys,
1715        // as all output columns come from the preserved side. Join key filters can be
1716        // safely pushed down into the other side.
1717        JoinType::LeftSemi | JoinType::LeftAnti => (true, true),
1718        JoinType::RightSemi | JoinType::RightAnti => (true, true),
1719        JoinType::LeftMark => (true, false),
1720        JoinType::RightMark => (false, true),
1721    }
1722}
1723
1724/// Accumulator for collecting min/max bounds from build-side data during hash join.
1725///
1726/// This struct encapsulates the logic for progressively computing column bounds
1727/// (minimum and maximum values) for a specific join key expression as batches
1728/// are processed during the build phase of a hash join.
1729///
1730/// The bounds are used for dynamic filter pushdown optimization, where filters
1731/// based on the actual data ranges can be pushed down to the probe side to
1732/// eliminate unnecessary data early.
1733struct CollectLeftAccumulator {
1734    /// The physical expression to evaluate for each batch
1735    expr: Arc<dyn PhysicalExpr>,
1736    /// Accumulator for tracking the minimum value across all batches
1737    min: MinAccumulator,
1738    /// Accumulator for tracking the maximum value across all batches
1739    max: MaxAccumulator,
1740}
1741
1742impl CollectLeftAccumulator {
1743    /// Creates a new accumulator for tracking bounds of a join key expression.
1744    ///
1745    /// # Arguments
1746    /// * `expr` - The physical expression to track bounds for
1747    /// * `schema` - The schema of the input data
1748    ///
1749    /// # Returns
1750    /// A new `CollectLeftAccumulator` instance configured for the expression's data type
1751    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
1752        /// Recursively unwraps dictionary types to get the underlying value type.
1753        fn dictionary_value_type(data_type: &DataType) -> DataType {
1754            match data_type {
1755                DataType::Dictionary(_, value_type) => {
1756                    dictionary_value_type(value_type.as_ref())
1757                }
1758                _ => data_type.clone(),
1759            }
1760        }
1761
1762        let data_type = expr
1763            .data_type(schema)
1764            // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
1765            .map(|dt| dictionary_value_type(&dt))?;
1766        Ok(Self {
1767            expr,
1768            min: MinAccumulator::try_new(&data_type)?,
1769            max: MaxAccumulator::try_new(&data_type)?,
1770        })
1771    }
1772
1773    /// Updates the accumulators with values from a new batch.
1774    ///
1775    /// Evaluates the expression on the batch and updates both min and max
1776    /// accumulators with the resulting values.
1777    ///
1778    /// # Arguments
1779    /// * `batch` - The record batch to process
1780    ///
1781    /// # Returns
1782    /// Ok(()) if the update succeeds, or an error if expression evaluation fails
1783    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
1784        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
1785        self.min.update_batch(std::slice::from_ref(&array))?;
1786        self.max.update_batch(std::slice::from_ref(&array))?;
1787        Ok(())
1788    }
1789
1790    /// Finalizes the accumulation and returns the computed bounds.
1791    ///
1792    /// Consumes self to extract the final min and max values from the accumulators.
1793    ///
1794    /// # Returns
1795    /// The `ColumnBounds` containing the minimum and maximum values observed
1796    fn evaluate(mut self) -> Result<ColumnBounds> {
1797        Ok(ColumnBounds::new(
1798            self.min.evaluate()?,
1799            self.max.evaluate()?,
1800        ))
1801    }
1802}
1803
1804/// State for collecting the build-side data during hash join
1805struct BuildSideState {
1806    batches: Vec<RecordBatch>,
1807    num_rows: usize,
1808    metrics: BuildProbeJoinMetrics,
1809    reservation: MemoryReservation,
1810    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
1811}
1812
1813impl BuildSideState {
1814    /// Create a new BuildSideState with optional accumulators for bounds computation
1815    fn try_new(
1816        metrics: BuildProbeJoinMetrics,
1817        reservation: MemoryReservation,
1818        on_left: Vec<Arc<dyn PhysicalExpr>>,
1819        schema: &SchemaRef,
1820        should_compute_dynamic_filters: bool,
1821    ) -> Result<Self> {
1822        Ok(Self {
1823            batches: Vec::new(),
1824            num_rows: 0,
1825            metrics,
1826            reservation,
1827            bounds_accumulators: should_compute_dynamic_filters
1828                .then(|| {
1829                    on_left
1830                        .into_iter()
1831                        .map(|expr| CollectLeftAccumulator::try_new(expr, schema))
1832                        .collect::<Result<Vec<_>>>()
1833                })
1834                .transpose()?,
1835        })
1836    }
1837}
1838
1839fn should_collect_min_max_for_perfect_hash(
1840    on_left: &[PhysicalExprRef],
1841    schema: &SchemaRef,
1842) -> Result<bool> {
1843    if on_left.len() != 1 {
1844        return Ok(false);
1845    }
1846
1847    let expr = &on_left[0];
1848    let data_type = expr.data_type(schema)?;
1849    Ok(ArrayMap::is_supported_type(&data_type))
1850}
1851
1852/// Collects all batches from the left (build) side stream and creates a hash map for joining.
1853///
1854/// This function is responsible for:
1855/// 1. Consuming the entire left stream and collecting all batches into memory
1856/// 2. Building a hash map from the join key columns for efficient probe operations
1857/// 3. Computing bounds for dynamic filter pushdown (if enabled)
1858/// 4. Preparing visited indices bitmap for certain join types
1859///
1860/// # Parameters
1861/// * `random_state` - Random state for consistent hashing across partitions
1862/// * `left_stream` - Stream of record batches from the build side
1863/// * `on_left` - Physical expressions for the left side join keys
1864/// * `metrics` - Metrics collector for tracking memory usage and row counts
1865/// * `reservation` - Memory reservation tracker for the hash table and data
1866/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
1867/// * `probe_threads_count` - Number of threads that will probe this hash table
1868/// * `should_compute_dynamic_filters` - Whether to compute min/max bounds for dynamic filtering
1869///
1870/// # Dynamic Filter Coordination
1871/// When `should_compute_dynamic_filters` is true, this function computes the min/max bounds
1872/// for each join key column but does NOT update the dynamic filter. Instead, the
1873/// bounds are stored in the returned `JoinLeftData` and later coordinated by
1874/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
1875/// before updating the filter exactly once.
1876///
1877/// # Returns
1878/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
1879/// visited indices bitmap, and computed bounds (if requested).
1880#[expect(clippy::too_many_arguments)]
1881async fn collect_left_input(
1882    random_state: RandomState,
1883    left_stream: SendableRecordBatchStream,
1884    on_left: Vec<PhysicalExprRef>,
1885    metrics: BuildProbeJoinMetrics,
1886    reservation: MemoryReservation,
1887    with_visited_indices_bitmap: bool,
1888    probe_threads_count: usize,
1889    should_compute_dynamic_filters: bool,
1890    config: Arc<ConfigOptions>,
1891    null_equality: NullEquality,
1892    array_map_created_count: Count,
1893) -> Result<JoinLeftData> {
1894    let schema = left_stream.schema();
1895
1896    let should_collect_min_max_for_phj =
1897        should_collect_min_max_for_perfect_hash(&on_left, &schema)?;
1898
1899    let initial = BuildSideState::try_new(
1900        metrics,
1901        reservation,
1902        on_left.clone(),
1903        &schema,
1904        should_compute_dynamic_filters || should_collect_min_max_for_phj,
1905    )?;
1906
1907    let state = left_stream
1908        .try_fold(initial, |mut state, batch| async move {
1909            // Update accumulators if computing bounds
1910            if let Some(ref mut accumulators) = state.bounds_accumulators {
1911                for accumulator in accumulators {
1912                    accumulator.update_batch(&batch)?;
1913                }
1914            }
1915
1916            // Decide if we spill or not
1917            let batch_size = get_record_batch_memory_size(&batch);
1918            // Reserve memory for incoming batch
1919            state.reservation.try_grow(batch_size)?;
1920            // Update metrics
1921            state.metrics.build_mem_used.add(batch_size);
1922            state.metrics.build_input_batches.add(1);
1923            state.metrics.build_input_rows.add(batch.num_rows());
1924            // Update row count
1925            state.num_rows += batch.num_rows();
1926            // Push batch to output
1927            state.batches.push(batch);
1928            Ok(state)
1929        })
1930        .await?;
1931
1932    // Extract fields from state
1933    let BuildSideState {
1934        batches,
1935        num_rows,
1936        metrics,
1937        mut reservation,
1938        bounds_accumulators,
1939    } = state;
1940
1941    // Compute bounds
1942    let mut bounds = match bounds_accumulators {
1943        Some(accumulators) if num_rows > 0 => {
1944            let bounds = accumulators
1945                .into_iter()
1946                .map(CollectLeftAccumulator::evaluate)
1947                .collect::<Result<Vec<_>>>()?;
1948            Some(PartitionBounds::new(bounds))
1949        }
1950        _ => None,
1951    };
1952
1953    let (join_hash_map, batch, left_values) =
1954        if let Some((array_map, batch, left_value)) = try_create_array_map(
1955            &bounds,
1956            &schema,
1957            &batches,
1958            &on_left,
1959            &mut reservation,
1960            config.execution.perfect_hash_join_small_build_threshold,
1961            config.execution.perfect_hash_join_min_key_density,
1962            null_equality,
1963        )? {
1964            array_map_created_count.add(1);
1965            metrics.build_mem_used.add(array_map.size());
1966
1967            (Map::ArrayMap(array_map), batch, left_value)
1968        } else {
1969            // Estimation of memory size, required for hashtable, prior to allocation.
1970            // Final result can be verified using `RawTable.allocation_info()`
1971            let fixed_size_u32 = size_of::<JoinHashMapU32>();
1972            let fixed_size_u64 = size_of::<JoinHashMapU64>();
1973
1974            // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
1975            // `u64` indice variant
1976            // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown
1977            let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
1978                let estimated_hashtable_size =
1979                    estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
1980                reservation.try_grow(estimated_hashtable_size)?;
1981                metrics.build_mem_used.add(estimated_hashtable_size);
1982                Box::new(JoinHashMapU64::with_capacity(num_rows))
1983            } else {
1984                let estimated_hashtable_size =
1985                    estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
1986                reservation.try_grow(estimated_hashtable_size)?;
1987                metrics.build_mem_used.add(estimated_hashtable_size);
1988                Box::new(JoinHashMapU32::with_capacity(num_rows))
1989            };
1990
1991            let mut hashes_buffer = Vec::new();
1992            let mut offset = 0;
1993
1994            let batches_iter = batches.iter().rev();
1995
1996            // Updating hashmap starting from the last batch
1997            for batch in batches_iter.clone() {
1998                hashes_buffer.clear();
1999                hashes_buffer.resize(batch.num_rows(), 0);
2000                update_hash(
2001                    &on_left,
2002                    batch,
2003                    &mut *hashmap,
2004                    offset,
2005                    &random_state,
2006                    &mut hashes_buffer,
2007                    0,
2008                    true,
2009                )?;
2010                offset += batch.num_rows();
2011            }
2012
2013            // Merge all batches into a single batch, so we can directly index into the arrays
2014            let batch = concat_batches(&schema, batches_iter.clone())?;
2015
2016            let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
2017
2018            (Map::HashMap(hashmap), batch, left_values)
2019        };
2020
2021    // Reserve additional memory for visited indices bitmap and create shared builder
2022    let visited_indices_bitmap = if with_visited_indices_bitmap {
2023        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
2024        reservation.try_grow(bitmap_size)?;
2025        metrics.build_mem_used.add(bitmap_size);
2026
2027        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
2028        bitmap_buffer.append_n(num_rows, false);
2029        bitmap_buffer
2030    } else {
2031        BooleanBufferBuilder::new(0)
2032    };
2033
2034    let map = Arc::new(join_hash_map);
2035
2036    let membership = if num_rows == 0 {
2037        PushdownStrategy::Empty
2038    } else {
2039        // If the build side is small enough we can use IN list pushdown.
2040        // If it's too big we fall back to pushing down a reference to the hash table.
2041        // See `PushdownStrategy` for more details.
2042        let estimated_size = left_values
2043            .iter()
2044            .map(|arr| arr.get_array_memory_size())
2045            .sum::<usize>();
2046        if left_values.is_empty()
2047            || left_values[0].is_empty()
2048            || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size
2049            || map.num_of_distinct_key()
2050                > config
2051                    .optimizer
2052                    .hash_join_inlist_pushdown_max_distinct_values
2053        {
2054            PushdownStrategy::Map(Arc::clone(&map))
2055        } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
2056            PushdownStrategy::InList(in_list_values)
2057        } else {
2058            PushdownStrategy::Map(Arc::clone(&map))
2059        }
2060    };
2061
2062    if should_collect_min_max_for_phj && !should_compute_dynamic_filters {
2063        bounds = None;
2064    }
2065
2066    let data = JoinLeftData {
2067        map,
2068        batch,
2069        values: left_values,
2070        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
2071        probe_threads_counter: AtomicUsize::new(probe_threads_count),
2072        _reservation: reservation,
2073        bounds,
2074        membership,
2075        probe_side_non_empty: AtomicBool::new(false),
2076        probe_side_has_null: AtomicBool::new(false),
2077    };
2078
2079    Ok(data)
2080}
2081
2082#[cfg(test)]
2083mod tests {
2084    use super::*;
2085
2086    fn assert_phj_used(metrics: &MetricsSet, use_phj: bool) {
2087        if use_phj {
2088            assert!(
2089                metrics
2090                    .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
2091                    .expect("should have array_map_created_count metrics")
2092                    .as_usize()
2093                    >= 1
2094            );
2095        } else {
2096            assert_eq!(
2097                metrics
2098                    .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
2099                    .map(|v| v.as_usize())
2100                    .unwrap_or(0),
2101                0
2102            )
2103        }
2104    }
2105
2106    fn build_schema_and_on() -> Result<(SchemaRef, SchemaRef, JoinOn)> {
2107        let left_schema = Arc::new(Schema::new(vec![
2108            Field::new("a1", DataType::Int32, true),
2109            Field::new("b1", DataType::Int32, true),
2110        ]));
2111        let right_schema = Arc::new(Schema::new(vec![
2112            Field::new("a2", DataType::Int32, true),
2113            Field::new("b1", DataType::Int32, true),
2114        ]));
2115        let on = vec![(
2116            Arc::new(Column::new_with_schema("b1", &left_schema)?) as _,
2117            Arc::new(Column::new_with_schema("b1", &right_schema)?) as _,
2118        )];
2119        Ok((left_schema, right_schema, on))
2120    }
2121
2122    use crate::coalesce_partitions::CoalescePartitionsExec;
2123    use crate::joins::hash_join::stream::lookup_join_hashmap;
2124    use crate::test::{TestMemoryExec, assert_join_metrics};
2125    use crate::{
2126        common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
2127        test::exec::MockExec,
2128    };
2129
2130    use arrow::array::{
2131        Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, UInt64Array,
2132    };
2133    use arrow::buffer::NullBuffer;
2134    use arrow::datatypes::{DataType, Field};
2135    use datafusion_common::hash_utils::create_hashes;
2136    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
2137    use datafusion_common::{
2138        ScalarValue, assert_batches_eq, assert_batches_sorted_eq, assert_contains,
2139        exec_err, internal_err,
2140    };
2141    use datafusion_execution::config::SessionConfig;
2142    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
2143    use datafusion_expr::Operator;
2144    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
2145    use hashbrown::HashTable;
2146    use insta::{allow_duplicates, assert_snapshot};
2147    use rstest::*;
2148    use rstest_reuse::*;
2149
2150    fn div_ceil(a: usize, b: usize) -> usize {
2151        a.div_ceil(b)
2152    }
2153
2154    #[template]
2155    #[rstest]
2156    fn hash_join_exec_configs(
2157        #[values(8192, 10, 5, 2, 1)] batch_size: usize,
2158        #[values(true, false)] use_perfect_hash_join_as_possible: bool,
2159    ) {
2160    }
2161
2162    fn prepare_task_ctx(
2163        batch_size: usize,
2164        use_perfect_hash_join_as_possible: bool,
2165    ) -> Arc<TaskContext> {
2166        let mut session_config = SessionConfig::default().with_batch_size(batch_size);
2167
2168        if use_perfect_hash_join_as_possible {
2169            session_config
2170                .options_mut()
2171                .execution
2172                .perfect_hash_join_small_build_threshold = 819200;
2173            session_config
2174                .options_mut()
2175                .execution
2176                .perfect_hash_join_min_key_density = 0.0;
2177        } else {
2178            session_config
2179                .options_mut()
2180                .execution
2181                .perfect_hash_join_small_build_threshold = 0;
2182            session_config
2183                .options_mut()
2184                .execution
2185                .perfect_hash_join_min_key_density = f64::INFINITY;
2186        }
2187        Arc::new(TaskContext::default().with_session_config(session_config))
2188    }
2189
2190    fn build_table(
2191        a: (&str, &Vec<i32>),
2192        b: (&str, &Vec<i32>),
2193        c: (&str, &Vec<i32>),
2194    ) -> Arc<dyn ExecutionPlan> {
2195        let batch = build_table_i32(a, b, c);
2196        let schema = batch.schema();
2197        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
2198    }
2199
2200    /// Build a table with two columns supporting nullable values
2201    fn build_table_two_cols(
2202        a: (&str, &Vec<Option<i32>>),
2203        b: (&str, &Vec<Option<i32>>),
2204    ) -> Arc<dyn ExecutionPlan> {
2205        let schema = Arc::new(Schema::new(vec![
2206            Field::new(a.0, DataType::Int32, true),
2207            Field::new(b.0, DataType::Int32, true),
2208        ]));
2209        let batch = RecordBatch::try_new(
2210            Arc::clone(&schema),
2211            vec![
2212                Arc::new(Int32Array::from(a.1.clone())),
2213                Arc::new(Int32Array::from(b.1.clone())),
2214            ],
2215        )
2216        .unwrap();
2217        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
2218    }
2219
2220    fn join(
2221        left: Arc<dyn ExecutionPlan>,
2222        right: Arc<dyn ExecutionPlan>,
2223        on: JoinOn,
2224        join_type: &JoinType,
2225        null_equality: NullEquality,
2226    ) -> Result<HashJoinExec> {
2227        HashJoinExec::try_new(
2228            left,
2229            right,
2230            on,
2231            None,
2232            join_type,
2233            None,
2234            PartitionMode::CollectLeft,
2235            null_equality,
2236            false,
2237        )
2238    }
2239
2240    fn join_with_filter(
2241        left: Arc<dyn ExecutionPlan>,
2242        right: Arc<dyn ExecutionPlan>,
2243        on: JoinOn,
2244        filter: JoinFilter,
2245        join_type: &JoinType,
2246        null_equality: NullEquality,
2247    ) -> Result<HashJoinExec> {
2248        HashJoinExec::try_new(
2249            left,
2250            right,
2251            on,
2252            Some(filter),
2253            join_type,
2254            None,
2255            PartitionMode::CollectLeft,
2256            null_equality,
2257            false,
2258        )
2259    }
2260
2261    fn empty_build_with_probe_error_inputs()
2262    -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
2263        let left_batch =
2264            build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
2265        let left_schema = left_batch.schema();
2266        let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
2267            &[vec![left_batch]],
2268            Arc::clone(&left_schema),
2269            None,
2270        )
2271        .unwrap();
2272
2273        let err = exec_err!("bad data error");
2274        let right_batch =
2275            build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2276        let right_schema = right_batch.schema();
2277        let on = vec![(
2278            Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
2279            Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _,
2280        )];
2281        let right: Arc<dyn ExecutionPlan> = Arc::new(
2282            MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false),
2283        );
2284
2285        (left, right, on)
2286    }
2287
2288    async fn assert_empty_build_probe_behavior(
2289        join_types: &[JoinType],
2290        expect_probe_error: bool,
2291        with_filter: bool,
2292    ) {
2293        let (left, right, on) = empty_build_with_probe_error_inputs();
2294        let filter = prepare_join_filter();
2295
2296        for join_type in join_types {
2297            let join = if with_filter {
2298                join_with_filter(
2299                    Arc::clone(&left),
2300                    Arc::clone(&right),
2301                    on.clone(),
2302                    filter.clone(),
2303                    join_type,
2304                    NullEquality::NullEqualsNothing,
2305                )
2306                .unwrap()
2307            } else {
2308                join(
2309                    Arc::clone(&left),
2310                    Arc::clone(&right),
2311                    on.clone(),
2312                    join_type,
2313                    NullEquality::NullEqualsNothing,
2314                )
2315                .unwrap()
2316            };
2317
2318            let result = common::collect(
2319                join.execute(0, Arc::new(TaskContext::default())).unwrap(),
2320            )
2321            .await;
2322
2323            if expect_probe_error {
2324                let result_string = result.unwrap_err().to_string();
2325                assert!(
2326                    result_string.contains("bad data error"),
2327                    "actual: {result_string}"
2328                );
2329            } else {
2330                let batches = result.unwrap();
2331                assert!(
2332                    batches.is_empty(),
2333                    "expected no output batches for {join_type}, got {batches:?}"
2334                );
2335            }
2336        }
2337    }
2338
2339    fn hash_join_with_dynamic_filter(
2340        left: Arc<dyn ExecutionPlan>,
2341        right: Arc<dyn ExecutionPlan>,
2342        on: JoinOn,
2343        join_type: JoinType,
2344    ) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2345        hash_join_with_dynamic_filter_and_mode(
2346            left,
2347            right,
2348            on,
2349            join_type,
2350            PartitionMode::CollectLeft,
2351        )
2352    }
2353
2354    fn hash_join_with_dynamic_filter_and_mode(
2355        left: Arc<dyn ExecutionPlan>,
2356        right: Arc<dyn ExecutionPlan>,
2357        on: JoinOn,
2358        join_type: JoinType,
2359        mode: PartitionMode,
2360    ) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2361        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
2362        let mut join = HashJoinExec::try_new(
2363            left,
2364            right,
2365            on,
2366            None,
2367            &join_type,
2368            None,
2369            mode,
2370            NullEquality::NullEqualsNothing,
2371            false,
2372        )?;
2373        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
2374            filter: Arc::clone(&dynamic_filter),
2375            build_accumulator: OnceLock::new(),
2376        });
2377
2378        Ok((join, dynamic_filter))
2379    }
2380
2381    async fn join_collect(
2382        left: Arc<dyn ExecutionPlan>,
2383        right: Arc<dyn ExecutionPlan>,
2384        on: JoinOn,
2385        join_type: &JoinType,
2386        null_equality: NullEquality,
2387        context: Arc<TaskContext>,
2388    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2389        let join = join(left, right, on, join_type, null_equality)?;
2390        let columns_header = columns(&join.schema());
2391
2392        let stream = join.execute(0, context)?;
2393        let batches = common::collect(stream).await?;
2394        let metrics = join.metrics().unwrap();
2395
2396        Ok((columns_header, batches, metrics))
2397    }
2398
2399    async fn partitioned_join_collect(
2400        left: Arc<dyn ExecutionPlan>,
2401        right: Arc<dyn ExecutionPlan>,
2402        on: JoinOn,
2403        join_type: &JoinType,
2404        null_equality: NullEquality,
2405        context: Arc<TaskContext>,
2406    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2407        join_collect_with_partition_mode(
2408            left,
2409            right,
2410            on,
2411            join_type,
2412            PartitionMode::Partitioned,
2413            null_equality,
2414            context,
2415        )
2416        .await
2417    }
2418
2419    async fn join_collect_with_partition_mode(
2420        left: Arc<dyn ExecutionPlan>,
2421        right: Arc<dyn ExecutionPlan>,
2422        on: JoinOn,
2423        join_type: &JoinType,
2424        partition_mode: PartitionMode,
2425        null_equality: NullEquality,
2426        context: Arc<TaskContext>,
2427    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2428        let partition_count = 4;
2429
2430        let (left_expr, right_expr) = on
2431            .iter()
2432            .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
2433            .unzip();
2434
2435        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
2436            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
2437            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
2438                left,
2439                Partitioning::Hash(left_expr, partition_count),
2440            )?),
2441            PartitionMode::Auto => {
2442                return internal_err!("Unexpected PartitionMode::Auto in join tests");
2443            }
2444        };
2445
2446        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
2447            PartitionMode::CollectLeft => {
2448                let partition_column_name = right.schema().field(0).name().clone();
2449                let partition_expr = vec![Arc::new(Column::new_with_schema(
2450                    &partition_column_name,
2451                    &right.schema(),
2452                )?) as _];
2453                Arc::new(RepartitionExec::try_new(
2454                    right,
2455                    Partitioning::Hash(partition_expr, partition_count),
2456                )?) as _
2457            }
2458            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
2459                right,
2460                Partitioning::Hash(right_expr, partition_count),
2461            )?),
2462            PartitionMode::Auto => {
2463                return internal_err!("Unexpected PartitionMode::Auto in join tests");
2464            }
2465        };
2466
2467        let join = HashJoinExec::try_new(
2468            left_repartitioned,
2469            right_repartitioned,
2470            on,
2471            None,
2472            join_type,
2473            None,
2474            partition_mode,
2475            null_equality,
2476            false,
2477        )?;
2478
2479        let columns = columns(&join.schema());
2480
2481        let mut batches = vec![];
2482        for i in 0..partition_count {
2483            let stream = join.execute(i, Arc::clone(&context))?;
2484            let more_batches = common::collect(stream).await?;
2485            batches.extend(
2486                more_batches
2487                    .into_iter()
2488                    .filter(|b| b.num_rows() > 0)
2489                    .collect::<Vec<_>>(),
2490            );
2491        }
2492        let metrics = join.metrics().unwrap();
2493
2494        Ok((columns, batches, metrics))
2495    }
2496
    /// Single-key (`b1`) inner join via `CollectLeft` (`join_collect`).
    ///
    /// The left side repeats key `b1 = 5`, and the right side has a key with
    /// no match (`b1 = 6`). Both left rows with `b1 = 5` must pair with the
    /// one matching right row, and the right row with `b1 = 6` must not
    /// appear. Runs once per `hash_join_exec_configs` case (batch size ×
    /// perfect-hash-join setting).
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        // Both inputs have a `b1` column, so the name appears twice.
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            // Inner join output is expected to preserve both inputs order
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        // NOTE(review): presumably checks that the perfect hash join path was
        // (or was not) taken, matching the configured setting — confirm.
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2550
    /// Same data and expectations as `join_inner_one`, but uses
    /// `PartitionMode::Partitioned`. Both inputs are hash-repartitioned on
    /// `b1` and every output partition is collected (`partitioned_join_collect`).
    ///
    /// Rows from different partitions arrive in no fixed order, so the
    /// snapshot is taken over sorted output (`batches_to_sort_string`).
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        // Metrics are summed over all partitions of the single join node.
        // NOTE(review): assumes `MetricsSet` aggregation across partitions —
        // confirm in `assert_join_metrics!`.
        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2602
    /// Inner join whose key columns have different names on each side
    /// (`b1` on the left, `b2` on the right), so no output column name is
    /// duplicated. Uses the default `TaskContext` and runs only once.
    #[tokio::test]
    async fn join_inner_one_no_shared_column_names() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2650
    /// Inner join over unsorted inputs, pinning the output row order.
    ///
    /// The snapshot lists output grouped by right (probe) row in right-input
    /// order (`b2 = 5`, then `4`; `b2 = 6` has no match). Within one right
    /// row, the matching left rows appear in left-input order.
    #[tokio::test]
    async fn join_inner_one_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![0, 3, 2, 1]),
            ("b1", &vec![4, 5, 5, 4]),
            ("c1", &vec![6, 9, 8, 7]),
        );
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3  | 5  | 9  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 0  | 4  | 6  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
2699
    /// Inner join on two key columns (`a1`, `b2`).
    ///
    /// The left side repeats the key `(2, 2)`. The right key `(3, 2)` matches
    /// only on `b2`, so it must not join. Also bounds the number of output
    /// batches for the configured `batch_size`.
    ///
    /// NOTE(review): unlike the single-key tests, this one does not call
    /// `assert_phj_used`; presumably the perfect hash join path does not
    /// apply to multi-column keys — confirm.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_two(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 2]),
            ("b2", &vec![1, 2, 2]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        // Upper bound on the number of output batches for this batch size.
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches = 3
            // in case batch_size is 1 - additional empty batch for remaining 3-2 row
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            // With hash collisions enabled, all records will match each other
            // and filtered later.
            div_ceil(9, batch_size)
        };

        // With batch coalescing, we may have fewer batches than expected
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 1  | 7  | 1  | 1  | 70 |
            | 2  | 2  | 8  | 2  | 2  | 80 |
            | 2  | 2  | 9  | 2  | 2  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2778
    /// Test where the left has 2 parts, the right with 1 part => 1 part.
    ///
    /// The two left partitions are merged by `CoalescePartitionsExec` before
    /// the `CollectLeft` join. The join is on two key columns (`a1`, `b2`),
    /// with key `(2, 2)` split across the two left partitions. The expected
    /// rows are the same as in `join_inner_two`.
    ///
    /// NOTE(review): like `join_inner_two`, this does not call
    /// `assert_phj_used` — presumably because the key is multi-column.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one_two_parts_left(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let batch1 = build_table_i32(
            ("a1", &vec![1, 2]),
            ("b2", &vec![1, 2]),
            ("c1", &vec![7, 8]),
        );
        let batch2 =
            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
        let schema = batch1.schema();
        // One batch per partition: two left partitions.
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));

        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        // Upper bound on the number of output batches for this batch size.
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches = 3
            // in case batch_size is 1 - additional empty batch for remaining 3-2 row
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            // With hash collisions enabled, all records will match each other
            // and filtered later.
            div_ceil(9, batch_size)
        };

        // With batch coalescing, we may have fewer batches than expected
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 1  | 7  | 1  | 1  | 70 |
            | 2  | 2  | 8  | 2  | 2  | 80 |
            | 2  | 2  | 9  | 2  | 2  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2866
    /// Same rows as `join_inner_one_randomly_ordered`, but the left input is
    /// split into two partitions and merged with `CoalescePartitionsExec`.
    /// The snapshot pins the same output order as the single-partition case.
    #[tokio::test]
    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch1 = build_table_i32(
            ("a1", &vec![0, 3]),
            ("b1", &vec![4, 5]),
            ("c1", &vec![6, 9]),
        );
        let batch2 = build_table_i32(
            ("a1", &vec![2, 1]),
            ("b1", &vec![5, 4]),
            ("c1", &vec![8, 7]),
        );
        let schema = batch1.schema();

        // One batch per partition: two left partitions.
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3  | 5  | 9  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 0  | 4  | 6  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
2926
    /// Test where the left has 1 part, the right has 2 parts => 2 parts.
    ///
    /// Each output partition is executed and checked separately:
    /// * partition 0 probes the right batch with `b1 = [4, 6]`, so one match;
    /// * partition 1 probes the right batch with `b1 = [5]`, so it matches
    ///   both left rows with `b1 = 5`.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one_two_parts_right(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );

        let batch1 = build_table_i32(
            ("a2", &vec![10, 20]),
            ("b1", &vec![4, 6]),
            ("c2", &vec![70, 80]),
        );
        let batch2 =
            build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
        let schema = batch1.schema();
        // One batch per partition: two right partitions.
        let right =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        // first part
        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches for first right batch = 1
            // and additional empty batch for non-joined 20-6-80
            let mut expected_batch_count = div_ceil(1, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            // With hash collisions enabled, all records will match each other
            // and filtered later.
            div_ceil(6, batch_size)
        };
        // With batch coalescing, we may have fewer batches than expected
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }

        // second part
        let stream = join.execute(1, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches for second right batch = 2
            div_ceil(2, batch_size)
        } else {
            // With hash collisions enabled, all records will match each other
            // and filtered later.
            div_ceil(3, batch_size)
        };
        // With batch coalescing, we may have fewer batches than expected
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        // Inner join output is expected to preserve both inputs order
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 2  | 5  | 8  | 30 | 5  | 90 |
            | 3  | 5  | 9  | 30 | 5  | 90 |
            +----+----+----+----+----+----+
            ");
        }

        // Metrics are read once, after both partitions have run.
        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3040
3041    fn build_table_two_batches(
3042        a: (&str, &Vec<i32>),
3043        b: (&str, &Vec<i32>),
3044        c: (&str, &Vec<i32>),
3045    ) -> Arc<dyn ExecutionPlan> {
3046        let batch = build_table_i32(a, b, c);
3047        let schema = batch.schema();
3048        TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
3049    }
3050
3051    #[apply(hash_join_exec_configs)]
3052    #[tokio::test]
3053    async fn join_left_multi_batch(
3054        batch_size: usize,
3055        use_perfect_hash_join_as_possible: bool,
3056    ) -> Result<()> {
3057        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3058        let left = build_table(
3059            ("a1", &vec![1, 2, 3]),
3060            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3061            ("c1", &vec![7, 8, 9]),
3062        );
3063        let right = build_table_two_batches(
3064            ("a2", &vec![10, 20, 30]),
3065            ("b1", &vec![4, 5, 6]),
3066            ("c2", &vec![70, 80, 90]),
3067        );
3068        let on = vec![(
3069            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3070            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
3071        )];
3072
3073        let join = join(
3074            Arc::clone(&left),
3075            Arc::clone(&right),
3076            on.clone(),
3077            &JoinType::Left,
3078            NullEquality::NullEqualsNothing,
3079        )
3080        .unwrap();
3081
3082        let columns = columns(&join.schema());
3083        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3084
3085        let (_, batches, metrics) = join_collect(
3086            Arc::clone(&left),
3087            Arc::clone(&right),
3088            on.clone(),
3089            &JoinType::Left,
3090            NullEquality::NullEqualsNothing,
3091            task_ctx,
3092        )
3093        .await?;
3094
3095        allow_duplicates! {
3096            assert_snapshot!(batches_to_sort_string(&batches), @r"
3097            +----+----+----+----+----+----+
3098            | a1 | b1 | c1 | a2 | b1 | c2 |
3099            +----+----+----+----+----+----+
3100            | 1  | 4  | 7  | 10 | 4  | 70 |
3101            | 1  | 4  | 7  | 10 | 4  | 70 |
3102            | 2  | 5  | 8  | 20 | 5  | 80 |
3103            | 2  | 5  | 8  | 20 | 5  | 80 |
3104            | 3  | 7  | 9  |    |    |    |
3105            +----+----+----+----+----+----+
3106            ");
3107        }
3108
3109        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3110        return Ok(());
3111    }
3112
3113    #[apply(hash_join_exec_configs)]
3114    #[tokio::test]
3115    async fn join_full_multi_batch(
3116        batch_size: usize,
3117        use_perfect_hash_join_as_possible: bool,
3118    ) {
3119        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3120        let left = build_table(
3121            ("a1", &vec![1, 2, 3]),
3122            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3123            ("c1", &vec![7, 8, 9]),
3124        );
3125        // create two identical batches for the right side
3126        let right = build_table_two_batches(
3127            ("a2", &vec![10, 20, 30]),
3128            ("b2", &vec![4, 5, 6]),
3129            ("c2", &vec![70, 80, 90]),
3130        );
3131        let on = vec![(
3132            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3133            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3134        )];
3135
3136        let join = join(
3137            left,
3138            right,
3139            on,
3140            &JoinType::Full,
3141            NullEquality::NullEqualsNothing,
3142        )
3143        .unwrap();
3144
3145        let columns = columns(&join.schema());
3146        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3147
3148        let stream = join.execute(0, task_ctx).unwrap();
3149        let batches = common::collect(stream).await.unwrap();
3150        let metrics = join.metrics().unwrap();
3151
3152        allow_duplicates! {
3153            assert_snapshot!(batches_to_sort_string(&batches), @r"
3154            +----+----+----+----+----+----+
3155            | a1 | b1 | c1 | a2 | b2 | c2 |
3156            +----+----+----+----+----+----+
3157            |    |    |    | 30 | 6  | 90 |
3158            |    |    |    | 30 | 6  | 90 |
3159            | 1  | 4  | 7  | 10 | 4  | 70 |
3160            | 1  | 4  | 7  | 10 | 4  | 70 |
3161            | 2  | 5  | 8  | 20 | 5  | 80 |
3162            | 2  | 5  | 8  | 20 | 5  | 80 |
3163            | 3  | 7  | 9  |    |    |    |
3164            +----+----+----+----+----+----+
3165            ");
3166        }
3167
3168        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3169    }
3170
3171    #[apply(hash_join_exec_configs)]
3172    #[tokio::test]
3173    async fn join_left_empty_right(
3174        batch_size: usize,
3175        use_perfect_hash_join_as_possible: bool,
3176    ) {
3177        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3178        let left = build_table(
3179            ("a1", &vec![1, 2, 3]),
3180            ("b1", &vec![4, 5, 7]),
3181            ("c1", &vec![7, 8, 9]),
3182        );
3183        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
3184        let on = vec![(
3185            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3186            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
3187        )];
3188        let schema = right.schema();
3189        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
3190        let join = join(
3191            left,
3192            right,
3193            on,
3194            &JoinType::Left,
3195            NullEquality::NullEqualsNothing,
3196        )
3197        .unwrap();
3198
3199        let columns = columns(&join.schema());
3200        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3201
3202        let stream = join.execute(0, task_ctx).unwrap();
3203        let batches = common::collect(stream).await.unwrap();
3204        let metrics = join.metrics().unwrap();
3205
3206        allow_duplicates! {
3207            assert_snapshot!(batches_to_sort_string(&batches), @r"
3208            +----+----+----+----+----+----+
3209            | a1 | b1 | c1 | a2 | b1 | c2 |
3210            +----+----+----+----+----+----+
3211            | 1  | 4  | 7  |    |    |    |
3212            | 2  | 5  | 8  |    |    |    |
3213            | 3  | 7  | 9  |    |    |    |
3214            +----+----+----+----+----+----+
3215            ");
3216        }
3217
3218        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3219    }
3220
3221    #[apply(hash_join_exec_configs)]
3222    #[tokio::test]
3223    async fn join_full_empty_right(
3224        batch_size: usize,
3225        use_perfect_hash_join_as_possible: bool,
3226    ) {
3227        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3228        let left = build_table(
3229            ("a1", &vec![1, 2, 3]),
3230            ("b1", &vec![4, 5, 7]),
3231            ("c1", &vec![7, 8, 9]),
3232        );
3233        let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
3234        let on = vec![(
3235            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3236            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3237        )];
3238        let schema = right.schema();
3239        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
3240        let join = join(
3241            left,
3242            right,
3243            on,
3244            &JoinType::Full,
3245            NullEquality::NullEqualsNothing,
3246        )
3247        .unwrap();
3248
3249        let columns = columns(&join.schema());
3250        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3251
3252        let stream = join.execute(0, task_ctx).unwrap();
3253        let batches = common::collect(stream).await.unwrap();
3254        let metrics = join.metrics().unwrap();
3255
3256        allow_duplicates! {
3257            assert_snapshot!(batches_to_sort_string(&batches), @r"
3258            +----+----+----+----+----+----+
3259            | a1 | b1 | c1 | a2 | b2 | c2 |
3260            +----+----+----+----+----+----+
3261            | 1  | 4  | 7  |    |    |    |
3262            | 2  | 5  | 8  |    |    |    |
3263            | 3  | 7  | 9  |    |    |    |
3264            +----+----+----+----+----+----+
3265            ");
3266        }
3267
3268        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3269    }
3270
3271    #[apply(hash_join_exec_configs)]
3272    #[tokio::test]
3273    async fn join_left_one(
3274        batch_size: usize,
3275        use_perfect_hash_join_as_possible: bool,
3276    ) -> Result<()> {
3277        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3278        let left = build_table(
3279            ("a1", &vec![1, 2, 3]),
3280            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3281            ("c1", &vec![7, 8, 9]),
3282        );
3283        let right = build_table(
3284            ("a2", &vec![10, 20, 30]),
3285            ("b1", &vec![4, 5, 6]),
3286            ("c2", &vec![70, 80, 90]),
3287        );
3288        let on = vec![(
3289            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3290            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3291        )];
3292
3293        let (columns, batches, metrics) = join_collect(
3294            Arc::clone(&left),
3295            Arc::clone(&right),
3296            on.clone(),
3297            &JoinType::Left,
3298            NullEquality::NullEqualsNothing,
3299            task_ctx,
3300        )
3301        .await?;
3302
3303        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3304
3305        allow_duplicates! {
3306            assert_snapshot!(batches_to_sort_string(&batches), @r"
3307            +----+----+----+----+----+----+
3308            | a1 | b1 | c1 | a2 | b1 | c2 |
3309            +----+----+----+----+----+----+
3310            | 1  | 4  | 7  | 10 | 4  | 70 |
3311            | 2  | 5  | 8  | 20 | 5  | 80 |
3312            | 3  | 7  | 9  |    |    |    |
3313            +----+----+----+----+----+----+
3314            ");
3315        }
3316
3317        assert_join_metrics!(metrics, 3);
3318        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3319
3320        Ok(())
3321    }
3322
3323    #[apply(hash_join_exec_configs)]
3324    #[tokio::test]
3325    async fn partitioned_join_left_one(
3326        batch_size: usize,
3327        use_perfect_hash_join_as_possible: bool,
3328    ) -> Result<()> {
3329        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3330        let left = build_table(
3331            ("a1", &vec![1, 2, 3]),
3332            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3333            ("c1", &vec![7, 8, 9]),
3334        );
3335        let right = build_table(
3336            ("a2", &vec![10, 20, 30]),
3337            ("b1", &vec![4, 5, 6]),
3338            ("c2", &vec![70, 80, 90]),
3339        );
3340        let on = vec![(
3341            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3342            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3343        )];
3344
3345        let (columns, batches, metrics) = partitioned_join_collect(
3346            Arc::clone(&left),
3347            Arc::clone(&right),
3348            on.clone(),
3349            &JoinType::Left,
3350            NullEquality::NullEqualsNothing,
3351            task_ctx,
3352        )
3353        .await?;
3354
3355        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3356
3357        allow_duplicates! {
3358            assert_snapshot!(batches_to_sort_string(&batches), @r"
3359            +----+----+----+----+----+----+
3360            | a1 | b1 | c1 | a2 | b1 | c2 |
3361            +----+----+----+----+----+----+
3362            | 1  | 4  | 7  | 10 | 4  | 70 |
3363            | 2  | 5  | 8  | 20 | 5  | 80 |
3364            | 3  | 7  | 9  |    |    |    |
3365            +----+----+----+----+----+----+
3366            ");
3367        }
3368
3369        assert_join_metrics!(metrics, 3);
3370        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3371
3372        Ok(())
3373    }
3374
3375    fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
3376        // just two line match
3377        // b1 = 10
3378        build_table(
3379            ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
3380            ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
3381            ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
3382        )
3383    }
3384
3385    fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
3386        // just two line match
3387        // b2 = 10
3388        build_table(
3389            ("a2", &vec![8, 12, 6, 2, 10, 4]),
3390            ("b2", &vec![8, 10, 6, 2, 10, 4]),
3391            ("c2", &vec![20, 40, 60, 80, 100, 120]),
3392        )
3393    }
3394
3395    #[apply(hash_join_exec_configs)]
3396    #[tokio::test]
3397    async fn join_left_semi(
3398        batch_size: usize,
3399        use_perfect_hash_join_as_possible: bool,
3400    ) -> Result<()> {
3401        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3402        let left = build_semi_anti_left_table();
3403        let right = build_semi_anti_right_table();
3404        // left_table left semi join right_table on left_table.b1 = right_table.b2
3405        let on = vec![(
3406            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3407            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3408        )];
3409
3410        let join = join(
3411            left,
3412            right,
3413            on,
3414            &JoinType::LeftSemi,
3415            NullEquality::NullEqualsNothing,
3416        )?;
3417
3418        let columns = columns(&join.schema());
3419        assert_eq!(columns, vec!["a1", "b1", "c1"]);
3420
3421        let stream = join.execute(0, task_ctx)?;
3422        let batches = common::collect(stream).await?;
3423
3424        // ignore the order
3425        allow_duplicates! {
3426            assert_snapshot!(batches_to_sort_string(&batches), @r"
3427            +----+----+-----+
3428            | a1 | b1 | c1  |
3429            +----+----+-----+
3430            | 11 | 8  | 110 |
3431            | 13 | 10 | 130 |
3432            | 9  | 8  | 90  |
3433            +----+----+-----+
3434            ");
3435        }
3436
3437        let metrics = join.metrics().unwrap();
3438        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3439
3440        Ok(())
3441    }
3442
3443    #[apply(hash_join_exec_configs)]
3444    #[tokio::test]
3445    async fn join_left_semi_with_filter(
3446        batch_size: usize,
3447        use_perfect_hash_join_as_possible: bool,
3448    ) -> Result<()> {
3449        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3450        let left = build_semi_anti_left_table();
3451        let right = build_semi_anti_right_table();
3452
3453        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 10
3454        let on = vec![(
3455            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3456            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3457        )];
3458
3459        let column_indices = vec![ColumnIndex {
3460            index: 0,
3461            side: JoinSide::Right,
3462        }];
3463        let intermediate_schema =
3464            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3465
3466        let filter_expression = Arc::new(BinaryExpr::new(
3467            Arc::new(Column::new("x", 0)),
3468            Operator::NotEq,
3469            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
3470        )) as Arc<dyn PhysicalExpr>;
3471
3472        let filter = JoinFilter::new(
3473            filter_expression,
3474            column_indices.clone(),
3475            Arc::new(intermediate_schema.clone()),
3476        );
3477
3478        let join = join_with_filter(
3479            Arc::clone(&left),
3480            Arc::clone(&right),
3481            on.clone(),
3482            filter,
3483            &JoinType::LeftSemi,
3484            NullEquality::NullEqualsNothing,
3485        )?;
3486
3487        let columns_header = columns(&join.schema());
3488        assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
3489
3490        let stream = join.execute(0, Arc::clone(&task_ctx))?;
3491        let batches = common::collect(stream).await?;
3492
3493        allow_duplicates! {
3494            assert_snapshot!(batches_to_sort_string(&batches), @r"
3495            +----+----+-----+
3496            | a1 | b1 | c1  |
3497            +----+----+-----+
3498            | 11 | 8  | 110 |
3499            | 13 | 10 | 130 |
3500            | 9  | 8  | 90  |
3501            +----+----+-----+
3502            ");
3503        }
3504
3505        let metrics = join.metrics().unwrap();
3506        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3507
3508        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 > 10
3509        let filter_expression = Arc::new(BinaryExpr::new(
3510            Arc::new(Column::new("x", 0)),
3511            Operator::Gt,
3512            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
3513        )) as Arc<dyn PhysicalExpr>;
3514        let filter = JoinFilter::new(
3515            filter_expression,
3516            column_indices,
3517            Arc::new(intermediate_schema),
3518        );
3519
3520        let join = join_with_filter(
3521            left,
3522            right,
3523            on,
3524            filter,
3525            &JoinType::LeftSemi,
3526            NullEquality::NullEqualsNothing,
3527        )?;
3528
3529        let columns_header = columns(&join.schema());
3530        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
3531
3532        let stream = join.execute(0, task_ctx)?;
3533        let batches = common::collect(stream).await?;
3534
3535        allow_duplicates! {
3536            assert_snapshot!(batches_to_sort_string(&batches), @r"
3537            +----+----+-----+
3538            | a1 | b1 | c1  |
3539            +----+----+-----+
3540            | 13 | 10 | 130 |
3541            +----+----+-----+
3542            ");
3543        }
3544
3545        let metrics = join.metrics().unwrap();
3546        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3547
3548        Ok(())
3549    }
3550
3551    #[apply(hash_join_exec_configs)]
3552    #[tokio::test]
3553    async fn join_right_semi(
3554        batch_size: usize,
3555        use_perfect_hash_join_as_possible: bool,
3556    ) -> Result<()> {
3557        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3558        let left = build_semi_anti_left_table();
3559        let right = build_semi_anti_right_table();
3560
3561        // left_table right semi join right_table on left_table.b1 = right_table.b2
3562        let on = vec![(
3563            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3564            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3565        )];
3566
3567        let join = join(
3568            left,
3569            right,
3570            on,
3571            &JoinType::RightSemi,
3572            NullEquality::NullEqualsNothing,
3573        )?;
3574
3575        let columns = columns(&join.schema());
3576        assert_eq!(columns, vec!["a2", "b2", "c2"]);
3577
3578        let stream = join.execute(0, task_ctx)?;
3579        let batches = common::collect(stream).await?;
3580
3581        // RightSemi join output is expected to preserve right input order
3582        allow_duplicates! {
3583            assert_snapshot!(batches_to_string(&batches), @r"
3584            +----+----+-----+
3585            | a2 | b2 | c2  |
3586            +----+----+-----+
3587            | 8  | 8  | 20  |
3588            | 12 | 10 | 40  |
3589            | 10 | 10 | 100 |
3590            +----+----+-----+
3591            ");
3592        }
3593
3594        let metrics = join.metrics().unwrap();
3595        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3596
3597        Ok(())
3598    }
3599
3600    #[apply(hash_join_exec_configs)]
3601    #[tokio::test]
3602    async fn join_right_semi_with_filter(
3603        batch_size: usize,
3604        use_perfect_hash_join_as_possible: bool,
3605    ) -> Result<()> {
3606        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3607        let left = build_semi_anti_left_table();
3608        let right = build_semi_anti_right_table();
3609
3610        // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9
3611        let on = vec![(
3612            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3613            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3614        )];
3615
3616        let column_indices = vec![ColumnIndex {
3617            index: 0,
3618            side: JoinSide::Left,
3619        }];
3620        let intermediate_schema =
3621            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3622
3623        let filter_expression = Arc::new(BinaryExpr::new(
3624            Arc::new(Column::new("x", 0)),
3625            Operator::NotEq,
3626            Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
3627        )) as Arc<dyn PhysicalExpr>;
3628
3629        let filter = JoinFilter::new(
3630            filter_expression,
3631            column_indices.clone(),
3632            Arc::new(intermediate_schema.clone()),
3633        );
3634
3635        let join = join_with_filter(
3636            Arc::clone(&left),
3637            Arc::clone(&right),
3638            on.clone(),
3639            filter,
3640            &JoinType::RightSemi,
3641            NullEquality::NullEqualsNothing,
3642        )?;
3643
3644        let columns = columns(&join.schema());
3645        assert_eq!(columns, vec!["a2", "b2", "c2"]);
3646
3647        let stream = join.execute(0, Arc::clone(&task_ctx))?;
3648        let batches = common::collect(stream).await?;
3649
3650        // RightSemi join output is expected to preserve right input order
3651        allow_duplicates! {
3652            assert_snapshot!(batches_to_string(&batches), @r"
3653            +----+----+-----+
3654            | a2 | b2 | c2  |
3655            +----+----+-----+
3656            | 8  | 8  | 20  |
3657            | 12 | 10 | 40  |
3658            | 10 | 10 | 100 |
3659            +----+----+-----+
3660            ");
3661        }
3662
3663        let metrics = join.metrics().unwrap();
3664        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3665
3666        // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9
3667        let filter_expression = Arc::new(BinaryExpr::new(
3668            Arc::new(Column::new("x", 0)),
3669            Operator::Gt,
3670            Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
3671        )) as Arc<dyn PhysicalExpr>;
3672
3673        let filter = JoinFilter::new(
3674            filter_expression,
3675            column_indices,
3676            Arc::new(intermediate_schema.clone()),
3677        );
3678
3679        let join = join_with_filter(
3680            left,
3681            right,
3682            on,
3683            filter,
3684            &JoinType::RightSemi,
3685            NullEquality::NullEqualsNothing,
3686        )?;
3687        let stream = join.execute(0, task_ctx)?;
3688        let batches = common::collect(stream).await?;
3689
3690        // RightSemi join output is expected to preserve right input order
3691        allow_duplicates! {
3692            assert_snapshot!(batches_to_string(&batches), @r"
3693            +----+----+-----+
3694            | a2 | b2 | c2  |
3695            +----+----+-----+
3696            | 12 | 10 | 40  |
3697            | 10 | 10 | 100 |
3698            +----+----+-----+
3699            ");
3700        }
3701
3702        let metrics = join.metrics().unwrap();
3703        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3704
3705        Ok(())
3706    }
3707
3708    #[apply(hash_join_exec_configs)]
3709    #[tokio::test]
3710    async fn join_left_anti(
3711        batch_size: usize,
3712        use_perfect_hash_join_as_possible: bool,
3713    ) -> Result<()> {
3714        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3715        let left = build_semi_anti_left_table();
3716        let right = build_semi_anti_right_table();
3717        // left_table left anti join right_table on left_table.b1 = right_table.b2
3718        let on = vec![(
3719            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3720            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3721        )];
3722
3723        let join = join(
3724            left,
3725            right,
3726            on,
3727            &JoinType::LeftAnti,
3728            NullEquality::NullEqualsNothing,
3729        )?;
3730
3731        let columns = columns(&join.schema());
3732        assert_eq!(columns, vec!["a1", "b1", "c1"]);
3733
3734        let stream = join.execute(0, task_ctx)?;
3735        let batches = common::collect(stream).await?;
3736
3737        allow_duplicates! {
3738            assert_snapshot!(batches_to_sort_string(&batches), @r"
3739            +----+----+----+
3740            | a1 | b1 | c1 |
3741            +----+----+----+
3742            | 1  | 1  | 10 |
3743            | 3  | 3  | 30 |
3744            | 5  | 5  | 50 |
3745            | 7  | 7  | 70 |
3746            +----+----+----+
3747            ");
3748        }
3749
3750        let metrics = join.metrics().unwrap();
3751        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3752
3753        Ok(())
3754    }
3755
3756    #[apply(hash_join_exec_configs)]
3757    #[tokio::test]
3758    async fn join_left_anti_with_filter(
3759        batch_size: usize,
3760        use_perfect_hash_join_as_possible: bool,
3761    ) -> Result<()> {
3762        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3763        let left = build_semi_anti_left_table();
3764        let right = build_semi_anti_right_table();
3765        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2!=8
3766        let on = vec![(
3767            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3768            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3769        )];
3770
3771        let column_indices = vec![ColumnIndex {
3772            index: 0,
3773            side: JoinSide::Right,
3774        }];
3775        let intermediate_schema =
3776            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3777        let filter_expression = Arc::new(BinaryExpr::new(
3778            Arc::new(Column::new("x", 0)),
3779            Operator::NotEq,
3780            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3781        )) as Arc<dyn PhysicalExpr>;
3782
3783        let filter = JoinFilter::new(
3784            filter_expression,
3785            column_indices.clone(),
3786            Arc::new(intermediate_schema.clone()),
3787        );
3788
3789        let join = join_with_filter(
3790            Arc::clone(&left),
3791            Arc::clone(&right),
3792            on.clone(),
3793            filter,
3794            &JoinType::LeftAnti,
3795            NullEquality::NullEqualsNothing,
3796        )?;
3797
3798        let columns_header = columns(&join.schema());
3799        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
3800
3801        let stream = join.execute(0, Arc::clone(&task_ctx))?;
3802        let batches = common::collect(stream).await?;
3803
3804        allow_duplicates! {
3805            assert_snapshot!(batches_to_sort_string(&batches), @r"
3806            +----+----+-----+
3807            | a1 | b1 | c1  |
3808            +----+----+-----+
3809            | 1  | 1  | 10  |
3810            | 11 | 8  | 110 |
3811            | 3  | 3  | 30  |
3812            | 5  | 5  | 50  |
3813            | 7  | 7  | 70  |
3814            | 9  | 8  | 90  |
3815            +----+----+-----+
3816            ");
3817        }
3818
3819        let metrics = join.metrics().unwrap();
3820        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3821
3822        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 13
3823        let filter_expression = Arc::new(BinaryExpr::new(
3824            Arc::new(Column::new("x", 0)),
3825            Operator::NotEq,
3826            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3827        )) as Arc<dyn PhysicalExpr>;
3828
3829        let filter = JoinFilter::new(
3830            filter_expression,
3831            column_indices,
3832            Arc::new(intermediate_schema),
3833        );
3834
3835        let join = join_with_filter(
3836            left,
3837            right,
3838            on,
3839            filter,
3840            &JoinType::LeftAnti,
3841            NullEquality::NullEqualsNothing,
3842        )?;
3843
3844        let columns_header = columns(&join.schema());
3845        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
3846
3847        let stream = join.execute(0, task_ctx)?;
3848        let batches = common::collect(stream).await?;
3849
3850        allow_duplicates! {
3851            assert_snapshot!(batches_to_sort_string(&batches), @r"
3852            +----+----+-----+
3853            | a1 | b1 | c1  |
3854            +----+----+-----+
3855            | 1  | 1  | 10  |
3856            | 11 | 8  | 110 |
3857            | 3  | 3  | 30  |
3858            | 5  | 5  | 50  |
3859            | 7  | 7  | 70  |
3860            | 9  | 8  | 90  |
3861            +----+----+-----+
3862            ");
3863        }
3864
3865        let metrics = join.metrics().unwrap();
3866        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3867
3868        Ok(())
3869    }
3870
3871    #[apply(hash_join_exec_configs)]
3872    #[tokio::test]
3873    async fn join_right_anti(
3874        batch_size: usize,
3875        use_perfect_hash_join_as_possible: bool,
3876    ) -> Result<()> {
3877        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3878        let left = build_semi_anti_left_table();
3879        let right = build_semi_anti_right_table();
3880        let on = vec![(
3881            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3882            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3883        )];
3884
3885        let join = join(
3886            left,
3887            right,
3888            on,
3889            &JoinType::RightAnti,
3890            NullEquality::NullEqualsNothing,
3891        )?;
3892
3893        let columns = columns(&join.schema());
3894        assert_eq!(columns, vec!["a2", "b2", "c2"]);
3895
3896        let stream = join.execute(0, task_ctx)?;
3897        let batches = common::collect(stream).await?;
3898
3899        // RightAnti join output is expected to preserve right input order
3900        allow_duplicates! {
3901            assert_snapshot!(batches_to_string(&batches), @r"
3902            +----+----+-----+
3903            | a2 | b2 | c2  |
3904            +----+----+-----+
3905            | 6  | 6  | 60  |
3906            | 2  | 2  | 80  |
3907            | 4  | 4  | 120 |
3908            +----+----+-----+
3909            ");
3910        }
3911
3912        let metrics = join.metrics().unwrap();
3913        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3914
3915        Ok(())
3916    }
3917
3918    #[apply(hash_join_exec_configs)]
3919    #[tokio::test]
3920    async fn join_right_anti_with_filter(
3921        batch_size: usize,
3922        use_perfect_hash_join_as_possible: bool,
3923    ) -> Result<()> {
3924        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3925        let left = build_semi_anti_left_table();
3926        let right = build_semi_anti_right_table();
3927        // left_table right anti join right_table on left_table.b1 = right_table.b2 and left_table.a1!=13
3928        let on = vec![(
3929            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3930            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3931        )];
3932
3933        let column_indices = vec![ColumnIndex {
3934            index: 0,
3935            side: JoinSide::Left,
3936        }];
3937        let intermediate_schema =
3938            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3939
3940        let filter_expression = Arc::new(BinaryExpr::new(
3941            Arc::new(Column::new("x", 0)),
3942            Operator::NotEq,
3943            Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
3944        )) as Arc<dyn PhysicalExpr>;
3945
3946        let filter = JoinFilter::new(
3947            filter_expression,
3948            column_indices,
3949            Arc::new(intermediate_schema.clone()),
3950        );
3951
3952        let join = join_with_filter(
3953            Arc::clone(&left),
3954            Arc::clone(&right),
3955            on.clone(),
3956            filter,
3957            &JoinType::RightAnti,
3958            NullEquality::NullEqualsNothing,
3959        )?;
3960
3961        let columns_header = columns(&join.schema());
3962        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3963
3964        let stream = join.execute(0, Arc::clone(&task_ctx))?;
3965        let batches = common::collect(stream).await?;
3966
3967        // RightAnti join output is expected to preserve right input order
3968        allow_duplicates! {
3969            assert_snapshot!(batches_to_string(&batches), @r"
3970            +----+----+-----+
3971            | a2 | b2 | c2  |
3972            +----+----+-----+
3973            | 12 | 10 | 40  |
3974            | 6  | 6  | 60  |
3975            | 2  | 2  | 80  |
3976            | 10 | 10 | 100 |
3977            | 4  | 4  | 120 |
3978            +----+----+-----+
3979            ");
3980        }
3981
3982        let metrics = join.metrics().unwrap();
3983        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3984
3985        // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8
3986        let column_indices = vec![ColumnIndex {
3987            index: 1,
3988            side: JoinSide::Right,
3989        }];
3990        let filter_expression = Arc::new(BinaryExpr::new(
3991            Arc::new(Column::new("x", 0)),
3992            Operator::NotEq,
3993            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3994        )) as Arc<dyn PhysicalExpr>;
3995
3996        let filter = JoinFilter::new(
3997            filter_expression,
3998            column_indices,
3999            Arc::new(intermediate_schema),
4000        );
4001
4002        let join = join_with_filter(
4003            left,
4004            right,
4005            on,
4006            filter,
4007            &JoinType::RightAnti,
4008            NullEquality::NullEqualsNothing,
4009        )?;
4010
4011        let columns_header = columns(&join.schema());
4012        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
4013
4014        let stream = join.execute(0, task_ctx)?;
4015        let batches = common::collect(stream).await?;
4016
4017        // RightAnti join output is expected to preserve right input order
4018        allow_duplicates! {
4019            assert_snapshot!(batches_to_string(&batches), @r"
4020            +----+----+-----+
4021            | a2 | b2 | c2  |
4022            +----+----+-----+
4023            | 8  | 8  | 20  |
4024            | 6  | 6  | 60  |
4025            | 2  | 2  | 80  |
4026            | 4  | 4  | 120 |
4027            +----+----+-----+
4028            ");
4029        }
4030
4031        let metrics = join.metrics().unwrap();
4032        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4033
4034        Ok(())
4035    }
4036
4037    #[apply(hash_join_exec_configs)]
4038    #[tokio::test]
4039    async fn join_right_one(
4040        batch_size: usize,
4041        use_perfect_hash_join_as_possible: bool,
4042    ) -> Result<()> {
4043        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4044        let left = build_table(
4045            ("a1", &vec![1, 2, 3]),
4046            ("b1", &vec![4, 5, 7]),
4047            ("c1", &vec![7, 8, 9]),
4048        );
4049        let right = build_table(
4050            ("a2", &vec![10, 20, 30]),
4051            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
4052            ("c2", &vec![70, 80, 90]),
4053        );
4054        let on = vec![(
4055            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4056            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4057        )];
4058
4059        let (columns, batches, metrics) = join_collect(
4060            left,
4061            right,
4062            on,
4063            &JoinType::Right,
4064            NullEquality::NullEqualsNothing,
4065            task_ctx,
4066        )
4067        .await?;
4068
4069        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
4070
4071        allow_duplicates! {
4072            assert_snapshot!(batches_to_sort_string(&batches), @r"
4073            +----+----+----+----+----+----+
4074            | a1 | b1 | c1 | a2 | b1 | c2 |
4075            +----+----+----+----+----+----+
4076            |    |    |    | 30 | 6  | 90 |
4077            | 1  | 4  | 7  | 10 | 4  | 70 |
4078            | 2  | 5  | 8  | 20 | 5  | 80 |
4079            +----+----+----+----+----+----+
4080            ");
4081        }
4082
4083        assert_join_metrics!(metrics, 3);
4084        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4085
4086        Ok(())
4087    }
4088
4089    #[apply(hash_join_exec_configs)]
4090    #[tokio::test]
4091    async fn partitioned_join_right_one(
4092        batch_size: usize,
4093        use_perfect_hash_join_as_possible: bool,
4094    ) -> Result<()> {
4095        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4096        let left = build_table(
4097            ("a1", &vec![1, 2, 3]),
4098            ("b1", &vec![4, 5, 7]),
4099            ("c1", &vec![7, 8, 9]),
4100        );
4101        let right = build_table(
4102            ("a2", &vec![10, 20, 30]),
4103            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
4104            ("c2", &vec![70, 80, 90]),
4105        );
4106        let on = vec![(
4107            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4108            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4109        )];
4110
4111        let (columns, batches, metrics) = partitioned_join_collect(
4112            left,
4113            right,
4114            on,
4115            &JoinType::Right,
4116            NullEquality::NullEqualsNothing,
4117            task_ctx,
4118        )
4119        .await?;
4120
4121        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
4122
4123        allow_duplicates! {
4124            assert_snapshot!(batches_to_sort_string(&batches), @r"
4125            +----+----+----+----+----+----+
4126            | a1 | b1 | c1 | a2 | b1 | c2 |
4127            +----+----+----+----+----+----+
4128            |    |    |    | 30 | 6  | 90 |
4129            | 1  | 4  | 7  | 10 | 4  | 70 |
4130            | 2  | 5  | 8  | 20 | 5  | 80 |
4131            +----+----+----+----+----+----+
4132            ");
4133        }
4134
4135        assert_join_metrics!(metrics, 3);
4136        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4137
4138        Ok(())
4139    }
4140
4141    #[apply(hash_join_exec_configs)]
4142    #[tokio::test]
4143    async fn join_full_one(
4144        batch_size: usize,
4145        use_perfect_hash_join_as_possible: bool,
4146    ) -> Result<()> {
4147        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4148        let left = build_table(
4149            ("a1", &vec![1, 2, 3]),
4150            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
4151            ("c1", &vec![7, 8, 9]),
4152        );
4153        let right = build_table(
4154            ("a2", &vec![10, 20, 30]),
4155            ("b2", &vec![4, 5, 6]),
4156            ("c2", &vec![70, 80, 90]),
4157        );
4158        let on = vec![(
4159            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4160            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4161        )];
4162
4163        let join = join(
4164            left,
4165            right,
4166            on,
4167            &JoinType::Full,
4168            NullEquality::NullEqualsNothing,
4169        )?;
4170
4171        let columns = columns(&join.schema());
4172        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
4173
4174        let stream = join.execute(0, task_ctx)?;
4175        let batches = common::collect(stream).await?;
4176
4177        allow_duplicates! {
4178            assert_snapshot!(batches_to_sort_string(&batches), @r"
4179            +----+----+----+----+----+----+
4180            | a1 | b1 | c1 | a2 | b2 | c2 |
4181            +----+----+----+----+----+----+
4182            |    |    |    | 30 | 6  | 90 |
4183            | 1  | 4  | 7  | 10 | 4  | 70 |
4184            | 2  | 5  | 8  | 20 | 5  | 80 |
4185            | 3  | 7  | 9  |    |    |    |
4186            +----+----+----+----+----+----+
4187            ");
4188        }
4189
4190        let metrics = join.metrics().unwrap();
4191        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4192
4193        Ok(())
4194    }
4195
4196    #[apply(hash_join_exec_configs)]
4197    #[tokio::test]
4198    async fn join_left_mark(
4199        batch_size: usize,
4200        use_perfect_hash_join_as_possible: bool,
4201    ) -> Result<()> {
4202        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4203        let left = build_table(
4204            ("a1", &vec![1, 2, 3]),
4205            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
4206            ("c1", &vec![7, 8, 9]),
4207        );
4208        let right = build_table(
4209            ("a2", &vec![10, 20, 30]),
4210            ("b1", &vec![4, 5, 6]),
4211            ("c2", &vec![70, 80, 90]),
4212        );
4213        let on = vec![(
4214            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4215            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4216        )];
4217
4218        let (columns, batches, metrics) = join_collect(
4219            Arc::clone(&left),
4220            Arc::clone(&right),
4221            on.clone(),
4222            &JoinType::LeftMark,
4223            NullEquality::NullEqualsNothing,
4224            task_ctx,
4225        )
4226        .await?;
4227
4228        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
4229
4230        allow_duplicates! {
4231            assert_snapshot!(batches_to_sort_string(&batches), @r"
4232            +----+----+----+-------+
4233            | a1 | b1 | c1 | mark  |
4234            +----+----+----+-------+
4235            | 1  | 4  | 7  | true  |
4236            | 2  | 5  | 8  | true  |
4237            | 3  | 7  | 9  | false |
4238            +----+----+----+-------+
4239            ");
4240        }
4241
4242        assert_join_metrics!(metrics, 3);
4243        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4244
4245        Ok(())
4246    }
4247
4248    #[apply(hash_join_exec_configs)]
4249    #[tokio::test]
4250    async fn partitioned_join_left_mark(
4251        batch_size: usize,
4252        use_perfect_hash_join_as_possible: bool,
4253    ) -> Result<()> {
4254        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4255        let left = build_table(
4256            ("a1", &vec![1, 2, 3]),
4257            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
4258            ("c1", &vec![7, 8, 9]),
4259        );
4260        let right = build_table(
4261            ("a2", &vec![10, 20, 30, 40]),
4262            ("b1", &vec![4, 4, 5, 6]),
4263            ("c2", &vec![60, 70, 80, 90]),
4264        );
4265        let on = vec![(
4266            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4267            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4268        )];
4269
4270        let (columns, batches, metrics) = partitioned_join_collect(
4271            Arc::clone(&left),
4272            Arc::clone(&right),
4273            on.clone(),
4274            &JoinType::LeftMark,
4275            NullEquality::NullEqualsNothing,
4276            task_ctx,
4277        )
4278        .await?;
4279
4280        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
4281
4282        allow_duplicates! {
4283            assert_snapshot!(batches_to_sort_string(&batches), @r"
4284            +----+----+----+-------+
4285            | a1 | b1 | c1 | mark  |
4286            +----+----+----+-------+
4287            | 1  | 4  | 7  | true  |
4288            | 2  | 5  | 8  | true  |
4289            | 3  | 7  | 9  | false |
4290            +----+----+----+-------+
4291            ");
4292        }
4293
4294        assert_join_metrics!(metrics, 3);
4295        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4296
4297        Ok(())
4298    }
4299
4300    #[apply(hash_join_exec_configs)]
4301    #[tokio::test]
4302    async fn join_right_mark(
4303        batch_size: usize,
4304        use_perfect_hash_join_as_possible: bool,
4305    ) -> Result<()> {
4306        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4307        let left = build_table(
4308            ("a1", &vec![1, 2, 3]),
4309            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
4310            ("c1", &vec![7, 8, 9]),
4311        );
4312        let right = build_table(
4313            ("a2", &vec![10, 20, 30]),
4314            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
4315            ("c2", &vec![70, 80, 90]),
4316        );
4317        let on = vec![(
4318            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4319            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4320        )];
4321
4322        let (columns, batches, metrics) = join_collect(
4323            Arc::clone(&left),
4324            Arc::clone(&right),
4325            on.clone(),
4326            &JoinType::RightMark,
4327            NullEquality::NullEqualsNothing,
4328            task_ctx,
4329        )
4330        .await?;
4331
4332        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
4333
4334        let expected = [
4335            "+----+----+----+-------+",
4336            "| a2 | b1 | c2 | mark  |",
4337            "+----+----+----+-------+",
4338            "| 10 | 4  | 70 | true  |",
4339            "| 20 | 5  | 80 | true  |",
4340            "| 30 | 6  | 90 | false |",
4341            "+----+----+----+-------+",
4342        ];
4343        assert_batches_sorted_eq!(expected, &batches);
4344
4345        assert_join_metrics!(metrics, 3);
4346        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4347
4348        Ok(())
4349    }
4350
4351    #[apply(hash_join_exec_configs)]
4352    #[tokio::test]
4353    async fn partitioned_join_right_mark(
4354        batch_size: usize,
4355        use_perfect_hash_join_as_possible: bool,
4356    ) -> Result<()> {
4357        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4358        let left = build_table(
4359            ("a1", &vec![1, 2, 3]),
4360            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
4361            ("c1", &vec![7, 8, 9]),
4362        );
4363        let right = build_table(
4364            ("a2", &vec![10, 20, 30, 40]),
4365            ("b1", &vec![4, 4, 5, 6]), // 6 does not exist on the left
4366            ("c2", &vec![60, 70, 80, 90]),
4367        );
4368        let on = vec![(
4369            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4370            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4371        )];
4372
4373        let (columns, batches, metrics) = partitioned_join_collect(
4374            Arc::clone(&left),
4375            Arc::clone(&right),
4376            on.clone(),
4377            &JoinType::RightMark,
4378            NullEquality::NullEqualsNothing,
4379            task_ctx,
4380        )
4381        .await?;
4382
4383        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
4384
4385        let expected = [
4386            "+----+----+----+-------+",
4387            "| a2 | b1 | c2 | mark  |",
4388            "+----+----+----+-------+",
4389            "| 10 | 4  | 60 | true  |",
4390            "| 20 | 4  | 70 | true  |",
4391            "| 30 | 5  | 80 | true  |",
4392            "| 40 | 6  | 90 | false |",
4393            "+----+----+----+-------+",
4394        ];
4395        assert_batches_sorted_eq!(expected, &batches);
4396
4397        assert_join_metrics!(metrics, 4);
4398        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4399
4400        Ok(())
4401    }
4402
4403    #[test]
4404    fn join_with_hash_collisions_64() -> Result<()> {
4405        let mut hashmap_left = HashTable::with_capacity(4);
4406        let left = build_table_i32(
4407            ("a", &vec![10, 20]),
4408            ("x", &vec![100, 200]),
4409            ("y", &vec![200, 300]),
4410        );
4411
4412        let random_state = RandomState::with_seed(0);
4413        let hashes_buff = &mut vec![0; left.num_rows()];
4414        let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
4415
4416        // Maps both values to both indices (1 and 2, representing input 0 and 1)
4417        // 0 -> (0, 1)
4418        // 1 -> (0, 2)
4419        // The equality check will make sure only hashes[0] maps to 0 and hashes[1] maps to 1
4420        hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
4421        hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
4422
4423        hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
4424        hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);
4425
4426        let next = vec![2, 0];
4427
4428        let right = build_table_i32(
4429            ("a", &vec![10, 20]),
4430            ("b", &vec![0, 0]),
4431            ("c", &vec![30, 40]),
4432        );
4433
4434        // Join key column for both join sides
4435        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
4436
4437        let join_hash_map = JoinHashMapU64::new(hashmap_left, next);
4438
4439        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
4440        let right_keys_values =
4441            key_column.evaluate(&right)?.into_array(right.num_rows())?;
4442        let mut hashes_buffer = vec![0; right.num_rows()];
4443        create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
4444
4445        let mut probe_indices_buffer = Vec::new();
4446        let mut build_indices_buffer = Vec::new();
4447        let (l, r, _) = lookup_join_hashmap(
4448            &join_hash_map,
4449            &[left_keys_values],
4450            &[right_keys_values],
4451            NullEquality::NullEqualsNothing,
4452            &hashes_buffer,
4453            8192,
4454            (0, None),
4455            &mut probe_indices_buffer,
4456            &mut build_indices_buffer,
4457        )?;
4458
4459        let left_ids: UInt64Array = vec![0, 1].into();
4460
4461        let right_ids: UInt32Array = vec![0, 1].into();
4462
4463        assert_eq!(left_ids, l);
4464
4465        assert_eq!(right_ids, r);
4466
4467        Ok(())
4468    }
4469
4470    #[test]
4471    fn join_with_hash_collisions_u32() -> Result<()> {
4472        let mut hashmap_left = HashTable::with_capacity(4);
4473        let left = build_table_i32(
4474            ("a", &vec![10, 20]),
4475            ("x", &vec![100, 200]),
4476            ("y", &vec![200, 300]),
4477        );
4478
4479        let random_state = RandomState::with_seed(0);
4480        let hashes_buff = &mut vec![0; left.num_rows()];
4481        let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
4482
4483        hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
4484        hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
4485        hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
4486        hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);
4487
4488        let next: Vec<u32> = vec![2, 0];
4489
4490        let right = build_table_i32(
4491            ("a", &vec![10, 20]),
4492            ("b", &vec![0, 0]),
4493            ("c", &vec![30, 40]),
4494        );
4495
4496        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
4497
4498        let join_hash_map = JoinHashMapU32::new(hashmap_left, next);
4499
4500        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
4501        let right_keys_values =
4502            key_column.evaluate(&right)?.into_array(right.num_rows())?;
4503        let mut hashes_buffer = vec![0; right.num_rows()];
4504        create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
4505
4506        let mut probe_indices_buffer = Vec::new();
4507        let mut build_indices_buffer = Vec::new();
4508        let (l, r, _) = lookup_join_hashmap(
4509            &join_hash_map,
4510            &[left_keys_values],
4511            &[right_keys_values],
4512            NullEquality::NullEqualsNothing,
4513            &hashes_buffer,
4514            8192,
4515            (0, None),
4516            &mut probe_indices_buffer,
4517            &mut build_indices_buffer,
4518        )?;
4519
4520        // We still expect to match rows 0 and 1 on both sides
4521        let left_ids: UInt64Array = vec![0, 1].into();
4522        let right_ids: UInt32Array = vec![0, 1].into();
4523
4524        assert_eq!(left_ids, l);
4525        assert_eq!(right_ids, r);
4526
4527        Ok(())
4528    }
4529
4530    #[tokio::test]
4531    async fn join_with_duplicated_column_names() -> Result<()> {
4532        let task_ctx = Arc::new(TaskContext::default());
4533        let left = build_table(
4534            ("a", &vec![1, 2, 3]),
4535            ("b", &vec![4, 5, 7]),
4536            ("c", &vec![7, 8, 9]),
4537        );
4538        let right = build_table(
4539            ("a", &vec![10, 20, 30]),
4540            ("b", &vec![1, 2, 7]),
4541            ("c", &vec![70, 80, 90]),
4542        );
4543        let on = vec![(
4544            // join on a=b so there are duplicate column names on unjoined columns
4545            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4546            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4547        )];
4548
4549        let join = join(
4550            left,
4551            right,
4552            on,
4553            &JoinType::Inner,
4554            NullEquality::NullEqualsNothing,
4555        )?;
4556
4557        let columns = columns(&join.schema());
4558        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4559
4560        let stream = join.execute(0, task_ctx)?;
4561        let batches = common::collect(stream).await?;
4562
4563        allow_duplicates! {
4564            assert_snapshot!(batches_to_sort_string(&batches), @r"
4565            +---+---+---+----+---+----+
4566            | a | b | c | a  | b | c  |
4567            +---+---+---+----+---+----+
4568            | 1 | 4 | 7 | 10 | 1 | 70 |
4569            | 2 | 5 | 8 | 20 | 2 | 80 |
4570            +---+---+---+----+---+----+
4571            ");
4572        }
4573
4574        Ok(())
4575    }
4576
4577    fn prepare_join_filter() -> JoinFilter {
4578        let column_indices = vec![
4579            ColumnIndex {
4580                index: 2,
4581                side: JoinSide::Left,
4582            },
4583            ColumnIndex {
4584                index: 2,
4585                side: JoinSide::Right,
4586            },
4587        ];
4588        let intermediate_schema = Schema::new(vec![
4589            Field::new("c", DataType::Int32, true),
4590            Field::new("c", DataType::Int32, true),
4591        ]);
4592        let filter_expression = Arc::new(BinaryExpr::new(
4593            Arc::new(Column::new("c", 0)),
4594            Operator::Gt,
4595            Arc::new(Column::new("c", 1)),
4596        )) as Arc<dyn PhysicalExpr>;
4597
4598        JoinFilter::new(
4599            filter_expression,
4600            column_indices,
4601            Arc::new(intermediate_schema),
4602        )
4603    }
4604
4605    #[apply(hash_join_exec_configs)]
4606    #[tokio::test]
4607    async fn join_inner_with_filter(
4608        batch_size: usize,
4609        use_perfect_hash_join_as_possible: bool,
4610    ) -> Result<()> {
4611        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4612        let left = build_table(
4613            ("a", &vec![0, 1, 2, 2]),
4614            ("b", &vec![4, 5, 7, 8]),
4615            ("c", &vec![7, 8, 9, 1]),
4616        );
4617        let right = build_table(
4618            ("a", &vec![10, 20, 30, 40]),
4619            ("b", &vec![2, 2, 3, 4]),
4620            ("c", &vec![7, 5, 6, 4]),
4621        );
4622        let on = vec![(
4623            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4624            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4625        )];
4626        let filter = prepare_join_filter();
4627
4628        let join = join_with_filter(
4629            left,
4630            right,
4631            on,
4632            filter,
4633            &JoinType::Inner,
4634            NullEquality::NullEqualsNothing,
4635        )?;
4636
4637        let columns = columns(&join.schema());
4638        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4639
4640        let stream = join.execute(0, task_ctx)?;
4641        let batches = common::collect(stream).await?;
4642
4643        allow_duplicates! {
4644            assert_snapshot!(batches_to_sort_string(&batches), @r"
4645            +---+---+---+----+---+---+
4646            | a | b | c | a  | b | c |
4647            +---+---+---+----+---+---+
4648            | 2 | 7 | 9 | 10 | 2 | 7 |
4649            | 2 | 7 | 9 | 20 | 2 | 5 |
4650            +---+---+---+----+---+---+
4651            ");
4652        }
4653
4654        let metrics = join.metrics().unwrap();
4655        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4656
4657        Ok(())
4658    }
4659
4660    #[apply(hash_join_exec_configs)]
4661    #[tokio::test]
4662    async fn join_left_with_filter(
4663        batch_size: usize,
4664        use_perfect_hash_join_as_possible: bool,
4665    ) -> Result<()> {
4666        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4667        let left = build_table(
4668            ("a", &vec![0, 1, 2, 2]),
4669            ("b", &vec![4, 5, 7, 8]),
4670            ("c", &vec![7, 8, 9, 1]),
4671        );
4672        let right = build_table(
4673            ("a", &vec![10, 20, 30, 40]),
4674            ("b", &vec![2, 2, 3, 4]),
4675            ("c", &vec![7, 5, 6, 4]),
4676        );
4677        let on = vec![(
4678            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4679            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4680        )];
4681        let filter = prepare_join_filter();
4682
4683        let join = join_with_filter(
4684            left,
4685            right,
4686            on,
4687            filter,
4688            &JoinType::Left,
4689            NullEquality::NullEqualsNothing,
4690        )?;
4691
4692        let columns = columns(&join.schema());
4693        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4694
4695        let stream = join.execute(0, task_ctx)?;
4696        let batches = common::collect(stream).await?;
4697
4698        allow_duplicates! {
4699            assert_snapshot!(batches_to_sort_string(&batches), @r"
4700            +---+---+---+----+---+---+
4701            | a | b | c | a  | b | c |
4702            +---+---+---+----+---+---+
4703            | 0 | 4 | 7 |    |   |   |
4704            | 1 | 5 | 8 |    |   |   |
4705            | 2 | 7 | 9 | 10 | 2 | 7 |
4706            | 2 | 7 | 9 | 20 | 2 | 5 |
4707            | 2 | 8 | 1 |    |   |   |
4708            +---+---+---+----+---+---+
4709            ");
4710        }
4711
4712        let metrics = join.metrics().unwrap();
4713        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4714
4715        Ok(())
4716    }
4717
4718    #[apply(hash_join_exec_configs)]
4719    #[tokio::test]
4720    async fn join_right_with_filter(
4721        batch_size: usize,
4722        use_perfect_hash_join_as_possible: bool,
4723    ) -> Result<()> {
4724        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4725        let left = build_table(
4726            ("a", &vec![0, 1, 2, 2]),
4727            ("b", &vec![4, 5, 7, 8]),
4728            ("c", &vec![7, 8, 9, 1]),
4729        );
4730        let right = build_table(
4731            ("a", &vec![10, 20, 30, 40]),
4732            ("b", &vec![2, 2, 3, 4]),
4733            ("c", &vec![7, 5, 6, 4]),
4734        );
4735        let on = vec![(
4736            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4737            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4738        )];
4739        let filter = prepare_join_filter();
4740
4741        let join = join_with_filter(
4742            left,
4743            right,
4744            on,
4745            filter,
4746            &JoinType::Right,
4747            NullEquality::NullEqualsNothing,
4748        )?;
4749
4750        let columns = columns(&join.schema());
4751        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4752
4753        let stream = join.execute(0, task_ctx)?;
4754        let batches = common::collect(stream).await?;
4755
4756        allow_duplicates! {
4757            assert_snapshot!(batches_to_sort_string(&batches), @r"
4758            +---+---+---+----+---+---+
4759            | a | b | c | a  | b | c |
4760            +---+---+---+----+---+---+
4761            |   |   |   | 30 | 3 | 6 |
4762            |   |   |   | 40 | 4 | 4 |
4763            | 2 | 7 | 9 | 10 | 2 | 7 |
4764            | 2 | 7 | 9 | 20 | 2 | 5 |
4765            +---+---+---+----+---+---+
4766            ");
4767        }
4768
4769        let metrics = join.metrics().unwrap();
4770        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4771
4772        Ok(())
4773    }
4774
4775    #[apply(hash_join_exec_configs)]
4776    #[tokio::test]
4777    async fn join_full_with_filter(
4778        batch_size: usize,
4779        use_perfect_hash_join_as_possible: bool,
4780    ) -> Result<()> {
4781        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4782        let left = build_table(
4783            ("a", &vec![0, 1, 2, 2]),
4784            ("b", &vec![4, 5, 7, 8]),
4785            ("c", &vec![7, 8, 9, 1]),
4786        );
4787        let right = build_table(
4788            ("a", &vec![10, 20, 30, 40]),
4789            ("b", &vec![2, 2, 3, 4]),
4790            ("c", &vec![7, 5, 6, 4]),
4791        );
4792        let on = vec![(
4793            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4794            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4795        )];
4796        let filter = prepare_join_filter();
4797
4798        let join = join_with_filter(
4799            left,
4800            right,
4801            on,
4802            filter,
4803            &JoinType::Full,
4804            NullEquality::NullEqualsNothing,
4805        )?;
4806
4807        let columns = columns(&join.schema());
4808        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4809
4810        let stream = join.execute(0, task_ctx)?;
4811        let batches = common::collect(stream).await?;
4812
4813        let expected = [
4814            "+---+---+---+----+---+---+",
4815            "| a | b | c | a  | b | c |",
4816            "+---+---+---+----+---+---+",
4817            "|   |   |   | 30 | 3 | 6 |",
4818            "|   |   |   | 40 | 4 | 4 |",
4819            "| 2 | 7 | 9 | 10 | 2 | 7 |",
4820            "| 2 | 7 | 9 | 20 | 2 | 5 |",
4821            "| 0 | 4 | 7 |    |   |   |",
4822            "| 1 | 5 | 8 |    |   |   |",
4823            "| 2 | 8 | 1 |    |   |   |",
4824            "+---+---+---+----+---+---+",
4825        ];
4826        assert_batches_sorted_eq!(expected, &batches);
4827
4828        let metrics = join.metrics().unwrap();
4829        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4830
4831        // THIS MIGRATION HALTED DUE TO ISSUE #15312
4832        //allow_duplicates! {
4833        //    assert_snapshot!(batches_to_sort_string(&batches), @r#"
4834        //    +---+---+---+----+---+---+
4835        //    | a | b | c | a  | b | c |
4836        //    +---+---+---+----+---+---+
4837        //    |   |   |   | 30 | 3 | 6 |
4838        //    |   |   |   | 40 | 4 | 4 |
4839        //    | 2 | 7 | 9 | 10 | 2 | 7 |
4840        //    | 2 | 7 | 9 | 20 | 2 | 5 |
4841        //    | 0 | 4 | 7 |    |   |   |
4842        //    | 1 | 5 | 8 |    |   |   |
4843        //    | 2 | 8 | 1 |    |   |   |
4844        //    +---+---+---+----+---+---+
4845        //        "#)
4846        //}
4847
4848        Ok(())
4849    }
4850
4851    /// Test for parallelized HashJoinExec with PartitionMode::CollectLeft
4852    #[tokio::test]
4853    async fn test_collect_left_multiple_partitions_join() -> Result<()> {
4854        let task_ctx = Arc::new(TaskContext::default());
4855        let left = build_table(
4856            ("a1", &vec![1, 2, 3]),
4857            ("b1", &vec![4, 5, 7]),
4858            ("c1", &vec![7, 8, 9]),
4859        );
4860        let right = build_table(
4861            ("a2", &vec![10, 20, 30]),
4862            ("b2", &vec![4, 5, 6]),
4863            ("c2", &vec![70, 80, 90]),
4864        );
4865        let on = vec![(
4866            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4867            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4868        )];
4869
4870        let expected_inner = vec![
4871            "+----+----+----+----+----+----+",
4872            "| a1 | b1 | c1 | a2 | b2 | c2 |",
4873            "+----+----+----+----+----+----+",
4874            "| 1  | 4  | 7  | 10 | 4  | 70 |",
4875            "| 2  | 5  | 8  | 20 | 5  | 80 |",
4876            "+----+----+----+----+----+----+",
4877        ];
4878        let expected_left = vec![
4879            "+----+----+----+----+----+----+",
4880            "| a1 | b1 | c1 | a2 | b2 | c2 |",
4881            "+----+----+----+----+----+----+",
4882            "| 1  | 4  | 7  | 10 | 4  | 70 |",
4883            "| 2  | 5  | 8  | 20 | 5  | 80 |",
4884            "| 3  | 7  | 9  |    |    |    |",
4885            "+----+----+----+----+----+----+",
4886        ];
4887        let expected_right = vec![
4888            "+----+----+----+----+----+----+",
4889            "| a1 | b1 | c1 | a2 | b2 | c2 |",
4890            "+----+----+----+----+----+----+",
4891            "|    |    |    | 30 | 6  | 90 |",
4892            "| 1  | 4  | 7  | 10 | 4  | 70 |",
4893            "| 2  | 5  | 8  | 20 | 5  | 80 |",
4894            "+----+----+----+----+----+----+",
4895        ];
4896        let expected_full = vec![
4897            "+----+----+----+----+----+----+",
4898            "| a1 | b1 | c1 | a2 | b2 | c2 |",
4899            "+----+----+----+----+----+----+",
4900            "|    |    |    | 30 | 6  | 90 |",
4901            "| 1  | 4  | 7  | 10 | 4  | 70 |",
4902            "| 2  | 5  | 8  | 20 | 5  | 80 |",
4903            "| 3  | 7  | 9  |    |    |    |",
4904            "+----+----+----+----+----+----+",
4905        ];
4906        let expected_left_semi = vec![
4907            "+----+----+----+",
4908            "| a1 | b1 | c1 |",
4909            "+----+----+----+",
4910            "| 1  | 4  | 7  |",
4911            "| 2  | 5  | 8  |",
4912            "+----+----+----+",
4913        ];
4914        let expected_left_anti = vec![
4915            "+----+----+----+",
4916            "| a1 | b1 | c1 |",
4917            "+----+----+----+",
4918            "| 3  | 7  | 9  |",
4919            "+----+----+----+",
4920        ];
4921        let expected_right_semi = vec![
4922            "+----+----+----+",
4923            "| a2 | b2 | c2 |",
4924            "+----+----+----+",
4925            "| 10 | 4  | 70 |",
4926            "| 20 | 5  | 80 |",
4927            "+----+----+----+",
4928        ];
4929        let expected_right_anti = vec![
4930            "+----+----+----+",
4931            "| a2 | b2 | c2 |",
4932            "+----+----+----+",
4933            "| 30 | 6  | 90 |",
4934            "+----+----+----+",
4935        ];
4936        let expected_left_mark = vec![
4937            "+----+----+----+-------+",
4938            "| a1 | b1 | c1 | mark  |",
4939            "+----+----+----+-------+",
4940            "| 1  | 4  | 7  | true  |",
4941            "| 2  | 5  | 8  | true  |",
4942            "| 3  | 7  | 9  | false |",
4943            "+----+----+----+-------+",
4944        ];
4945        let expected_right_mark = vec![
4946            "+----+----+----+-------+",
4947            "| a2 | b2 | c2 | mark  |",
4948            "+----+----+----+-------+",
4949            "| 10 | 4  | 70 | true  |",
4950            "| 20 | 5  | 80 | true  |",
4951            "| 30 | 6  | 90 | false |",
4952            "+----+----+----+-------+",
4953        ];
4954
4955        let test_cases = vec![
4956            (JoinType::Inner, expected_inner),
4957            (JoinType::Left, expected_left),
4958            (JoinType::Right, expected_right),
4959            (JoinType::Full, expected_full),
4960            (JoinType::LeftSemi, expected_left_semi),
4961            (JoinType::LeftAnti, expected_left_anti),
4962            (JoinType::RightSemi, expected_right_semi),
4963            (JoinType::RightAnti, expected_right_anti),
4964            (JoinType::LeftMark, expected_left_mark),
4965            (JoinType::RightMark, expected_right_mark),
4966        ];
4967
4968        for (join_type, expected) in test_cases {
4969            let (_, batches, metrics) = join_collect_with_partition_mode(
4970                Arc::clone(&left),
4971                Arc::clone(&right),
4972                on.clone(),
4973                &join_type,
4974                PartitionMode::CollectLeft,
4975                NullEquality::NullEqualsNothing,
4976                Arc::clone(&task_ctx),
4977            )
4978            .await?;
4979            assert_batches_sorted_eq!(expected, &batches);
4980            assert_join_metrics!(metrics, expected.len() - 4);
4981        }
4982
4983        Ok(())
4984    }
4985
4986    #[tokio::test]
4987    async fn join_date32() -> Result<()> {
4988        let schema = Arc::new(Schema::new(vec![
4989            Field::new("date", DataType::Date32, false),
4990            Field::new("n", DataType::Int32, false),
4991        ]));
4992
4993        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
4994        let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
4995        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4996        let left =
4997            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
4998                .unwrap();
4999        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
5000        let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
5001        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
5002        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
5003        let on = vec![(
5004            Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
5005            Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
5006        )];
5007
5008        let join = join(
5009            left,
5010            right,
5011            on,
5012            &JoinType::Inner,
5013            NullEquality::NullEqualsNothing,
5014        )?;
5015
5016        let task_ctx = Arc::new(TaskContext::default());
5017        let stream = join.execute(0, task_ctx)?;
5018        let batches = common::collect(stream).await?;
5019
5020        allow_duplicates! {
5021            assert_snapshot!(batches_to_sort_string(&batches), @r"
5022            +------------+---+------------+---+
5023            | date       | n | date       | n |
5024            +------------+---+------------+---+
5025            | 2022-04-26 | 2 | 2022-04-26 | 4 |
5026            | 2022-04-26 | 2 | 2022-04-26 | 5 |
5027            | 2022-04-27 | 3 | 2022-04-27 | 6 |
5028            +------------+---+------------+---+
5029            ");
5030        }
5031
5032        Ok(())
5033    }
5034
5035    #[tokio::test]
5036    async fn join_with_error_right() {
5037        let left = build_table(
5038            ("a1", &vec![1, 2, 3]),
5039            ("b1", &vec![4, 5, 7]),
5040            ("c1", &vec![7, 8, 9]),
5041        );
5042
5043        // right input stream returns one good batch and then one error.
5044        // The error should be returned.
5045        let err = exec_err!("bad data error");
5046        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
5047
5048        let on = vec![(
5049            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
5050            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
5051        )];
5052        let schema = right.schema();
5053        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
5054        let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
5055
5056        let join_types = vec![
5057            JoinType::Inner,
5058            JoinType::Left,
5059            JoinType::Right,
5060            JoinType::Full,
5061            JoinType::LeftSemi,
5062            JoinType::LeftAnti,
5063            JoinType::RightSemi,
5064            JoinType::RightAnti,
5065        ];
5066
5067        for join_type in join_types {
5068            let join = join(
5069                Arc::clone(&left),
5070                Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
5071                on.clone(),
5072                &join_type,
5073                NullEquality::NullEqualsNothing,
5074            )
5075            .unwrap();
5076            let task_ctx = Arc::new(TaskContext::default());
5077
5078            let stream = join.execute(0, task_ctx).unwrap();
5079
5080            // Expect that an error is returned
5081            let result_string = common::collect(stream).await.unwrap_err().to_string();
5082            assert!(
5083                result_string.contains("bad data error"),
5084                "actual: {result_string}"
5085            );
5086        }
5087    }
5088
5089    #[tokio::test]
5090    async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
5091        assert_empty_build_probe_behavior(
5092            &[
5093                JoinType::Inner,
5094                JoinType::Left,
5095                JoinType::LeftSemi,
5096                JoinType::LeftAnti,
5097                JoinType::LeftMark,
5098                JoinType::RightSemi,
5099            ],
5100            false,
5101            false,
5102        )
5103        .await;
5104    }
5105
5106    #[tokio::test]
5107    async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() {
5108        assert_empty_build_probe_behavior(
5109            &[
5110                JoinType::Inner,
5111                JoinType::Left,
5112                JoinType::LeftSemi,
5113                JoinType::LeftAnti,
5114                JoinType::LeftMark,
5115                JoinType::RightSemi,
5116            ],
5117            false,
5118            true,
5119        )
5120        .await;
5121    }
5122
5123    #[tokio::test]
5124    async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
5125        assert_empty_build_probe_behavior(
5126            &[
5127                JoinType::Right,
5128                JoinType::Full,
5129                JoinType::RightAnti,
5130                JoinType::RightMark,
5131            ],
5132            true,
5133            false,
5134        )
5135        .await;
5136    }
5137
5138    #[tokio::test]
5139    async fn join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() {
5140        assert_empty_build_probe_behavior(
5141            &[
5142                JoinType::Right,
5143                JoinType::Full,
5144                JoinType::RightAnti,
5145                JoinType::RightMark,
5146            ],
5147            true,
5148            true,
5149        )
5150        .await;
5151    }
5152
5153    #[tokio::test]
5154    async fn join_split_batch() {
5155        let left = build_table(
5156            ("a1", &vec![1, 2, 3, 4]),
5157            ("b1", &vec![1, 1, 1, 1]),
5158            ("c1", &vec![0, 0, 0, 0]),
5159        );
5160        let right = build_table(
5161            ("a2", &vec![10, 20, 30, 40, 50]),
5162            ("b2", &vec![1, 1, 1, 1, 1]),
5163            ("c2", &vec![0, 0, 0, 0, 0]),
5164        );
5165        let on = vec![(
5166            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
5167            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
5168        )];
5169
5170        let join_types = vec![
5171            JoinType::Inner,
5172            JoinType::Left,
5173            JoinType::Right,
5174            JoinType::Full,
5175            JoinType::RightSemi,
5176            JoinType::RightAnti,
5177            JoinType::LeftSemi,
5178            JoinType::LeftAnti,
5179        ];
5180        let expected_resultset_records = 20;
5181        let common_result = [
5182            "+----+----+----+----+----+----+",
5183            "| a1 | b1 | c1 | a2 | b2 | c2 |",
5184            "+----+----+----+----+----+----+",
5185            "| 1  | 1  | 0  | 10 | 1  | 0  |",
5186            "| 2  | 1  | 0  | 10 | 1  | 0  |",
5187            "| 3  | 1  | 0  | 10 | 1  | 0  |",
5188            "| 4  | 1  | 0  | 10 | 1  | 0  |",
5189            "| 1  | 1  | 0  | 20 | 1  | 0  |",
5190            "| 2  | 1  | 0  | 20 | 1  | 0  |",
5191            "| 3  | 1  | 0  | 20 | 1  | 0  |",
5192            "| 4  | 1  | 0  | 20 | 1  | 0  |",
5193            "| 1  | 1  | 0  | 30 | 1  | 0  |",
5194            "| 2  | 1  | 0  | 30 | 1  | 0  |",
5195            "| 3  | 1  | 0  | 30 | 1  | 0  |",
5196            "| 4  | 1  | 0  | 30 | 1  | 0  |",
5197            "| 1  | 1  | 0  | 40 | 1  | 0  |",
5198            "| 2  | 1  | 0  | 40 | 1  | 0  |",
5199            "| 3  | 1  | 0  | 40 | 1  | 0  |",
5200            "| 4  | 1  | 0  | 40 | 1  | 0  |",
5201            "| 1  | 1  | 0  | 50 | 1  | 0  |",
5202            "| 2  | 1  | 0  | 50 | 1  | 0  |",
5203            "| 3  | 1  | 0  | 50 | 1  | 0  |",
5204            "| 4  | 1  | 0  | 50 | 1  | 0  |",
5205            "+----+----+----+----+----+----+",
5206        ];
5207        let left_batch = [
5208            "+----+----+----+",
5209            "| a1 | b1 | c1 |",
5210            "+----+----+----+",
5211            "| 1  | 1  | 0  |",
5212            "| 2  | 1  | 0  |",
5213            "| 3  | 1  | 0  |",
5214            "| 4  | 1  | 0  |",
5215            "+----+----+----+",
5216        ];
5217        let right_batch = [
5218            "+----+----+----+",
5219            "| a2 | b2 | c2 |",
5220            "+----+----+----+",
5221            "| 10 | 1  | 0  |",
5222            "| 20 | 1  | 0  |",
5223            "| 30 | 1  | 0  |",
5224            "| 40 | 1  | 0  |",
5225            "| 50 | 1  | 0  |",
5226            "+----+----+----+",
5227        ];
5228        let right_empty = [
5229            "+----+----+----+",
5230            "| a2 | b2 | c2 |",
5231            "+----+----+----+",
5232            "+----+----+----+",
5233        ];
5234        let left_empty = [
5235            "+----+----+----+",
5236            "| a1 | b1 | c1 |",
5237            "+----+----+----+",
5238            "+----+----+----+",
5239        ];
5240
5241        // validation of partial join results output for different batch_size setting
5242        for join_type in join_types {
5243            for batch_size in (1..21).rev() {
5244                let task_ctx = prepare_task_ctx(batch_size, true);
5245
5246                let join = join(
5247                    Arc::clone(&left),
5248                    Arc::clone(&right),
5249                    on.clone(),
5250                    &join_type,
5251                    NullEquality::NullEqualsNothing,
5252                )
5253                .unwrap();
5254
5255                let stream = join.execute(0, task_ctx).unwrap();
5256                let batches = common::collect(stream).await.unwrap();
5257
5258                // For inner/right join expected batch count equals dev_ceil result,
5259                // as there is no need to append non-joined build side data.
5260                // For other join types it'll be div_ceil + 1 -- for additional batch
5261                // containing not visited build side rows (empty in this test case).
5262                let expected_batch_count = match join_type {
5263                    JoinType::Inner
5264                    | JoinType::Right
5265                    | JoinType::RightSemi
5266                    | JoinType::RightAnti => {
5267                        div_ceil(expected_resultset_records, batch_size)
5268                    }
5269                    _ => div_ceil(expected_resultset_records, batch_size) + 1,
5270                };
5271                // With batch coalescing, we may have fewer batches than expected
5272                assert!(
5273                    batches.len() <= expected_batch_count,
5274                    "expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
5275                    batches.len()
5276                );
5277
5278                let expected = match join_type {
5279                    JoinType::RightSemi => right_batch.to_vec(),
5280                    JoinType::RightAnti => right_empty.to_vec(),
5281                    JoinType::LeftSemi => left_batch.to_vec(),
5282                    JoinType::LeftAnti => left_empty.to_vec(),
5283                    _ => common_result.to_vec(),
5284                };
5285                // For anti joins with empty results, we may get zero batches
5286                // (with coalescing) instead of one empty batch with schema
5287                if batches.is_empty() {
5288                    // Verify this is an expected empty result case
5289                    assert!(
5290                        matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
5291                        "Unexpected empty result for {join_type} join"
5292                    );
5293                } else {
5294                    assert_batches_eq!(expected, &batches);
5295                }
5296            }
5297        }
5298    }
5299
5300    #[tokio::test]
5301    async fn single_partition_join_overallocation() -> Result<()> {
5302        let left = build_table(
5303            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5304            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5305            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5306        );
5307        let right = build_table(
5308            ("a2", &vec![10, 11]),
5309            ("b2", &vec![12, 13]),
5310            ("c2", &vec![14, 15]),
5311        );
5312        let on = vec![(
5313            Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
5314            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
5315        )];
5316
5317        let join_types = vec![
5318            JoinType::Inner,
5319            JoinType::Left,
5320            JoinType::Right,
5321            JoinType::Full,
5322            JoinType::LeftSemi,
5323            JoinType::LeftAnti,
5324            JoinType::RightSemi,
5325            JoinType::RightAnti,
5326            JoinType::LeftMark,
5327            JoinType::RightMark,
5328        ];
5329
5330        for join_type in join_types {
5331            let runtime = RuntimeEnvBuilder::new()
5332                .with_memory_limit(100, 1.0)
5333                .build_arc()?;
5334            let task_ctx = TaskContext::default().with_runtime(runtime);
5335            let task_ctx = Arc::new(task_ctx);
5336
5337            let join = join(
5338                Arc::clone(&left),
5339                Arc::clone(&right),
5340                on.clone(),
5341                &join_type,
5342                NullEquality::NullEqualsNothing,
5343            )?;
5344
5345            let stream = join.execute(0, task_ctx)?;
5346            let err = common::collect(stream).await.unwrap_err();
5347
5348            // Asserting that operator-level reservation attempting to overallocate
5349            assert_contains!(
5350                err.to_string(),
5351                "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n  HashJoinInput"
5352            );
5353
5354            assert_contains!(
5355                err.to_string(),
5356                "Failed to allocate additional 120.0 B for HashJoinInput"
5357            );
5358        }
5359
5360        Ok(())
5361    }
5362
5363    #[tokio::test]
5364    async fn partitioned_join_overallocation() -> Result<()> {
5365        // Prepare partitioned inputs for HashJoinExec
5366        // No need to adjust partitioning, as execution should fail with `Resources exhausted` error
5367        let left_batch = build_table_i32(
5368            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5369            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5370            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5371        );
5372        let left = TestMemoryExec::try_new_exec(
5373            &[vec![left_batch.clone()], vec![left_batch.clone()]],
5374            left_batch.schema(),
5375            None,
5376        )
5377        .unwrap();
5378        let right_batch = build_table_i32(
5379            ("a2", &vec![10, 11]),
5380            ("b2", &vec![12, 13]),
5381            ("c2", &vec![14, 15]),
5382        );
5383        let right = TestMemoryExec::try_new_exec(
5384            &[vec![right_batch.clone()], vec![right_batch.clone()]],
5385            right_batch.schema(),
5386            None,
5387        )
5388        .unwrap();
5389        let on = vec![(
5390            Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
5391            Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
5392        )];
5393
5394        let join_types = vec![
5395            JoinType::Inner,
5396            JoinType::Left,
5397            JoinType::Right,
5398            JoinType::Full,
5399            JoinType::LeftSemi,
5400            JoinType::LeftAnti,
5401            JoinType::RightSemi,
5402            JoinType::RightAnti,
5403        ];
5404
5405        for join_type in join_types {
5406            let runtime = RuntimeEnvBuilder::new()
5407                .with_memory_limit(100, 1.0)
5408                .build_arc()?;
5409            let session_config = SessionConfig::default().with_batch_size(50);
5410            let task_ctx = TaskContext::default()
5411                .with_session_config(session_config)
5412                .with_runtime(runtime);
5413            let task_ctx = Arc::new(task_ctx);
5414
5415            let join = HashJoinExec::try_new(
5416                Arc::clone(&left) as Arc<dyn ExecutionPlan>,
5417                Arc::clone(&right) as Arc<dyn ExecutionPlan>,
5418                on.clone(),
5419                None,
5420                &join_type,
5421                None,
5422                PartitionMode::Partitioned,
5423                NullEquality::NullEqualsNothing,
5424                false,
5425            )?;
5426
5427            let stream = join.execute(1, task_ctx)?;
5428            let err = common::collect(stream).await.unwrap_err();
5429
5430            // Asserting that stream-level reservation attempting to overallocate
5431            assert_contains!(
5432                err.to_string(),
5433                "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n  HashJoinInput[1]"
5434            );
5435
5436            assert_contains!(
5437                err.to_string(),
5438                "Failed to allocate additional 120.0 B for HashJoinInput[1]"
5439            );
5440        }
5441
5442        Ok(())
5443    }
5444
5445    fn build_table_struct(
5446        struct_name: &str,
5447        field_name_and_values: (&str, &Vec<Option<i32>>),
5448        nulls: Option<NullBuffer>,
5449    ) -> Arc<dyn ExecutionPlan> {
5450        let (field_name, values) = field_name_and_values;
5451        let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
5452        let schema = Schema::new(vec![Field::new(
5453            struct_name,
5454            DataType::Struct(inner_fields.clone().into()),
5455            nulls.is_some(),
5456        )]);
5457
5458        let batch = RecordBatch::try_new(
5459            Arc::new(schema),
5460            vec![Arc::new(StructArray::new(
5461                inner_fields.into(),
5462                vec![Arc::new(Int32Array::from(values.clone()))],
5463                nulls,
5464            ))],
5465        )
5466        .unwrap();
5467        let schema_ref = batch.schema();
5468        TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
5469    }
5470
5471    #[tokio::test]
5472    async fn join_on_struct() -> Result<()> {
5473        let task_ctx = Arc::new(TaskContext::default());
5474        let left =
5475            build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
5476        let right =
5477            build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
5478        let on = vec![(
5479            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
5480            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
5481        )];
5482
5483        let (columns, batches, metrics) = join_collect(
5484            left,
5485            right,
5486            on,
5487            &JoinType::Inner,
5488            NullEquality::NullEqualsNothing,
5489            task_ctx,
5490        )
5491        .await?;
5492
5493        assert_eq!(columns, vec!["n1", "n2"]);
5494
5495        allow_duplicates! {
5496            assert_snapshot!(batches_to_string(&batches), @r"
5497            +--------+--------+
5498            | n1     | n2     |
5499            +--------+--------+
5500            | {a: }  | {a: }  |
5501            | {a: 1} | {a: 1} |
5502            | {a: 2} | {a: 2} |
5503            +--------+--------+
5504            ");
5505        }
5506
5507        assert_join_metrics!(metrics, 3);
5508
5509        Ok(())
5510    }
5511
5512    #[tokio::test]
5513    async fn join_on_struct_with_nulls() -> Result<()> {
5514        let task_ctx = Arc::new(TaskContext::default());
5515        let left =
5516            build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
5517        let right =
5518            build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
5519        let on = vec![(
5520            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
5521            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
5522        )];
5523
5524        let (_, batches_null_eq, metrics) = join_collect(
5525            Arc::clone(&left),
5526            Arc::clone(&right),
5527            on.clone(),
5528            &JoinType::Inner,
5529            NullEquality::NullEqualsNull,
5530            Arc::clone(&task_ctx),
5531        )
5532        .await?;
5533
5534        allow_duplicates! {
5535            assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r"
5536            +----+----+
5537            | n1 | n2 |
5538            +----+----+
5539            |    |    |
5540            +----+----+
5541            ");
5542        }
5543
5544        assert_join_metrics!(metrics, 1);
5545
5546        let (_, batches_null_neq, metrics) = join_collect(
5547            left,
5548            right,
5549            on,
5550            &JoinType::Inner,
5551            NullEquality::NullEqualsNothing,
5552            task_ctx,
5553        )
5554        .await?;
5555
5556        assert_join_metrics!(metrics, 0);
5557
5558        // With batch coalescing, empty results may not emit any batches
5559        // Check that either we have no batches, or an empty batch with proper schema
5560        if batches_null_neq.is_empty() {
5561            // This is fine - no output rows
5562        } else {
5563            let expected_null_neq =
5564                ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
5565            assert_batches_eq!(expected_null_neq, &batches_null_neq);
5566        }
5567
5568        Ok(())
5569    }
5570
5571    /// Returns the column names on the schema
5572    fn columns(schema: &Schema) -> Vec<String> {
5573        schema.fields().iter().map(|f| f.name().clone()).collect()
5574    }
5575
5576    /// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table.
5577    #[tokio::test]
5578    async fn test_hash_join_marks_filter_complete() -> Result<()> {
5579        let task_ctx = Arc::new(TaskContext::default());
5580        let left = build_table(
5581            ("a1", &vec![1, 2, 3]),
5582            ("b1", &vec![4, 5, 6]),
5583            ("c1", &vec![7, 8, 9]),
5584        );
5585        let right = build_table(
5586            ("a2", &vec![10, 20, 30]),
5587            ("b1", &vec![4, 5, 6]),
5588            ("c2", &vec![70, 80, 90]),
5589        );
5590
5591        let on = vec![(
5592            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
5593            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
5594        )];
5595
5596        let (join, dynamic_filter) =
5597            hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5598
5599        // Execute the join
5600        let stream = join.execute(0, task_ctx)?;
5601        let _batches = common::collect(stream).await?;
5602
5603        // After the join completes, the dynamic filter should be marked as complete
5604        // wait_complete() should return immediately
5605        dynamic_filter.wait_complete().await;
5606
5607        Ok(())
5608    }
5609
5610    /// This test verifies that the dynamic filter is marked as complete even when the build side is empty.
5611    #[tokio::test]
5612    async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
5613        let task_ctx = Arc::new(TaskContext::default());
5614        // Empty left side (build side)
5615        let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
5616        let right = build_table(
5617            ("a2", &vec![10, 20, 30]),
5618            ("b1", &vec![4, 5, 6]),
5619            ("c2", &vec![70, 80, 90]),
5620        );
5621
5622        let on = vec![(
5623            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
5624            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
5625        )];
5626
5627        let (join, dynamic_filter) =
5628            hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5629
5630        // Execute the join
5631        let stream = join.execute(0, task_ctx)?;
5632        let _batches = common::collect(stream).await?;
5633
5634        // Even with empty build side, the dynamic filter should be marked as complete
5635        // wait_complete() should return immediately
5636        dynamic_filter.wait_complete().await;
5637
5638        Ok(())
5639    }
5640
5641    #[tokio::test]
5642    async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()
5643    -> Result<()> {
5644        let mut session_config = SessionConfig::default();
5645        session_config
5646            .options_mut()
5647            .optimizer
5648            .enable_dynamic_filter_pushdown = true;
5649        let task_ctx =
5650            Arc::new(TaskContext::default().with_session_config(session_config));
5651
5652        let child_left_schema = Arc::new(Schema::new(vec![
5653            Field::new("child_left_payload", DataType::Int32, false),
5654            Field::new("child_key", DataType::Int32, false),
5655            Field::new("child_left_extra", DataType::Int32, false),
5656        ]));
5657        let child_right_schema = Arc::new(Schema::new(vec![
5658            Field::new("child_right_payload", DataType::Int32, false),
5659            Field::new("child_right_key", DataType::Int32, false),
5660            Field::new("child_right_extra", DataType::Int32, false),
5661        ]));
5662        let parent_left_schema = Arc::new(Schema::new(vec![
5663            Field::new("parent_payload", DataType::Int32, false),
5664            Field::new("parent_key", DataType::Int32, false),
5665            Field::new("parent_extra", DataType::Int32, false),
5666        ]));
5667
5668        let child_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5669            &[
5670                vec![build_table_i32(
5671                    ("child_left_payload", &vec![10]),
5672                    ("child_key", &vec![0]),
5673                    ("child_left_extra", &vec![100]),
5674                )],
5675                vec![build_table_i32(
5676                    ("child_left_payload", &vec![11]),
5677                    ("child_key", &vec![1]),
5678                    ("child_left_extra", &vec![101]),
5679                )],
5680                vec![build_table_i32(
5681                    ("child_left_payload", &vec![12]),
5682                    ("child_key", &vec![2]),
5683                    ("child_left_extra", &vec![102]),
5684                )],
5685                vec![build_table_i32(
5686                    ("child_left_payload", &vec![13]),
5687                    ("child_key", &vec![3]),
5688                    ("child_left_extra", &vec![103]),
5689                )],
5690            ],
5691            Arc::clone(&child_left_schema),
5692            None,
5693        )?;
5694        let child_right: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5695            &[
5696                vec![build_table_i32(
5697                    ("child_right_payload", &vec![20]),
5698                    ("child_right_key", &vec![0]),
5699                    ("child_right_extra", &vec![200]),
5700                )],
5701                vec![build_table_i32(
5702                    ("child_right_payload", &vec![21]),
5703                    ("child_right_key", &vec![1]),
5704                    ("child_right_extra", &vec![201]),
5705                )],
5706                vec![build_table_i32(
5707                    ("child_right_payload", &vec![22]),
5708                    ("child_right_key", &vec![2]),
5709                    ("child_right_extra", &vec![202]),
5710                )],
5711                vec![build_table_i32(
5712                    ("child_right_payload", &vec![23]),
5713                    ("child_right_key", &vec![3]),
5714                    ("child_right_extra", &vec![203]),
5715                )],
5716            ],
5717            Arc::clone(&child_right_schema),
5718            None,
5719        )?;
5720        let parent_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5721            &[
5722                vec![build_table_i32(
5723                    ("parent_payload", &vec![30]),
5724                    ("parent_key", &vec![0]),
5725                    ("parent_extra", &vec![300]),
5726                )],
5727                vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
5728                vec![build_table_i32(
5729                    ("parent_payload", &vec![32]),
5730                    ("parent_key", &vec![2]),
5731                    ("parent_extra", &vec![302]),
5732                )],
5733                vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
5734            ],
5735            Arc::clone(&parent_left_schema),
5736            None,
5737        )?;
5738
5739        let child_on = vec![(
5740            Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _,
5741            Arc::new(Column::new_with_schema(
5742                "child_right_key",
5743                &child_right_schema,
5744            )?) as _,
5745        )];
5746        let (child_join, _child_dynamic_filter) = hash_join_with_dynamic_filter_and_mode(
5747            child_left,
5748            child_right,
5749            child_on,
5750            JoinType::Inner,
5751            PartitionMode::Partitioned,
5752        )?;
5753        let child_join: Arc<dyn ExecutionPlan> = Arc::new(child_join);
5754
5755        let parent_on = vec![(
5756            Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _,
5757            Arc::new(Column::new_with_schema("child_key", &child_join.schema())?) as _,
5758        )];
5759        let parent_join = HashJoinExec::try_new(
5760            parent_left,
5761            child_join,
5762            parent_on,
5763            None,
5764            &JoinType::RightSemi,
5765            None,
5766            PartitionMode::Partitioned,
5767            NullEquality::NullEqualsNothing,
5768            false,
5769        )?;
5770
5771        let batches = tokio::time::timeout(
5772            std::time::Duration::from_secs(5),
5773            crate::execution_plan::collect(Arc::new(parent_join), task_ctx),
5774        )
5775        .await
5776        .expect("partitioned right-semi join should not hang")?;
5777
5778        assert_batches_sorted_eq!(
5779            [
5780                "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5781                "| child_left_payload | child_key | child_left_extra | child_right_payload | child_right_key | child_right_extra |",
5782                "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5783                "| 10                 | 0         | 100              | 20                  | 0               | 200               |",
5784                "| 12                 | 2         | 102              | 22                  | 2               | 202               |",
5785                "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5786            ],
5787            &batches
5788        );
5789
5790        Ok(())
5791    }
5792
5793    #[tokio::test]
5794    async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
5795    -> Result<()> {
5796        let task_ctx = Arc::new(TaskContext::default());
5797        let (left, right, on) = empty_build_with_probe_error_inputs();
5798
5799        // Keep an extra consumer reference so execute() enables dynamic filter pushdown
5800        // and enters the WaitPartitionBoundsReport path before deciding whether to poll
5801        // the probe side.
5802        let (join, dynamic_filter) =
5803            hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5804
5805        let stream = join.execute(0, task_ctx)?;
5806        let batches = common::collect(stream).await?;
5807        assert!(batches.is_empty());
5808
5809        dynamic_filter.wait_complete().await;
5810
5811        Ok(())
5812    }
5813
5814    #[tokio::test]
5815    async fn test_perfect_hash_join_with_negative_numbers() -> Result<()> {
5816        let task_ctx = prepare_task_ctx(8192, true);
5817        let (left_schema, right_schema, on) = build_schema_and_on()?;
5818
5819        let left_batch = RecordBatch::try_new(
5820            Arc::clone(&left_schema),
5821            vec![
5822                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
5823                Arc::new(Int32Array::from(vec![-1, 0, 1])) as ArrayRef,
5824            ],
5825        )?;
5826        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
5827
5828        let right_batch = RecordBatch::try_new(
5829            Arc::clone(&right_schema),
5830            vec![
5831                Arc::new(Int32Array::from(vec![10, 20, 30, 40])) as ArrayRef,
5832                Arc::new(Int32Array::from(vec![1, -1, 0, 2])) as ArrayRef,
5833            ],
5834        )?;
5835        let right =
5836            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
5837
5838        let (columns, batches, metrics) = join_collect(
5839            left,
5840            right,
5841            on,
5842            &JoinType::Inner,
5843            NullEquality::NullEqualsNothing,
5844            task_ctx,
5845        )
5846        .await?;
5847
5848        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
5849
5850        assert_batches_sorted_eq!(
5851            [
5852                "+----+----+----+----+",
5853                "| a1 | b1 | a2 | b1 |",
5854                "+----+----+----+----+",
5855                "| 1  | -1 | 20 | -1 |",
5856                "| 2  | 0  | 30 | 0  |",
5857                "| 3  | 1  | 10 | 1  |",
5858                "+----+----+----+----+",
5859            ],
5860            &batches
5861        );
5862
5863        assert_phj_used(&metrics, true);
5864
5865        Ok(())
5866    }
5867
5868    #[tokio::test]
5869    async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> {
5870        let task_ctx = prepare_task_ctx(8192, true);
5871        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
5872        let batch = RecordBatch::try_new(
5873            Arc::clone(&schema),
5874            vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))],
5875        )?;
5876        let left = TestMemoryExec::try_new_exec(
5877            &[vec![batch.clone()]],
5878            Arc::clone(&schema),
5879            None,
5880        )?;
5881        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?;
5882        let on: JoinOn = vec![(
5883            Arc::new(Column::new_with_schema("a", &left.schema())?) as _,
5884            Arc::new(Column::new_with_schema("a", &right.schema())?) as _,
5885        )];
5886        let (_columns, batches, _metrics) = join_collect(
5887            left,
5888            right,
5889            on,
5890            &JoinType::Inner,
5891            NullEquality::NullEqualsNothing,
5892            task_ctx,
5893        )
5894        .await?;
5895        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
5896        assert_eq!(total_rows, 2);
5897        Ok(())
5898    }
5899
5900    #[apply(hash_join_exec_configs)]
5901    #[tokio::test]
5902    async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(
5903        batch_size: usize,
5904        use_perfect_hash_join_as_possible: bool,
5905    ) -> Result<()> {
5906        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
5907        let (left_schema, right_schema, on) = build_schema_and_on()?;
5908
5909        let left_batch = RecordBatch::try_new(
5910            Arc::clone(&left_schema),
5911            vec![
5912                Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
5913                Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef,
5914            ],
5915        )?;
5916        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
5917
5918        let right_batch = RecordBatch::try_new(
5919            Arc::clone(&right_schema),
5920            vec![
5921                Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef,
5922                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
5923            ],
5924        )?;
5925        let right =
5926            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
5927
5928        let (columns, batches, metrics) = join_collect(
5929            left,
5930            right,
5931            on,
5932            &JoinType::Inner,
5933            NullEquality::NullEqualsNull,
5934            task_ctx,
5935        )
5936        .await?;
5937
5938        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
5939        assert_batches_sorted_eq!(
5940            [
5941                "+----+----+----+----+",
5942                "| a1 | b1 | a2 | b1 |",
5943                "+----+----+----+----+",
5944                "| 1  | 10 | 3  | 10 |",
5945                "+----+----+----+----+",
5946            ],
5947            &batches
5948        );
5949
5950        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
5951
5952        Ok(())
5953    }
5954
5955    #[apply(hash_join_exec_configs)]
5956    #[tokio::test]
5957    async fn test_phj_null_equals_nothing_build_probe_all_have_nulls(
5958        batch_size: usize,
5959        use_perfect_hash_join_as_possible: bool,
5960    ) -> Result<()> {
5961        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
5962        let (left_schema, right_schema, on) = build_schema_and_on()?;
5963
5964        let left_batch = RecordBatch::try_new(
5965            Arc::clone(&left_schema),
5966            vec![
5967                Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef,
5968                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
5969            ],
5970        )?;
5971        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
5972
5973        let right_batch = RecordBatch::try_new(
5974            Arc::clone(&right_schema),
5975            vec![
5976                Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
5977                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
5978            ],
5979        )?;
5980        let right =
5981            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
5982
5983        let (columns, batches, metrics) = join_collect(
5984            left,
5985            right,
5986            on,
5987            &JoinType::Inner,
5988            NullEquality::NullEqualsNothing,
5989            task_ctx,
5990        )
5991        .await?;
5992
5993        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
5994        assert_batches_sorted_eq!(
5995            [
5996                "+----+----+----+----+",
5997                "| a1 | b1 | a2 | b1 |",
5998                "+----+----+----+----+",
5999                "| 1  | 10 | 3  | 10 |",
6000                "+----+----+----+----+",
6001            ],
6002            &batches
6003        );
6004
6005        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
6006
6007        Ok(())
6008    }
6009
6010    #[tokio::test]
6011    async fn test_phj_null_equals_null_build_have_nulls() -> Result<()> {
6012        let task_ctx = prepare_task_ctx(8192, true);
6013        let (left_schema, right_schema, on) = build_schema_and_on()?;
6014
6015        let left_batch = RecordBatch::try_new(
6016            Arc::clone(&left_schema),
6017            vec![
6018                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef,
6019                Arc::new(Int32Array::from(vec![Some(10), Some(20), None])) as ArrayRef,
6020            ],
6021        )?;
6022        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
6023
6024        let right_batch = RecordBatch::try_new(
6025            Arc::clone(&right_schema),
6026            vec![
6027                Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
6028                Arc::new(Int32Array::from(vec![Some(10), Some(30)])) as ArrayRef,
6029            ],
6030        )?;
6031        let right =
6032            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
6033
6034        let (columns, batches, metrics) = join_collect(
6035            left,
6036            right,
6037            on,
6038            &JoinType::Inner,
6039            NullEquality::NullEqualsNull,
6040            task_ctx,
6041        )
6042        .await?;
6043
6044        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
6045        assert_batches_sorted_eq!(
6046            [
6047                "+----+----+----+----+",
6048                "| a1 | b1 | a2 | b1 |",
6049                "+----+----+----+----+",
6050                "| 1  | 10 | 3  | 10 |",
6051                "+----+----+----+----+",
6052            ],
6053            &batches
6054        );
6055
6056        assert_phj_used(&metrics, false);
6057
6058        Ok(())
6059    }
6060
6061    /// Test null-aware anti join when probe side (right) contains NULL
6062    /// Expected: no rows should be output (NULL in subquery means all results are unknown)
6063    #[apply(hash_join_exec_configs)]
6064    #[tokio::test]
6065    async fn test_null_aware_anti_join_probe_null(batch_size: usize) -> Result<()> {
6066        let task_ctx = prepare_task_ctx(batch_size, false);
6067
6068        // Build left table (rows to potentially output)
6069        let left = build_table_two_cols(
6070            ("c1", &vec![Some(1), Some(2), Some(3), Some(4)]),
6071            ("dummy", &vec![Some(10), Some(20), Some(30), Some(40)]),
6072        );
6073
6074        // Build right table (subquery with NULL)
6075        let right = build_table_two_cols(
6076            ("c2", &vec![Some(1), Some(2), Some(3), None]),
6077            ("dummy", &vec![Some(100), Some(200), Some(300), Some(400)]),
6078        );
6079
6080        let on = vec![(
6081            Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
6082            Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
6083        )];
6084
6085        // Create null-aware anti join
6086        let join = HashJoinExec::try_new(
6087            left,
6088            right,
6089            on,
6090            None,
6091            &JoinType::LeftAnti,
6092            None,
6093            PartitionMode::CollectLeft,
6094            NullEquality::NullEqualsNothing,
6095            true, // null_aware = true
6096        )?;
6097
6098        let stream = join.execute(0, task_ctx)?;
6099        let batches = common::collect(stream).await?;
6100
6101        // Expected: empty result (probe side has NULL, so no rows should be output)
6102        allow_duplicates! {
6103            assert_snapshot!(batches_to_sort_string(&batches), @r"
6104            ++
6105            ++
6106            ");
6107        }
6108        Ok(())
6109    }
6110
6111    /// Test null-aware anti join when build side (left) contains NULL keys
6112    /// Expected: rows with NULL keys should not be output
6113    #[apply(hash_join_exec_configs)]
6114    #[tokio::test]
6115    async fn test_null_aware_anti_join_build_null(batch_size: usize) -> Result<()> {
6116        let task_ctx = prepare_task_ctx(batch_size, false);
6117
6118        // Build left table with NULL key (this row should not be output)
6119        let left = build_table_two_cols(
6120            ("c1", &vec![Some(1), Some(4), None]),
6121            ("dummy", &vec![Some(10), Some(40), Some(0)]),
6122        );
6123
6124        // Build right table (no NULL, so probe-side check passes)
6125        let right = build_table_two_cols(
6126            ("c2", &vec![Some(1), Some(2), Some(3)]),
6127            ("dummy", &vec![Some(100), Some(200), Some(300)]),
6128        );
6129
6130        let on = vec![(
6131            Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
6132            Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
6133        )];
6134
6135        // Create null-aware anti join
6136        let join = HashJoinExec::try_new(
6137            left,
6138            right,
6139            on,
6140            None,
6141            &JoinType::LeftAnti,
6142            None,
6143            PartitionMode::CollectLeft,
6144            NullEquality::NullEqualsNothing,
6145            true, // null_aware = true
6146        )?;
6147
6148        let stream = join.execute(0, task_ctx)?;
6149        let batches = common::collect(stream).await?;
6150
6151        // Expected: only c1=4 (not c1=1 which matches, not c1=NULL)
6152        allow_duplicates! {
6153            assert_snapshot!(batches_to_sort_string(&batches), @r"
6154            +----+-------+
6155            | c1 | dummy |
6156            +----+-------+
6157            | 4  | 40    |
6158            +----+-------+
6159            ");
6160        }
6161        Ok(())
6162    }
6163
6164    /// Test null-aware anti join with no NULLs (should work like regular anti join)
6165    #[apply(hash_join_exec_configs)]
6166    #[tokio::test]
6167    async fn test_null_aware_anti_join_no_nulls(batch_size: usize) -> Result<()> {
6168        let task_ctx = prepare_task_ctx(batch_size, false);
6169
6170        // Build left table (no NULLs)
6171        let left = build_table_two_cols(
6172            ("c1", &vec![Some(1), Some(2), Some(4), Some(5)]),
6173            ("dummy", &vec![Some(10), Some(20), Some(40), Some(50)]),
6174        );
6175
6176        // Build right table (no NULLs)
6177        let right = build_table_two_cols(
6178            ("c2", &vec![Some(1), Some(2), Some(3)]),
6179            ("dummy", &vec![Some(100), Some(200), Some(300)]),
6180        );
6181
6182        let on = vec![(
6183            Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
6184            Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
6185        )];
6186
6187        // Create null-aware anti join
6188        let join = HashJoinExec::try_new(
6189            left,
6190            right,
6191            on,
6192            None,
6193            &JoinType::LeftAnti,
6194            None,
6195            PartitionMode::CollectLeft,
6196            NullEquality::NullEqualsNothing,
6197            true, // null_aware = true
6198        )?;
6199
6200        let stream = join.execute(0, task_ctx)?;
6201        let batches = common::collect(stream).await?;
6202
6203        // Expected: c1=4 and c1=5 (they don't match anything in right)
6204        allow_duplicates! {
6205            assert_snapshot!(batches_to_sort_string(&batches), @r"
6206            +----+-------+
6207            | c1 | dummy |
6208            +----+-------+
6209            | 4  | 40    |
6210            | 5  | 50    |
6211            +----+-------+
6212            ");
6213        }
6214        Ok(())
6215    }
6216
6217    /// Test that null_aware validation rejects non-LeftAnti join types
6218    #[tokio::test]
6219    async fn test_null_aware_validation_wrong_join_type() {
6220        let left =
6221            build_table_two_cols(("c1", &vec![Some(1)]), ("dummy", &vec![Some(10)]));
6222        let right =
6223            build_table_two_cols(("c2", &vec![Some(1)]), ("dummy", &vec![Some(100)]));
6224
6225        let on = vec![(
6226            Arc::new(Column::new_with_schema("c1", &left.schema()).unwrap()) as _,
6227            Arc::new(Column::new_with_schema("c2", &right.schema()).unwrap()) as _,
6228        )];
6229
6230        // Try to create null-aware Inner join (should fail)
6231        let result = HashJoinExec::try_new(
6232            left,
6233            right,
6234            on,
6235            None,
6236            &JoinType::Inner,
6237            None,
6238            PartitionMode::CollectLeft,
6239            NullEquality::NullEqualsNothing,
6240            true, // null_aware = true (invalid for Inner join)
6241        );
6242
6243        assert!(result.is_err());
6244        assert!(
6245            result
6246                .unwrap_err()
6247                .to_string()
6248                .contains("null_aware can only be true for LeftAnti joins")
6249        );
6250    }
6251
6252    /// Test that null_aware validation rejects multi-column joins
6253    #[tokio::test]
6254    async fn test_null_aware_validation_multi_column() {
6255        let left = build_table(("a", &vec![1]), ("b", &vec![2]), ("c", &vec![3]));
6256        let right = build_table(("x", &vec![1]), ("y", &vec![2]), ("z", &vec![3]));
6257
6258        // Try multi-column join
6259        let on = vec![
6260            (
6261                Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
6262                Arc::new(Column::new_with_schema("x", &right.schema()).unwrap()) as _,
6263            ),
6264            (
6265                Arc::new(Column::new_with_schema("b", &left.schema()).unwrap()) as _,
6266                Arc::new(Column::new_with_schema("y", &right.schema()).unwrap()) as _,
6267            ),
6268        ];
6269
6270        // Try to create null-aware anti join with 2 columns (should fail)
6271        let result = HashJoinExec::try_new(
6272            left,
6273            right,
6274            on,
6275            None,
6276            &JoinType::LeftAnti,
6277            None,
6278            PartitionMode::CollectLeft,
6279            NullEquality::NullEqualsNothing,
6280            true, // null_aware = true (invalid for multi-column)
6281        );
6282
6283        assert!(result.is_err());
6284        assert!(
6285            result
6286                .unwrap_err()
6287                .to_string()
6288                .contains("null_aware anti join only supports single column join key")
6289        );
6290    }
6291
6292    #[test]
6293    fn test_lr_is_preserved() {
6294        assert_eq!(lr_is_preserved(JoinType::Inner), (true, true));
6295        assert_eq!(lr_is_preserved(JoinType::Left), (true, false));
6296        assert_eq!(lr_is_preserved(JoinType::Right), (false, true));
6297        assert_eq!(lr_is_preserved(JoinType::Full), (false, false));
6298        assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, true));
6299        assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, true));
6300        assert_eq!(lr_is_preserved(JoinType::LeftMark), (true, false));
6301        assert_eq!(lr_is_preserved(JoinType::RightSemi), (true, true));
6302        assert_eq!(lr_is_preserved(JoinType::RightAnti), (true, true));
6303        assert_eq!(lr_is_preserved(JoinType::RightMark), (false, true));
6304    }
6305
6306    #[test]
6307    fn test_with_dynamic_filter() -> Result<()> {
6308        let (_, _, on) = build_schema_and_on()?;
6309        let left = build_table(("a1", &vec![1]), ("b1", &vec![1]), ("c1", &vec![1]));
6310        let right = build_table(("a2", &vec![1]), ("b1", &vec![1]), ("c2", &vec![1]));
6311
6312        let join = HashJoinExec::try_new(
6313            left,
6314            right,
6315            on,
6316            None,
6317            &JoinType::Inner,
6318            None,
6319            PartitionMode::CollectLeft,
6320            NullEquality::NullEqualsNothing,
6321            false,
6322        )?;
6323        assert!(join.dynamic_filter_expr().is_none());
6324
6325        let df = Arc::new(DynamicFilterPhysicalExpr::new(
6326            vec![Arc::new(Column::new("b1", 1)) as _],
6327            lit(true),
6328        ));
6329        let join = join.with_dynamic_filter_expr(Arc::clone(&df))?;
6330
6331        let restored = join
6332            .dynamic_filter_expr()
6333            .expect("should have dynamic filter");
6334        assert_eq!(
6335            restored
6336                .expression_id()
6337                .expect("DynamicFilterPhysicalExpr always has an expression_id"),
6338            df.expression_id()
6339                .expect("DynamicFilterPhysicalExpr always has an expression_id"),
6340        );
6341        Ok(())
6342    }
6343
6344    #[test]
6345    fn test_with_dynamic_filter_rejects_invalid_columns() -> Result<()> {
6346        let (_, _, on) = build_schema_and_on()?;
6347        let left = build_table(("a1", &vec![1]), ("b1", &vec![1]), ("c1", &vec![1]));
6348        let right = build_table(("a2", &vec![1]), ("b1", &vec![1]), ("c2", &vec![1]));
6349
6350        let join = HashJoinExec::try_new(
6351            left,
6352            right,
6353            on,
6354            None,
6355            &JoinType::Inner,
6356            None,
6357            PartitionMode::CollectLeft,
6358            NullEquality::NullEqualsNothing,
6359            false,
6360        )?;
6361
6362        // Column index 99 is out of bounds for the right (probe) side schema.
6363        let df = Arc::new(DynamicFilterPhysicalExpr::new(
6364            vec![Arc::new(Column::new("bad", 99)) as _],
6365            lit(true),
6366        ));
6367        assert!(join.with_dynamic_filter_expr(df).is_err());
6368        Ok(())
6369    }
6370}