datafusion_physical_plan/joins/hash_join/exec.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt;
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::{any::Any, vec};

use crate::ExecutionPlanProperties;
use crate::execution_plan::{EmissionType, boundedness_from_children};
use crate::filter_pushdown::{
    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
    FilterPushdownPropagation,
};
use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
use crate::joins::hash_join::shared_bounds::{
    ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
};
use crate::joins::hash_join::stream::{
    BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
use crate::joins::utils::{
    OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap,
    swap_join_projection, update_hash,
};
use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
use crate::projection::{
    EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
    try_pushdown_through_join,
};
use crate::repartition::REPARTITION_RANDOM_STATE;
use crate::spill::get_record_batch_memory_size;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
    PlanProperties, SendableRecordBatchStream, Statistics,
    common::can_project,
    joins::utils::{
        BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
        build_join_schema, check_join_is_valid, estimate_join_statistics,
        need_produce_result_in_final, symmetric_join_output_partitioning,
    },
    metrics::{ExecutionPlanMetricsSet, MetricsSet},
};

use arrow::array::{ArrayRef, BooleanBufferBuilder};
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_schema::DataType;
use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
    JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, plan_err,
    project_schema,
};
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::equivalence::{
    ProjectionMapping, join_equivalence_properties,
};
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};

use ahash::RandomState;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::TryStreamExt;
use parking_lot::Mutex;

use super::partitioned_hash_eval::SeededRandomState;

/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
pub(crate) const HASH_JOIN_SEED: SeededRandomState =
    SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);

/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
    /// The hash table with indices into `batch`
    /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown
    pub(super) hash_map: Arc<dyn JoinHashMapType>,
    /// The input rows for the build side
    batch: RecordBatch,
    /// The values of the `on` expressions for the build side
    values: Vec<ArrayRef>,
    /// Shared bitmap builder for visited left indices
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Counter of running probe-threads, potentially
    /// able to update `visited_indices_bitmap`
    probe_threads_counter: AtomicUsize,
    /// We need to keep this field to maintain accurate memory accounting, even though we don't directly use it.
    /// Without holding onto this reservation, the recorded memory usage would become inconsistent with actual usage.
    /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
    /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
    _reservation: MemoryReservation,
    /// Bounds computed from the build side for dynamic filter pushdown.
    /// If the partition is empty (no rows) this will be None.
    /// If the partition has some rows this will be Some with the bounds for each join key column.
    pub(super) bounds: Option<PartitionBounds>,
    /// Membership testing strategy for filter pushdown
    /// Contains either InList values for small build sides or hash table reference for large build sides
    pub(super) membership: PushdownStrategy,
}

impl JoinLeftData {
    /// return a reference to the hash map
    pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
        &*self.hash_map
    }

    /// returns a reference to the build side batch
    pub(super) fn batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// returns a reference to the build side expressions values
    pub(super) fn values(&self) -> &[ArrayRef] {
        &self.values
    }

    /// returns a reference to the visited indices bitmap
    pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
        &self.visited_indices_bitmap
    }

    /// returns a reference to the membership-testing strategy for filter pushdown
    pub(super) fn membership(&self) -> &PushdownStrategy {
        &self.membership
    }

    /// Decrements the counter of running threads, and returns `true`
    /// if caller is the last running thread
    pub(super) fn report_probe_completed(&self) -> bool {
        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
    }
}
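
// A hedged illustration (not in the original source) of the probe-thread
// protocol above: with N probe partitions, `probe_threads_counter` starts at N;
// every finishing probe stream calls `report_probe_completed()`, and only the
// last caller (the one that observes the counter at 1) is responsible for
// emitting the unmatched build-side rows for e.g. LEFT and FULL joins.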

#[expect(rustdoc::private_intra_doc_links)]
/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
/// join.
///
/// # Join Expressions
///
/// This implementation is optimized for evaluating equijoin predicates
/// (`<col1> = <col2>`), which are represented as a list of `Columns`
/// in [`Self::on`].
///
/// Non-equality predicates, which cannot be pushed down to the join inputs
/// (e.g. `<col1> != <col2>`), are known as "filter expressions" and are
/// evaluated after the equijoin predicates.
///
/// # "Build Side" vs "Probe Side"
///
/// HashJoin takes two inputs, which are referred to as the "build" and the
/// "probe". The build side is the first child, and the probe side is the second
/// child.
///
/// The two inputs are treated differently and it is VERY important that the
/// *smaller* input is placed on the build side to minimize the work of creating
/// the hash table.
///
/// ```text
///          ┌───────────┐
///          │ HashJoin  │
///          │           │
///          └───────────┘
///              │   │
///        ┌─────┘   └─────┐
///        ▼               ▼
/// ┌────────────┐  ┌─────────────┐
/// │   Input    │  │    Input    │
/// │    [0]     │  │     [1]     │
/// └────────────┘  └─────────────┘
///
///  "build side"    "probe side"
/// ```
///
/// Execution proceeds in 2 stages:
///
/// 1. the **build phase** creates a hash table from the tuples of the build side,
///    and a single concatenated batch containing data from all fetched record
///    batches. The resulting hash table stores the hashed join-key fields of each
///    row as keys, and the indices of the corresponding rows in the concatenated
///    batch as values.
///
/// The hash join uses a LIFO data structure as its hash table. To retain the
/// original build-side input order when fetching data during the probe phase, the
/// hash table is updated by iterating the batch sequence in reverse order: this
/// keeps rows with smaller indices "on top" of the hash table, while maintaining
/// correct indexing into the concatenated build-side batch.
///
/// Example of build phase for 3 record batches:
///
///
/// ```text
///
///  Original build-side data   Inserting build-side values into hashmap    Concatenated build-side batch
///                                                                         ┌───────────────────────────┐
///                             hashmap.insert(row-hash, row-idx + offset)  │                      idx  │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 1 │        1) update_hash for batch 3 with offset 0    │          │ Row 6 │    0   │
///   Batch 1  │       │           - hashmap.insert(Row 7, idx 1)           │ Batch 3  │       │        │
///            │ Row 2 │           - hashmap.insert(Row 6, idx 0)           │          │ Row 7 │    1   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 3 │        2) update_hash for batch 2 with offset 2    │          │ Row 3 │    2   │
///            │       │           - hashmap.insert(Row 5, idx 4)           │          │       │        │
///   Batch 2  │ Row 4 │           - hashmap.insert(Row 4, idx 3)           │ Batch 2  │ Row 4 │    3   │
///            │       │           - hashmap.insert(Row 3, idx 2)           │          │       │        │
///            │ Row 5 │                                                    │          │ Row 5 │    4   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///            ┌───────┐                                                    │          ┌───────┐        │
///            │ Row 6 │        3) update_hash for batch 1 with offset 5    │          │ Row 1 │    5   │
///   Batch 3  │       │           - hashmap.insert(Row 2, idx 6)           │ Batch 1  │       │        │
///            │ Row 7 │           - hashmap.insert(Row 1, idx 5)           │          │ Row 2 │    6   │
///            └───────┘                                                    │          └───────┘        │
///                                                                         │                           │
///                                                                         └───────────────────────────┘
/// ```
///
/// 2. the **probe phase** where the tuples of the probe side are streamed
///    through, checking for matches of the join keys in the hash table.
///
/// ```text
///                 ┌────────────────┐          ┌────────────────┐
///                 │ ┌─────────┐    │          │ ┌─────────┐    │
///                 │ │  Hash   │    │          │ │  Hash   │    │
///                 │ │  Table  │    │          │ │  Table  │    │
///                 │ │(keys are│    │          │ │(keys are│    │
///                 │ │equi join│    │          │ │equi join│    │  Stage 2: batches from
///  Stage 1: the   │ │columns) │    │          │ │columns) │    │    the probe side are
/// *entire* build  │ │         │    │          │ │         │    │  streamed through, and
///  side is read   │ └─────────┘    │          │ └─────────┘    │   checked against the
/// into the hash   │      ▲         │          │          ▲     │   contents of the hash
///     table       │       HashJoin │          │  HashJoin      │          table
///                 └──────┼─────────┘          └──────────┼─────┘
///             ─ ─ ─ ─ ─ ─                                 ─ ─ ─ ─ ─ ─ ─
///            │                                                         │
///
///            │                                                         │
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///           ...                                                       ...
///     ┌────────────┐                                            ┌────────────┐
///     │RecordBatch │                                            │RecordBatch │
///     └────────────┘                                            └────────────┘
///
///        build side                                                probe side
/// ```
///
/// # Example "Optimal" Plans
///
/// These differences between the inputs mean that for a classic "Star Schema
/// Query", the optimal plan is a **"Right Deep Tree"**. A Star Schema Query is
/// one where there is one large "fact" table and several smaller "dimension"
/// tables, joined on `Foreign Key = Primary Key` predicates.
///
/// A "Right Deep Tree" places this large table as the probe side of the
/// lowest join:
///
/// ```text
///             ┌───────────┐
///             │ HashJoin  │
///             │           │
///             └───────────┘
///                 │   │
///         ┌───────┘   └──────────┐
///         ▼                      ▼
/// ┌───────────────┐        ┌───────────┐
/// │ small table 1 │        │ HashJoin  │
/// │  "dimension"  │        │           │
/// └───────────────┘        └───┬───┬───┘
///                   ┌──────────┘   └───────┐
///                   │                      │
///                   ▼                      ▼
///           ┌───────────────┐        ┌───────────┐
///           │ small table 2 │        │ HashJoin  │
///           │  "dimension"  │        │           │
///           └───────────────┘        └───┬───┬───┘
///                               ┌────────┘   └────────┐
///                               │                     │
///                               ▼                     ▼
///                       ┌───────────────┐     ┌───────────────┐
///                       │ small table 3 │     │  large table  │
///                       │  "dimension"  │     │    "fact"     │
///                       └───────────────┘     └───────────────┘
/// ```
///
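/// # Example: Constructing a `HashJoinExec`
///
/// A minimal sketch (not part of the original source) of building this operator
/// directly; `build` and `probe` are assumed to be existing
/// `Arc<dyn ExecutionPlan>` inputs that each have an `id` column, and the exact
/// API may differ between versions:
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use datafusion_common::{JoinType, NullEquality};
/// use datafusion_physical_expr::expressions::col;
///
/// let join = HashJoinExec::try_new(
///     Arc::clone(&build), // left / build side: the smaller input
///     Arc::clone(&probe), // right / probe side: the larger input
///     vec![(col("id", &build.schema())?, col("id", &probe.schema())?)],
///     None,                        // no extra (non-equi) join filter
///     &JoinType::Inner,
///     None,                        // no embedded output projection
///     PartitionMode::CollectLeft,  // collect the build side into one partition
///     NullEquality::NullEqualsNothing,
/// )?;
/// ```
///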
/// # Clone / Shared State
///
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
/// loading of the left side with the processing in each output stream.
/// Therefore it cannot be [`Clone`].
pub struct HashJoinExec {
    /// left (build) side which gets hashed
    pub left: Arc<dyn ExecutionPlan>,
    /// right (probe) side which is filtered by the hash table
    pub right: Arc<dyn ExecutionPlan>,
    /// Set of equijoin columns from the relations: `(left_col, right_col)`
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Filters which are applied while finding matching rows
    pub filter: Option<JoinFilter>,
    /// How the join is performed (`OUTER`, `INNER`, etc)
    pub join_type: JoinType,
    /// The schema after join. Please be careful when using this schema,
    /// if there is a projection, the schema isn't the same as the output schema.
    join_schema: SchemaRef,
    /// Future that consumes left input and builds the hash table
    ///
    /// For CollectLeft partition mode, this structure is *shared* across all output streams.
    ///
    /// Each output stream waits on the `OnceAsync` to signal the completion of
    /// the hash table creation.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// The shared `SeededRandomState` for the hashing algorithm (seeds preserved for serialization)
    random_state: SeededRandomState,
    /// Partitioning mode to use
    pub mode: PartitionMode,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// The projection indices of the columns in the output schema of join
    pub projection: Option<Vec<usize>>,
    /// Information of index and left / right placement of columns
    column_indices: Vec<ColumnIndex>,
    /// The equality null-handling behavior of the join algorithm.
    pub null_equality: NullEquality,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
    /// Dynamic filter for pushing down to the probe side
    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
    /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
}

#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// Dynamic filter that we'll update with the results of the build side once that is done.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
    /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}

impl fmt::Debug for HashJoinExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("HashJoinExec")
            .field("left", &self.left)
            .field("right", &self.right)
            .field("on", &self.on)
            .field("filter", &self.filter)
            .field("join_type", &self.join_type)
            .field("join_schema", &self.join_schema)
            .field("left_fut", &self.left_fut)
            .field("random_state", &self.random_state)
            .field("mode", &self.mode)
            .field("metrics", &self.metrics)
            .field("projection", &self.projection)
            .field("column_indices", &self.column_indices)
            .field("null_equality", &self.null_equality)
            .field("cache", &self.cache)
            // Explicitly exclude dynamic_filter to avoid runtime state differences in tests
            .finish()
    }
}

impl EmbeddedProjection for HashJoinExec {
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}

impl HashJoinExec {
    /// Tries to create a new [HashJoinExec].
    ///
    /// # Error
    /// This function errors when it is not possible to join the left and right sides on keys `on`.
    #[expect(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        projection: Option<Vec<usize>>,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
    ) -> Result<Self> {
        let left_schema = left.schema();
        let right_schema = right.schema();
        if on.is_empty() {
            return plan_err!("On constraints in HashJoinExec should be non-empty");
        }

        check_join_is_valid(&left_schema, &right_schema, &on)?;

        let (join_schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, join_type);

        let random_state = HASH_JOIN_SEED;

        let join_schema = Arc::new(join_schema);

        // check if the projection is valid
        can_project(&join_schema, projection.as_ref())?;

        let cache = Self::compute_properties(
            &left,
            &right,
            &join_schema,
            *join_type,
            &on,
            partition_mode,
            projection.as_ref(),
        )?;

        // Initialize both dynamic filter and bounds accumulator to None
        // They will be set later if dynamic filtering is enabled

        Ok(HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type: *join_type,
            join_schema,
            left_fut: Default::default(),
            random_state,
            mode: partition_mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection,
            column_indices,
            null_equality,
            cache,
            dynamic_filter: None,
        })
    }

    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
        // Extract the right-side keys (probe side keys) from the `on` clauses
        // Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
        // Initialize with a placeholder expression (true) that will be updated when the hash table is built
        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
    }
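
    // A hedged illustration (not in the original source): for `on = [(l.a, r.b)]`,
    // the `lit(true)` placeholder above is replaced once the build side has been
    // collected, e.g. with a bounds predicate such as
    // `r.b >= min(l.a) AND r.b <= max(l.a)`, or with an InList /
    // hash-table-membership test, depending on the chosen `PushdownStrategy`.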

    /// left (build) side which gets hashed
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// right (probe) side which is filtered by the hash table
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// Set of common columns used to join on
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    /// Filters applied before join output
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// How the join is performed
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// The schema after join. Please be careful when using this schema,
    /// if there is a projection, the schema isn't the same as the output schema.
    pub fn join_schema(&self) -> &SchemaRef {
        &self.join_schema
    }

    /// The partitioning mode of this hash join
    pub fn partition_mode(&self) -> &PartitionMode {
        &self.mode
    }

    /// Get null_equality
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    /// Get the dynamic filter expression for testing purposes.
    /// Returns `None` if no dynamic filter has been set.
    ///
    /// This method is intended for testing only and should not be used in production code.
    #[doc(hidden)]
    pub fn dynamic_filter_for_test(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
        self.dynamic_filter
            .as_ref()
            .map(|df| Arc::clone(&df.filter))
    }

    /// Calculate order preservation flags for this hash join.
    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
        vec![
            false,
            matches!(
                join_type,
                JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightAnti
                    | JoinType::RightSemi
                    | JoinType::RightMark
            ),
        ]
    }

    /// Get probe side information for the hash join.
    pub fn probe_side() -> JoinSide {
        // In current implementation right side is always probe side.
        JoinSide::Right
    }

    /// Return whether the join contains a projection
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }

    /// Return new instance of [HashJoinExec] with the given projection.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        // check if the projection is valid
        can_project(&self.schema(), projection.as_ref())?;
        let projection = match projection {
            Some(projection) => match &self.projection {
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };
        Self::try_new(
            Arc::clone(&self.left),
            Arc::clone(&self.right),
            self.on.clone(),
            self.filter.clone(),
            &self.join_type,
            projection,
            self.mode,
            self.null_equality,
        )
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
        join_type: JoinType,
        on: JoinOnRef,
        mode: PartitionMode,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        // Calculate equivalence properties:
        let mut eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            Arc::clone(schema),
            &Self::maintains_input_order(join_type),
            Some(Self::probe_side()),
            on,
        )?;

        let mut output_partitioning = match mode {
            PartitionMode::CollectLeft => {
                asymmetric_join_output_partitioning(left, right, &join_type)?
            }
            PartitionMode::Auto => Partitioning::UnknownPartitioning(
                right.output_partitioning().partition_count(),
            ),
            PartitionMode::Partitioned => {
                symmetric_join_output_partitioning(left, right, &join_type)?
            }
        };

        let emission_type = if left.boundedness().is_unbounded() {
            EmissionType::Final
        } else if right.pipeline_behavior() == EmissionType::Incremental {
            match join_type {
                // If we only need to generate matched rows from the probe side,
                // we can emit rows incrementally.
                JoinType::Inner
                | JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::Right
                | JoinType::RightAnti
                | JoinType::RightMark => EmissionType::Incremental,
                // If we need to generate unmatched rows from the *build side*,
                // we need to emit them at the end.
                JoinType::Left
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::Full => EmissionType::Both,
            }
        } else {
            right.pipeline_behavior()
        };

        // If contains projection, update the PlanProperties.
        if let Some(projection) = projection {
            // construct a map from the input expressions to the output expression of the Projection
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness_from_children([left, right]),
        ))
    }

    /// Returns a new `ExecutionPlan` that computes the same join as this one,
    /// with the left and right inputs swapped using the specified
    /// `partition_mode`.
    ///
    /// # Notes:
    ///
    /// This function is public so other downstream projects can use it to
    /// construct `HashJoinExec` with right side as the build side.
    ///
    /// For using this interface directly, please refer to below:
    ///
    /// Hash join execution may require specific input partitioning (for example,
    /// the left child may have a single partition while the right child has multiple).
    ///
    /// Calling this function on join nodes whose children have already been repartitioned
    /// (e.g., after a `RepartitionExec` has been inserted) may break the partitioning
    /// requirements of the hash join. Therefore, ensure you call this function
    /// before inserting any repartitioning operators on the join's children.
    ///
    /// In DataFusion's default SQL interface, this function is used by the `JoinSelection`
    /// physical optimizer rule to determine a good join order, which is
    /// executed before the `EnforceDistribution` rule (the rule that may
    /// insert `RepartitionExec` operators).
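    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original source); `join` is assumed to
    /// be an existing `HashJoinExec` whose children have not yet been
    /// repartitioned:
    ///
    /// ```rust,ignore
    /// // Rebuild the join with build and probe sides exchanged; the equijoin
    /// // keys, filter, join type and projection are all swapped internally.
    /// let swapped = join.swap_inputs(PartitionMode::CollectLeft)?;
    /// ```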
    pub fn swap_inputs(
        &self,
        partition_mode: PartitionMode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = self.left();
        let right = self.right();
        let new_join = HashJoinExec::try_new(
            Arc::clone(right),
            Arc::clone(left),
            self.on()
                .iter()
                .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
                .collect(),
            self.filter().map(JoinFilter::swap),
            &self.join_type().swap(),
            swap_join_projection(
                left.schema().fields().len(),
                right.schema().fields().len(),
                self.projection.as_ref(),
                self.join_type(),
            ),
            partition_mode,
            self.null_equality(),
        )?;
        // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
        if matches!(
            self.join_type(),
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ) || self.projection.is_some()
        {
            Ok(Arc::new(new_join))
        } else {
            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
        }
    }
}

impl DisplayAs for HashJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let display_filter = self.filter.as_ref().map_or_else(
                    || "".to_string(),
                    |f| format!(", filter={}", f.expression()),
                );
                let display_projections = if self.contains_projection() {
                    format!(
                        ", projection=[{}]",
                        self.projection
                            .as_ref()
                            .unwrap()
                            .iter()
                            .map(|index| format!(
                                "{}@{}",
                                self.join_schema.fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                let display_null_equality =
                    if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
                        ", NullsEqual: true"
                    } else {
                        ""
                    };
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| format!("({c1}, {c2})"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(
                    f,
                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
                    self.mode,
                    self.join_type,
                    on,
                    display_filter,
                    display_projections,
                    display_null_equality,
                )
            }
            DisplayFormatType::TreeRender => {
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| {
                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
                    })
                    .collect::<Vec<String>>()
                    .join(", ");

                if *self.join_type() != JoinType::Inner {
                    writeln!(f, "join_type={:?}", self.join_type)?;
                }

                writeln!(f, "on={on}")?;

                if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
                    writeln!(f, "NullsEqual: true")?;
                }

                if let Some(filter) = self.filter.as_ref() {
                    writeln!(f, "filter={filter}")?;
                }

                Ok(())
            }
        }
    }
}

impl ExecutionPlan for HashJoinExec {
    fn name(&self) -> &'static str {
        "HashJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
            PartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
            PartitionMode::Auto => vec![
                Distribution::UnspecifiedDistribution,
                Distribution::UnspecifiedDistribution,
            ],
        }
    }

    // For [JoinType::Inner] and [JoinType::RightSemi] hash joins, the probe phase
    // hashes the join key(s) of each probe-side row, in the order the rows are
    // read, and uses the hash value to look up the corresponding entries in the
    // hash table constructed from the build side during the build phase.
    //
    // Because result rows are generated immediately when a match is found, the
    // output of the join follows the order in which rows were read from the
    // probe side table. Hence, the hash join preserves the order of the probe
    // side.
    //
    // Likewise, in a [JoinType::RightAnti] hash join, the unmatched rows from
    // the probe side are kept in order. `RightAnti` returns rows from the right
    // (probe side) table that have no match in the left (build side) table;
    // because the probe-side rows are processed sequentially, and unmatched rows
    // are output directly, the results retain the order of the probe side table.
    fn maintains_input_order(&self) -> Vec<bool> {
        Self::maintains_input_order(self.join_type)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }

    /// Creates a new HashJoinExec with different children while preserving configuration.
    ///
    /// This method is called during query optimization when the optimizer creates new
    /// plan nodes. Importantly, it recomputes the plan properties for the new children
    /// rather than cloning the existing ones, because partitioning may have changed.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec {
            left: Arc::clone(&children[0]),
            right: Arc::clone(&children[1]),
            on: self.on.clone(),
            filter: self.filter.clone(),
            join_type: self.join_type,
            join_schema: Arc::clone(&self.join_schema),
            left_fut: Arc::clone(&self.left_fut),
            random_state: self.random_state.clone(),
            mode: self.mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: self.projection.clone(),
            column_indices: self.column_indices.clone(),
            null_equality: self.null_equality,
            cache: Self::compute_properties(
                &children[0],
                &children[1],
                &self.join_schema,
                self.join_type,
                &self.on,
                self.mode,
                self.projection.as_ref(),
            )?,
            // Keep the dynamic filter, bounds accumulator will be reset
            dynamic_filter: self.dynamic_filter.clone(),
        }))
    }

    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec {
            left: Arc::clone(&self.left),
            right: Arc::clone(&self.right),
            on: self.on.clone(),
            filter: self.filter.clone(),
            join_type: self.join_type,
            join_schema: Arc::clone(&self.join_schema),
            // Reset the left_fut to allow re-execution
            left_fut: Arc::new(OnceAsync::default()),
            random_state: self.random_state.clone(),
            mode: self.mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: self.projection.clone(),
            column_indices: self.column_indices.clone(),
            null_equality: self.null_equality,
            cache: self.cache.clone(),
            // Reset dynamic filter and bounds accumulator to initial state
            dynamic_filter: None,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let on_left = self
            .on
            .iter()
            .map(|on| Arc::clone(&on.0))
            .collect::<Vec<_>>();
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();

        assert_or_internal_err!(
            self.mode != PartitionMode::Partitioned
                || left_partitions == right_partitions,
            "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
             consider using RepartitionExec"
        );

        assert_or_internal_err!(
            self.mode != PartitionMode::CollectLeft || left_partitions == 1,
            "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
             consider using CoalescePartitionsExec or the EnforceDistribution rule"
        );

        // Only enable dynamic filter pushdown if:
        // - The session config enables dynamic filter pushdown
        // - A dynamic filter exists
        // - At least one consumer is holding a reference to it; this avoids expensive
        //   filter computation when the feature is disabled or no consumer will use it.
        let enable_dynamic_filter_pushdown = context
            .session_config()
            .options()
            .optimizer
            .enable_join_dynamic_filter_pushdown
            && self
                .dynamic_filter
                .as_ref()
                .map(|df| df.filter.is_used())
                .unwrap_or(false);
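
        // A hedged note (not in the original source): this optimizer option is
        // typically exposed in the session configuration as
        // `datafusion.optimizer.enable_join_dynamic_filter_pushdown`, e.g.
        //   SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = false;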

        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
        let left_fut = match self.mode {
            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
                let left_stream = self.left.execute(0, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

                Ok(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    self.right().output_partitioning().partition_count(),
                    enable_dynamic_filter_pushdown,
                    context
                        .session_config()
                        .options()
                        .optimizer
                        .hash_join_inlist_pushdown_max_size,
                    context
                        .session_config()
                        .options()
                        .optimizer
                        .hash_join_inlist_pushdown_max_distinct_values,
                ))
            })?,
            PartitionMode::Partitioned => {
                let left_stream = self.left.execute(partition, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
                        .register(context.memory_pool());

                OnceFut::new(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    1,
                    enable_dynamic_filter_pushdown,
                    context
                        .session_config()
                        .options()
                        .optimizer
                        .hash_join_inlist_pushdown_max_size,
                    context
                        .session_config()
                        .options()
                        .optimizer
                        .hash_join_inlist_pushdown_max_distinct_values,
                ))
            }
            PartitionMode::Auto => {
                return plan_err!(
                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
                    PartitionMode::Auto
                );
            }
        };

        let batch_size = context.session_config().batch_size();

        // Initialize build_accumulator lazily with runtime partition counts (only if enabled)
        // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
        let repartition_random_state = REPARTITION_RANDOM_STATE;
        let build_accumulator = enable_dynamic_filter_pushdown
            .then(|| {
                self.dynamic_filter.as_ref().map(|df| {
                    let filter = Arc::clone(&df.filter);
                    let on_right = self
                        .on
                        .iter()
                        .map(|(_, right_expr)| Arc::clone(right_expr))
                        .collect::<Vec<_>>();
                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
                            self.mode,
                            self.left.as_ref(),
                            self.right.as_ref(),
                            filter,
                            on_right,
                            repartition_random_state,
                        ))
                    })))
                })
            })
            .flatten()
            .flatten();

        // We have the batches and the hash map with their keys. We can now create a
        // stream over the right side that uses this information to issue new batches.
        let right_stream = self.right.execute(partition, context)?;

        // update column indices to reflect the projection
        let column_indices_after_projection = match &self.projection {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };

        let on_right = self
            .on
            .iter()
            .map(|(_, right_expr)| Arc::clone(right_expr))
            .collect::<Vec<_>>();

        Ok(Box::pin(HashJoinStream::new(
            partition,
            self.schema(),
            on_right,
            self.filter.clone(),
            self.join_type,
            right_stream,
            self.random_state.random_state().clone(),
            join_metrics,
            column_indices_after_projection,
            self.null_equality,
            HashJoinStreamState::WaitBuildSide,
            BuildSide::Initial(BuildSideInitialState { left_fut }),
            batch_size,
            vec![],
            self.right.output_ordering().is_some(),
            build_accumulator,
            self.mode,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema()));
        }
        // TODO stats: it is not possible in general to know the output size of joins
        // There are some special cases though, for example:
        // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
        let stats = estimate_join_statistics(
            self.left.partition_statistics(None)?,
            self.right.partition_statistics(None)?,
            &self.on,
            &self.join_type,
            &self.join_schema,
        )?;
        // Project statistics if there is a projection
        Ok(stats.project(self.projection.as_ref()))
    }

    /// Tries to push `projection` down through `hash_join`. If possible, performs the
    /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections
    /// as its children. Otherwise, returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // TODO: currently if there is projection in HashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
        if self.contains_projection() {
            return Ok(None);
        }

        let schema = self.schema();
        if let Some(JoinData {
            projected_left_child,
            projected_right_child,
            join_filter,
            join_on,
        }) = try_pushdown_through_join(
            projection,
            self.left(),
            self.right(),
            self.on(),
            &schema,
            self.filter(),
        )? {
            Ok(Some(Arc::new(HashJoinExec::try_new(
                Arc::new(projected_left_child),
                Arc::new(projected_right_child),
                join_on,
                join_filter,
                self.join_type(),
                // Returned early if projection is not None
                None,
                *self.partition_mode(),
                self.null_equality,
            )?)))
        } else {
            try_embed_projection(projection, self)
        }
    }

    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        // Other types of joins can support *some* filters, but restrictions are complex and error prone.
        // For now we don't support them.
        // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
        // See https://github.com/apache/datafusion/issues/16973 for tracking.
        if self.join_type != JoinType::Inner {
            return Ok(FilterDescription::all_unsupported(
                &parent_filters,
                &self.children(),
            ));
        }

        // Get basic filter descriptions for both children
        let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
            self.left(),
        )?;
        let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
            self.right(),
        )?;

        // Add dynamic filters in Post phase if enabled
        if matches!(phase, FilterPushdownPhase::Post)
            && config.optimizer.enable_join_dynamic_filter_pushdown
        {
            // Add actual dynamic filter to right side (probe side)
            let dynamic_filter = Self::create_dynamic_filter(&self.on);
            right_child = right_child.with_self_filter(dynamic_filter);
        }

        Ok(FilterDescription::new()
            .with_child(left_child)
            .with_child(right_child))
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for
        // non-inner joins in `gather_filters_for_pushdown`.
        // However it's a cheap check and serves to inform future devs touching this function that they need to be really
        // careful pushing down filters through non-inner joins.
        if self.join_type != JoinType::Inner {
            // Other types of joins can support *some* filters, but restrictions are complex and error prone.
            // For now we don't support them.
            // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
            return Ok(FilterPushdownPropagation::all_unsupported(
                child_pushdown_result,
            ));
        }

        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
        assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
        let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
        // We expect 0 or 1 self filters
        if let Some(filter) = right_child_self_filters.first() {
            // Note that we don't check the pushdown predicate's discriminant because
            // even if nothing said "yes, I can fully evaluate this filter", consumers
            // might still use it for statistics -> it's worth updating.
1220            let predicate = Arc::clone(&filter.predicate);
1221            if let Ok(dynamic_filter) =
1222                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1223            {
1224                // We successfully pushed down our self filter - we need to make a new node with the dynamic filter
1225                let new_node = Arc::new(HashJoinExec {
1226                    left: Arc::clone(&self.left),
1227                    right: Arc::clone(&self.right),
1228                    on: self.on.clone(),
1229                    filter: self.filter.clone(),
1230                    join_type: self.join_type,
1231                    join_schema: Arc::clone(&self.join_schema),
1232                    left_fut: Arc::clone(&self.left_fut),
1233                    random_state: self.random_state.clone(),
1234                    mode: self.mode,
1235                    metrics: ExecutionPlanMetricsSet::new(),
1236                    projection: self.projection.clone(),
1237                    column_indices: self.column_indices.clone(),
1238                    null_equality: self.null_equality,
1239                    cache: self.cache.clone(),
1240                    dynamic_filter: Some(HashJoinExecDynamicFilter {
1241                        filter: dynamic_filter,
1242                        build_accumulator: OnceLock::new(),
1243                    }),
1244                });
1245                result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
1246            }
1247        }
1248        Ok(result)
1249    }
1250}
1251
1252/// Accumulator for collecting min/max bounds from build-side data during hash join.
1253///
1254/// This struct encapsulates the logic for progressively computing column bounds
1255/// (minimum and maximum values) for a specific join key expression as batches
1256/// are processed during the build phase of a hash join.
1257///
1258/// The bounds are used for dynamic filter pushdown optimization, where filters
1259/// based on the actual data ranges can be pushed down to the probe side to
1260/// eliminate unnecessary data early.
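///
/// # Example
///
/// A minimal usage sketch (the column and batches are illustrative; the example
/// is `ignore`d since this type is private to the module):
///
/// ```ignore
/// let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("k", 0));
/// let mut acc = CollectLeftAccumulator::try_new(expr, &schema)?;
/// for batch in &build_side_batches {
///     acc.update_batch(batch)?;
/// }
/// let bounds = acc.evaluate()?; // ColumnBounds holding the observed min and max
/// ```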
1261struct CollectLeftAccumulator {
1262    /// The physical expression to evaluate for each batch
1263    expr: Arc<dyn PhysicalExpr>,
1264    /// Accumulator for tracking the minimum value across all batches
1265    min: MinAccumulator,
1266    /// Accumulator for tracking the maximum value across all batches
1267    max: MaxAccumulator,
1268}
1269
1270impl CollectLeftAccumulator {
1271    /// Creates a new accumulator for tracking bounds of a join key expression.
1272    ///
1273    /// # Arguments
1274    /// * `expr` - The physical expression to track bounds for
1275    /// * `schema` - The schema of the input data
1276    ///
1277    /// # Returns
1278    /// A new `CollectLeftAccumulator` instance configured for the expression's data type
1279    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
1280        /// Recursively unwraps dictionary types to get the underlying value type.
1281        fn dictionary_value_type(data_type: &DataType) -> DataType {
1282            match data_type {
1283                DataType::Dictionary(_, value_type) => {
1284                    dictionary_value_type(value_type.as_ref())
1285                }
1286                _ => data_type.clone(),
1287            }
1288        }
1289
1290        let data_type = expr
1291            .data_type(schema)
1292            // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
1293            .map(|dt| dictionary_value_type(&dt))?;
1294        Ok(Self {
1295            expr,
1296            min: MinAccumulator::try_new(&data_type)?,
1297            max: MaxAccumulator::try_new(&data_type)?,
1298        })
1299    }
1300
1301    /// Updates the accumulators with values from a new batch.
1302    ///
1303    /// Evaluates the expression on the batch and updates both min and max
1304    /// accumulators with the resulting values.
1305    ///
1306    /// # Arguments
1307    /// * `batch` - The record batch to process
1308    ///
1309    /// # Returns
1310    /// Ok(()) if the update succeeds, or an error if expression evaluation fails
1311    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
1312        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
1313        self.min.update_batch(std::slice::from_ref(&array))?;
1314        self.max.update_batch(std::slice::from_ref(&array))?;
1315        Ok(())
1316    }
1317
1318    /// Finalizes the accumulation and returns the computed bounds.
1319    ///
1320    /// Consumes self to extract the final min and max values from the accumulators.
1321    ///
1322    /// # Returns
1323    /// The `ColumnBounds` containing the minimum and maximum values observed
1324    fn evaluate(mut self) -> Result<ColumnBounds> {
1325        Ok(ColumnBounds::new(
1326            self.min.evaluate()?,
1327            self.max.evaluate()?,
1328        ))
1329    }
1330}
1331
1332/// State for collecting the build-side data during hash join
1333struct BuildSideState {
1334    batches: Vec<RecordBatch>,
1335    num_rows: usize,
1336    metrics: BuildProbeJoinMetrics,
1337    reservation: MemoryReservation,
1338    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
1339}
1340
1341impl BuildSideState {
1342    /// Create a new BuildSideState with optional accumulators for bounds computation
1343    fn try_new(
1344        metrics: BuildProbeJoinMetrics,
1345        reservation: MemoryReservation,
1346        on_left: Vec<Arc<dyn PhysicalExpr>>,
1347        schema: &SchemaRef,
1348        should_compute_dynamic_filters: bool,
1349    ) -> Result<Self> {
1350        Ok(Self {
1351            batches: Vec::new(),
1352            num_rows: 0,
1353            metrics,
1354            reservation,
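            // `bool::then` yields `Option<Result<Vec<_>>>`; `transpose` flips it to
            // `Result<Option<Vec<_>>>` so accumulator-creation errors propagate via `?`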
1355            bounds_accumulators: should_compute_dynamic_filters
1356                .then(|| {
1357                    on_left
1358                        .into_iter()
1359                        .map(|expr| CollectLeftAccumulator::try_new(expr, schema))
1360                        .collect::<Result<Vec<_>>>()
1361                })
1362                .transpose()?,
1363        })
1364    }
1365}
1366
1367/// Collects all batches from the left (build) side stream and creates a hash map for joining.
1368///
1369/// This function is responsible for:
1370/// 1. Consuming the entire left stream and collecting all batches into memory
1371/// 2. Building a hash map from the join key columns for efficient probe operations
1372/// 3. Computing bounds for dynamic filter pushdown (if enabled)
1373/// 4. Preparing visited indices bitmap for certain join types
1374///
1375/// # Parameters
1376/// * `random_state` - Random state for consistent hashing across partitions
1377/// * `left_stream` - Stream of record batches from the build side
1378/// * `on_left` - Physical expressions for the left side join keys
1379/// * `metrics` - Metrics collector for tracking memory usage and row counts
1380/// * `reservation` - Memory reservation tracker for the hash table and data
1381/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
1382/// * `probe_threads_count` - Number of threads that will probe this hash table
1383/// * `should_compute_dynamic_filters` - Whether to compute min/max bounds for dynamic filtering
1384///
1385/// # Dynamic Filter Coordination
1386/// When `should_compute_dynamic_filters` is true, this function computes the min/max bounds
1387/// for each join key column but does NOT update the dynamic filter. Instead, the
1388/// bounds are stored in the returned `JoinLeftData` and later coordinated by
1389/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
1390/// before updating the filter exactly once.
1391///
1392/// # Returns
1393/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
1394/// visited indices bitmap, and computed bounds (if requested).
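///
/// # Example
///
/// A sketch of a call site (argument values are illustrative rather than the
/// exact wiring used by `HashJoinExec`):
///
/// ```ignore
/// let left_data = collect_left_input(
///     random_state.clone(),
///     left_stream,
///     on_left.clone(),
///     metrics,
///     reservation,
///     need_produce_result_in_final(join_type), // visited bitmap for outer-style joins
///     probe_threads_count,
///     dynamic_filter.is_some(), // bounds only needed when a filter was pushed down
///     max_inlist_size,
///     max_inlist_distinct_values,
/// )
/// .await?;
/// ```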
1395#[expect(clippy::too_many_arguments)]
1396async fn collect_left_input(
1397    random_state: RandomState,
1398    left_stream: SendableRecordBatchStream,
1399    on_left: Vec<PhysicalExprRef>,
1400    metrics: BuildProbeJoinMetrics,
1401    reservation: MemoryReservation,
1402    with_visited_indices_bitmap: bool,
1403    probe_threads_count: usize,
1404    should_compute_dynamic_filters: bool,
1405    max_inlist_size: usize,
1406    max_inlist_distinct_values: usize,
1407) -> Result<JoinLeftData> {
1408    let schema = left_stream.schema();
1409
    // Collecting the build side performs two steps:
    // 1. stores all batches from the stream in a vector (updating the bounds accumulators on the fly if requested)
    // 2. builds a [JoinHashMap] over the collected batches afterwards.
1413    let initial = BuildSideState::try_new(
1414        metrics,
1415        reservation,
1416        on_left.clone(),
1417        &schema,
1418        should_compute_dynamic_filters,
1419    )?;
1420
1421    let state = left_stream
1422        .try_fold(initial, |mut state, batch| async move {
1423            // Update accumulators if computing bounds
1424            if let Some(ref mut accumulators) = state.bounds_accumulators {
1425                for accumulator in accumulators {
1426                    accumulator.update_batch(&batch)?;
1427                }
1428            }
1429
            // Account for the size of the incoming batch
            let batch_size = get_record_batch_memory_size(&batch);
            // Reserve memory for the incoming batch (this errors, rather than spilling, if the reservation cannot grow)
            state.reservation.try_grow(batch_size)?;
1434            // Update metrics
1435            state.metrics.build_mem_used.add(batch_size);
1436            state.metrics.build_input_batches.add(1);
1437            state.metrics.build_input_rows.add(batch.num_rows());
1438            // Update row count
1439            state.num_rows += batch.num_rows();
1440            // Push batch to output
1441            state.batches.push(batch);
1442            Ok(state)
1443        })
1444        .await?;
1445
1446    // Extract fields from state
1447    let BuildSideState {
1448        batches,
1449        num_rows,
1450        metrics,
1451        mut reservation,
1452        bounds_accumulators,
1453    } = state;
1454
    // Estimate the memory size required for the hashtable prior to allocation.
    // The final size can be verified using `RawTable::allocation_info()`
1457    let fixed_size_u32 = size_of::<JoinHashMapU32>();
1458    let fixed_size_u64 = size_of::<JoinHashMapU64>();
1459
    // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
    // `u64` index variant.
    // The map is built as a Box here and converted into an Arc below so it can be shared
    // with SharedBuildAccumulator for hash map pushdown
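    // Rough sizing example (ignoring hash-table load-factor overhead): one million build
    // rows with `u32` indices reserve about 1_000_000 * size_of::<(u32, u64)>() ≈ 16 MB
    // for the entries, plus the fixed size of the map struct itself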
1463    let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
1464        let estimated_hashtable_size =
1465            estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
1466        reservation.try_grow(estimated_hashtable_size)?;
1467        metrics.build_mem_used.add(estimated_hashtable_size);
1468        Box::new(JoinHashMapU64::with_capacity(num_rows))
1469    } else {
1470        let estimated_hashtable_size =
1471            estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
1472        reservation.try_grow(estimated_hashtable_size)?;
1473        metrics.build_mem_used.add(estimated_hashtable_size);
1474        Box::new(JoinHashMapU32::with_capacity(num_rows))
1475    };
1476
1477    let mut hashes_buffer = Vec::new();
1478    let mut offset = 0;
1479
    // Update the hashmap starting from the last batch; this reversed order matches the
    // order used by `concat_batches` below, so the offsets computed here index correctly
    // into the merged batch
1481    let batches_iter = batches.iter().rev();
1482    for batch in batches_iter.clone() {
1483        hashes_buffer.clear();
1484        hashes_buffer.resize(batch.num_rows(), 0);
1485        update_hash(
1486            &on_left,
1487            batch,
1488            &mut *hashmap,
1489            offset,
1490            &random_state,
1491            &mut hashes_buffer,
1492            0,
1493            true,
1494        )?;
1495        offset += batch.num_rows();
1496    }
    // Merge all batches (in the same reversed order) into a single batch, so we can directly index into the arrays
1498    let batch = concat_batches(&schema, batches_iter)?;
1499
1500    // Reserve additional memory for visited indices bitmap and create shared builder
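    // (outer-style joins use this bitmap to mark matched build-side rows so that
    // unmatched rows can be emitted once probing has finished)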
1501    let visited_indices_bitmap = if with_visited_indices_bitmap {
1502        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
1503        reservation.try_grow(bitmap_size)?;
1504        metrics.build_mem_used.add(bitmap_size);
1505
1506        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
1507        bitmap_buffer.append_n(num_rows, false);
1508        bitmap_buffer
1509    } else {
1510        BooleanBufferBuilder::new(0)
1511    };
1512
1513    let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
1514
1515    // Compute bounds for dynamic filter if enabled
1516    let bounds = match bounds_accumulators {
1517        Some(accumulators) if num_rows > 0 => {
1518            let bounds = accumulators
1519                .into_iter()
1520                .map(CollectLeftAccumulator::evaluate)
1521                .collect::<Result<Vec<_>>>()?;
1522            Some(PartitionBounds::new(bounds))
1523        }
1524        _ => None,
1525    };
1526
1527    // Convert Box to Arc for sharing with SharedBuildAccumulator
1528    let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
1529
1530    let membership = if num_rows == 0 {
1531        PushdownStrategy::Empty
1532    } else {
1533        // If the build side is small enough we can use IN list pushdown.
1534        // If it's too big we fall back to pushing down a reference to the hash table.
1535        // See `PushdownStrategy` for more details.
1536        let estimated_size = left_values
1537            .iter()
1538            .map(|arr| arr.get_array_memory_size())
1539            .sum::<usize>();
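        // An IN list is impractical when there are no key values, when the values are too
        // large in total, or when there are too many distinct keys (`hash_map.len()`
        // approximates the number of distinct join keys)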
1540        if left_values.is_empty()
1541            || left_values[0].is_empty()
1542            || estimated_size > max_inlist_size
1543            || hash_map.len() > max_inlist_distinct_values
1544        {
1545            PushdownStrategy::HashTable(Arc::clone(&hash_map))
1546        } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
1547            PushdownStrategy::InList(in_list_values)
1548        } else {
1549            PushdownStrategy::HashTable(Arc::clone(&hash_map))
1550        }
1551    };
1552
1553    let data = JoinLeftData {
1554        hash_map,
1555        batch,
1556        values: left_values,
1557        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
1558        probe_threads_counter: AtomicUsize::new(probe_threads_count),
1559        _reservation: reservation,
1560        bounds,
1561        membership,
1562    };
1563
1564    Ok(data)
1565}
1566
1567#[cfg(test)]
1568mod tests {
1569    use super::*;
1570    use crate::coalesce_partitions::CoalescePartitionsExec;
1571    use crate::joins::hash_join::stream::lookup_join_hashmap;
1572    use crate::test::{TestMemoryExec, assert_join_metrics};
1573    use crate::{
1574        common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
1575        test::exec::MockExec,
1576    };
1577
1578    use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
1579    use arrow::buffer::NullBuffer;
1580    use arrow::datatypes::{DataType, Field};
1581    use arrow_schema::Schema;
1582    use datafusion_common::hash_utils::create_hashes;
1583    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
1584    use datafusion_common::{
1585        ScalarValue, assert_batches_eq, assert_batches_sorted_eq, assert_contains,
1586        exec_err, internal_err,
1587    };
1588    use datafusion_execution::config::SessionConfig;
1589    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1590    use datafusion_expr::Operator;
1591    use datafusion_physical_expr::PhysicalExpr;
1592    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1593    use hashbrown::HashTable;
1594    use insta::{allow_duplicates, assert_snapshot};
1595    use rstest::*;
1596    use rstest_reuse::*;
1597
1598    fn div_ceil(a: usize, b: usize) -> usize {
1599        a.div_ceil(b)
1600    }
1601
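    // rstest template: tests annotated with `#[apply(batch_sizes)]` run once per value below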
1602    #[template]
1603    #[rstest]
1604    fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
1605
1606    fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
1607        let session_config = SessionConfig::default().with_batch_size(batch_size);
1608        Arc::new(TaskContext::default().with_session_config(session_config))
1609    }
1610
1611    fn build_table(
1612        a: (&str, &Vec<i32>),
1613        b: (&str, &Vec<i32>),
1614        c: (&str, &Vec<i32>),
1615    ) -> Arc<dyn ExecutionPlan> {
1616        let batch = build_table_i32(a, b, c);
1617        let schema = batch.schema();
1618        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
1619    }
1620
1621    fn join(
1622        left: Arc<dyn ExecutionPlan>,
1623        right: Arc<dyn ExecutionPlan>,
1624        on: JoinOn,
1625        join_type: &JoinType,
1626        null_equality: NullEquality,
1627    ) -> Result<HashJoinExec> {
1628        HashJoinExec::try_new(
1629            left,
1630            right,
1631            on,
1632            None,
1633            join_type,
1634            None,
1635            PartitionMode::CollectLeft,
1636            null_equality,
1637        )
1638    }
1639
1640    fn join_with_filter(
1641        left: Arc<dyn ExecutionPlan>,
1642        right: Arc<dyn ExecutionPlan>,
1643        on: JoinOn,
1644        filter: JoinFilter,
1645        join_type: &JoinType,
1646        null_equality: NullEquality,
1647    ) -> Result<HashJoinExec> {
1648        HashJoinExec::try_new(
1649            left,
1650            right,
1651            on,
1652            Some(filter),
1653            join_type,
1654            None,
1655            PartitionMode::CollectLeft,
1656            null_equality,
1657        )
1658    }
1659
1660    async fn join_collect(
1661        left: Arc<dyn ExecutionPlan>,
1662        right: Arc<dyn ExecutionPlan>,
1663        on: JoinOn,
1664        join_type: &JoinType,
1665        null_equality: NullEquality,
1666        context: Arc<TaskContext>,
1667    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1668        let join = join(left, right, on, join_type, null_equality)?;
1669        let columns_header = columns(&join.schema());
1670
1671        let stream = join.execute(0, context)?;
1672        let batches = common::collect(stream).await?;
1673        let metrics = join.metrics().unwrap();
1674
1675        Ok((columns_header, batches, metrics))
1676    }
1677
1678    async fn partitioned_join_collect(
1679        left: Arc<dyn ExecutionPlan>,
1680        right: Arc<dyn ExecutionPlan>,
1681        on: JoinOn,
1682        join_type: &JoinType,
1683        null_equality: NullEquality,
1684        context: Arc<TaskContext>,
1685    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1686        join_collect_with_partition_mode(
1687            left,
1688            right,
1689            on,
1690            join_type,
1691            PartitionMode::Partitioned,
1692            null_equality,
1693            context,
1694        )
1695        .await
1696    }
1697
1698    async fn join_collect_with_partition_mode(
1699        left: Arc<dyn ExecutionPlan>,
1700        right: Arc<dyn ExecutionPlan>,
1701        on: JoinOn,
1702        join_type: &JoinType,
1703        partition_mode: PartitionMode,
1704        null_equality: NullEquality,
1705        context: Arc<TaskContext>,
1706    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1707        let partition_count = 4;
1708
1709        let (left_expr, right_expr) = on
1710            .iter()
1711            .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1712            .unzip();
1713
1714        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1715            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
1716            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1717                left,
1718                Partitioning::Hash(left_expr, partition_count),
1719            )?),
1720            PartitionMode::Auto => {
1721                return internal_err!("Unexpected PartitionMode::Auto in join tests");
1722            }
1723        };
1724
1725        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1726            PartitionMode::CollectLeft => {
1727                let partition_column_name = right.schema().field(0).name().clone();
1728                let partition_expr = vec![Arc::new(Column::new_with_schema(
1729                    &partition_column_name,
1730                    &right.schema(),
1731                )?) as _];
1732                Arc::new(RepartitionExec::try_new(
1733                    right,
1734                    Partitioning::Hash(partition_expr, partition_count),
1735                )?) as _
1736            }
1737            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1738                right,
1739                Partitioning::Hash(right_expr, partition_count),
1740            )?),
1741            PartitionMode::Auto => {
1742                return internal_err!("Unexpected PartitionMode::Auto in join tests");
1743            }
1744        };
1745
1746        let join = HashJoinExec::try_new(
1747            left_repartitioned,
1748            right_repartitioned,
1749            on,
1750            None,
1751            join_type,
1752            None,
1753            partition_mode,
1754            null_equality,
1755        )?;
1756
1757        let columns = columns(&join.schema());
1758
1759        let mut batches = vec![];
1760        for i in 0..partition_count {
1761            let stream = join.execute(i, Arc::clone(&context))?;
1762            let more_batches = common::collect(stream).await?;
1763            batches.extend(
1764                more_batches
1765                    .into_iter()
1766                    .filter(|b| b.num_rows() > 0)
1767                    .collect::<Vec<_>>(),
1768            );
1769        }
1770        let metrics = join.metrics().unwrap();
1771
1772        Ok((columns, batches, metrics))
1773    }
1774
1775    #[apply(batch_sizes)]
1776    #[tokio::test]
1777    async fn join_inner_one(batch_size: usize) -> Result<()> {
1778        let task_ctx = prepare_task_ctx(batch_size);
1779        let left = build_table(
1780            ("a1", &vec![1, 2, 3]),
1781            ("b1", &vec![4, 5, 5]), // this has a repetition
1782            ("c1", &vec![7, 8, 9]),
1783        );
1784        let right = build_table(
1785            ("a2", &vec![10, 20, 30]),
1786            ("b1", &vec![4, 5, 6]),
1787            ("c2", &vec![70, 80, 90]),
1788        );
1789
1790        let on = vec![(
1791            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1792            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1793        )];
1794
1795        let (columns, batches, metrics) = join_collect(
1796            Arc::clone(&left),
1797            Arc::clone(&right),
1798            on.clone(),
1799            &JoinType::Inner,
1800            NullEquality::NullEqualsNothing,
1801            task_ctx,
1802        )
1803        .await?;
1804
1805        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1806
1807        allow_duplicates! {
1808            // Inner join output is expected to preserve both inputs order
1809            assert_snapshot!(batches_to_string(&batches), @r"
1810            +----+----+----+----+----+----+
1811            | a1 | b1 | c1 | a2 | b1 | c2 |
1812            +----+----+----+----+----+----+
1813            | 1  | 4  | 7  | 10 | 4  | 70 |
1814            | 2  | 5  | 8  | 20 | 5  | 80 |
1815            | 3  | 5  | 9  | 20 | 5  | 80 |
1816            +----+----+----+----+----+----+
1817            ");
1818        }
1819
1820        assert_join_metrics!(metrics, 3);
1821
1822        Ok(())
1823    }
1824
1825    #[apply(batch_sizes)]
1826    #[tokio::test]
1827    async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> {
1828        let task_ctx = prepare_task_ctx(batch_size);
1829        let left = build_table(
1830            ("a1", &vec![1, 2, 3]),
1831            ("b1", &vec![4, 5, 5]), // this has a repetition
1832            ("c1", &vec![7, 8, 9]),
1833        );
1834        let right = build_table(
1835            ("a2", &vec![10, 20, 30]),
1836            ("b1", &vec![4, 5, 6]),
1837            ("c2", &vec![70, 80, 90]),
1838        );
1839        let on = vec![(
1840            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1841            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1842        )];
1843
1844        let (columns, batches, metrics) = partitioned_join_collect(
1845            Arc::clone(&left),
1846            Arc::clone(&right),
1847            on.clone(),
1848            &JoinType::Inner,
1849            NullEquality::NullEqualsNothing,
1850            task_ctx,
1851        )
1852        .await?;
1853
1854        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1855
1856        allow_duplicates! {
1857            assert_snapshot!(batches_to_sort_string(&batches), @r"
1858            +----+----+----+----+----+----+
1859            | a1 | b1 | c1 | a2 | b1 | c2 |
1860            +----+----+----+----+----+----+
1861            | 1  | 4  | 7  | 10 | 4  | 70 |
1862            | 2  | 5  | 8  | 20 | 5  | 80 |
1863            | 3  | 5  | 9  | 20 | 5  | 80 |
1864            +----+----+----+----+----+----+
1865            ");
1866        }
1867
1868        assert_join_metrics!(metrics, 3);
1869
1870        Ok(())
1871    }
1872
1873    #[tokio::test]
1874    async fn join_inner_one_no_shared_column_names() -> Result<()> {
1875        let task_ctx = Arc::new(TaskContext::default());
1876        let left = build_table(
1877            ("a1", &vec![1, 2, 3]),
1878            ("b1", &vec![4, 5, 5]), // this has a repetition
1879            ("c1", &vec![7, 8, 9]),
1880        );
1881        let right = build_table(
1882            ("a2", &vec![10, 20, 30]),
1883            ("b2", &vec![4, 5, 6]),
1884            ("c2", &vec![70, 80, 90]),
1885        );
1886        let on = vec![(
1887            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1888            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1889        )];
1890
1891        let (columns, batches, metrics) = join_collect(
1892            left,
1893            right,
1894            on,
1895            &JoinType::Inner,
1896            NullEquality::NullEqualsNothing,
1897            task_ctx,
1898        )
1899        .await?;
1900
1901        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1902
1903        // Inner join output is expected to preserve both inputs order
1904        allow_duplicates! {
1905            assert_snapshot!(batches_to_string(&batches), @r"
1906            +----+----+----+----+----+----+
1907            | a1 | b1 | c1 | a2 | b2 | c2 |
1908            +----+----+----+----+----+----+
1909            | 1  | 4  | 7  | 10 | 4  | 70 |
1910            | 2  | 5  | 8  | 20 | 5  | 80 |
1911            | 3  | 5  | 9  | 20 | 5  | 80 |
1912            +----+----+----+----+----+----+
1913            ");
1914        }
1915
1916        assert_join_metrics!(metrics, 3);
1917
1918        Ok(())
1919    }
1920
1921    #[tokio::test]
1922    async fn join_inner_one_randomly_ordered() -> Result<()> {
1923        let task_ctx = Arc::new(TaskContext::default());
1924        let left = build_table(
1925            ("a1", &vec![0, 3, 2, 1]),
1926            ("b1", &vec![4, 5, 5, 4]),
1927            ("c1", &vec![6, 9, 8, 7]),
1928        );
1929        let right = build_table(
1930            ("a2", &vec![20, 30, 10]),
1931            ("b2", &vec![5, 6, 4]),
1932            ("c2", &vec![80, 90, 70]),
1933        );
1934        let on = vec![(
1935            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1936            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1937        )];
1938
1939        let (columns, batches, metrics) = join_collect(
1940            left,
1941            right,
1942            on,
1943            &JoinType::Inner,
1944            NullEquality::NullEqualsNothing,
1945            task_ctx,
1946        )
1947        .await?;
1948
1949        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1950
1951        // Inner join output is expected to preserve both inputs order
1952        allow_duplicates! {
1953            assert_snapshot!(batches_to_string(&batches), @r"
1954            +----+----+----+----+----+----+
1955            | a1 | b1 | c1 | a2 | b2 | c2 |
1956            +----+----+----+----+----+----+
1957            | 3  | 5  | 9  | 20 | 5  | 80 |
1958            | 2  | 5  | 8  | 20 | 5  | 80 |
1959            | 0  | 4  | 6  | 10 | 4  | 70 |
1960            | 1  | 4  | 7  | 10 | 4  | 70 |
1961            +----+----+----+----+----+----+
1962            ");
1963        }
1964
1965        assert_join_metrics!(metrics, 4);
1966
1967        Ok(())
1968    }
1969
1970    #[apply(batch_sizes)]
1971    #[tokio::test]
1972    async fn join_inner_two(batch_size: usize) -> Result<()> {
1973        let task_ctx = prepare_task_ctx(batch_size);
1974        let left = build_table(
1975            ("a1", &vec![1, 2, 2]),
1976            ("b2", &vec![1, 2, 2]),
1977            ("c1", &vec![7, 8, 9]),
1978        );
1979        let right = build_table(
1980            ("a1", &vec![1, 2, 3]),
1981            ("b2", &vec![1, 2, 2]),
1982            ("c2", &vec![70, 80, 90]),
1983        );
1984        let on = vec![
1985            (
1986                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
1987                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
1988            ),
1989            (
1990                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
1991                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1992            ),
1993        ];
1994
1995        let (columns, batches, metrics) = join_collect(
1996            left,
1997            right,
1998            on,
1999            &JoinType::Inner,
2000            NullEquality::NullEqualsNothing,
2001            task_ctx,
2002        )
2003        .await?;
2004
2005        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
2006
2007        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches = 3;
            // if batch_size is 1, an additional empty batch is emitted for the remaining rows
2010            let mut expected_batch_count = div_ceil(3, batch_size);
2011            if batch_size == 1 {
2012                expected_batch_count += 1;
2013            }
2014            expected_batch_count
2015        } else {
            // With hash collisions enabled, all records will match each other
            // and be filtered later.
2018            div_ceil(9, batch_size)
2019        };
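        // e.g. without forced hash collisions and batch_size = 2: div_ceil(3, 2) = 2 expected batches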
2020
2021        // With batch coalescing, we may have fewer batches than expected
2022        assert!(
2023            batches.len() <= expected_batch_count,
2024            "expected at most {expected_batch_count} batches, got {}",
2025            batches.len()
2026        );
2027
2028        // Inner join output is expected to preserve both inputs order
2029        allow_duplicates! {
2030            assert_snapshot!(batches_to_string(&batches), @r"
2031            +----+----+----+----+----+----+
2032            | a1 | b2 | c1 | a1 | b2 | c2 |
2033            +----+----+----+----+----+----+
2034            | 1  | 1  | 7  | 1  | 1  | 70 |
2035            | 2  | 2  | 8  | 2  | 2  | 80 |
2036            | 2  | 2  | 9  | 2  | 2  | 80 |
2037            +----+----+----+----+----+----+
2038            ");
2039        }
2040
2041        assert_join_metrics!(metrics, 3);
2042
2043        Ok(())
2044    }
2045
    /// Test where the left has 2 parts and the right has 1 part => 1 part
2047    #[apply(batch_sizes)]
2048    #[tokio::test]
2049    async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
2050        let task_ctx = prepare_task_ctx(batch_size);
2051        let batch1 = build_table_i32(
2052            ("a1", &vec![1, 2]),
2053            ("b2", &vec![1, 2]),
2054            ("c1", &vec![7, 8]),
2055        );
2056        let batch2 =
2057            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
2058        let schema = batch1.schema();
2059        let left =
2060            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2061                .unwrap();
2062        let left = Arc::new(CoalescePartitionsExec::new(left));
2063
2064        let right = build_table(
2065            ("a1", &vec![1, 2, 3]),
2066            ("b2", &vec![1, 2, 2]),
2067            ("c2", &vec![70, 80, 90]),
2068        );
2069        let on = vec![
2070            (
2071                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
2072                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
2073            ),
2074            (
2075                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
2076                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2077            ),
2078        ];
2079
2080        let (columns, batches, metrics) = join_collect(
2081            left,
2082            right,
2083            on,
2084            &JoinType::Inner,
2085            NullEquality::NullEqualsNothing,
2086            task_ctx,
2087        )
2088        .await?;
2089
2090        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
2091
2092        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches = 3;
            // if batch_size is 1, an additional empty batch is emitted for the remaining rows
2095            let mut expected_batch_count = div_ceil(3, batch_size);
2096            if batch_size == 1 {
2097                expected_batch_count += 1;
2098            }
2099            expected_batch_count
2100        } else {
            // With hash collisions enabled, all records will match each other
            // and be filtered later.
2103            div_ceil(9, batch_size)
2104        };
2105
2106        // With batch coalescing, we may have fewer batches than expected
2107        assert!(
2108            batches.len() <= expected_batch_count,
2109            "expected at most {expected_batch_count} batches, got {}",
2110            batches.len()
2111        );
2112
2113        // Inner join output is expected to preserve both inputs order
2114        allow_duplicates! {
2115            assert_snapshot!(batches_to_string(&batches), @r"
2116            +----+----+----+----+----+----+
2117            | a1 | b2 | c1 | a1 | b2 | c2 |
2118            +----+----+----+----+----+----+
2119            | 1  | 1  | 7  | 1  | 1  | 70 |
2120            | 2  | 2  | 8  | 2  | 2  | 80 |
2121            | 2  | 2  | 9  | 2  | 2  | 80 |
2122            +----+----+----+----+----+----+
2123            ");
2124        }
2125
2126        assert_join_metrics!(metrics, 3);
2127
2128        Ok(())
2129    }
2130
2131    #[tokio::test]
2132    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
2133        let task_ctx = Arc::new(TaskContext::default());
2134        let batch1 = build_table_i32(
2135            ("a1", &vec![0, 3]),
2136            ("b1", &vec![4, 5]),
2137            ("c1", &vec![6, 9]),
2138        );
2139        let batch2 = build_table_i32(
2140            ("a1", &vec![2, 1]),
2141            ("b1", &vec![5, 4]),
2142            ("c1", &vec![8, 7]),
2143        );
2144        let schema = batch1.schema();
2145
2146        let left =
2147            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2148                .unwrap();
2149        let left = Arc::new(CoalescePartitionsExec::new(left));
2150        let right = build_table(
2151            ("a2", &vec![20, 30, 10]),
2152            ("b2", &vec![5, 6, 4]),
2153            ("c2", &vec![80, 90, 70]),
2154        );
2155        let on = vec![(
2156            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2157            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2158        )];
2159
2160        let (columns, batches, metrics) = join_collect(
2161            left,
2162            right,
2163            on,
2164            &JoinType::Inner,
2165            NullEquality::NullEqualsNothing,
2166            task_ctx,
2167        )
2168        .await?;
2169
2170        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2171
2172        // Inner join output is expected to preserve both inputs order
2173        allow_duplicates! {
2174            assert_snapshot!(batches_to_string(&batches), @r"
2175            +----+----+----+----+----+----+
2176            | a1 | b1 | c1 | a2 | b2 | c2 |
2177            +----+----+----+----+----+----+
2178            | 3  | 5  | 9  | 20 | 5  | 80 |
2179            | 2  | 5  | 8  | 20 | 5  | 80 |
2180            | 0  | 4  | 6  | 10 | 4  | 70 |
2181            | 1  | 4  | 7  | 10 | 4  | 70 |
2182            +----+----+----+----+----+----+
2183            ");
2184        }
2185
2186        assert_join_metrics!(metrics, 4);
2187
2188        Ok(())
2189    }
2190
2191    /// Test where the left has 1 part, the right has 2 parts => 2 parts
2192    #[apply(batch_sizes)]
2193    #[tokio::test]
2194    async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
2195        let task_ctx = prepare_task_ctx(batch_size);
2196        let left = build_table(
2197            ("a1", &vec![1, 2, 3]),
2198            ("b1", &vec![4, 5, 5]), // this has a repetition
2199            ("c1", &vec![7, 8, 9]),
2200        );
2201
2202        let batch1 = build_table_i32(
2203            ("a2", &vec![10, 20]),
2204            ("b1", &vec![4, 6]),
2205            ("c2", &vec![70, 80]),
2206        );
2207        let batch2 =
2208            build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
2209        let schema = batch1.schema();
2210        let right =
2211            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2212                .unwrap();
2213
2214        let on = vec![(
2215            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2216            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2217        )];
2218
2219        let join = join(
2220            left,
2221            right,
2222            on,
2223            &JoinType::Inner,
2224            NullEquality::NullEqualsNothing,
2225        )?;
2226
2227        let columns = columns(&join.schema());
2228        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2229
2230        // first part
2231        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2232        let batches = common::collect(stream).await?;
2233
2234        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            // Expected number of hash table matches for the first right batch = 1,
            // plus an additional empty batch for the non-joined row (20, 6, 80)
2237            let mut expected_batch_count = div_ceil(1, batch_size);
2238            if batch_size == 1 {
2239                expected_batch_count += 1;
2240            }
2241            expected_batch_count
2242        } else {
            // With hash collisions enabled, all records will match each other
            // and be filtered later.
2245            div_ceil(6, batch_size)
2246        };
2247        // With batch coalescing, we may have fewer batches than expected
2248        assert!(
2249            batches.len() <= expected_batch_count,
2250            "expected at most {expected_batch_count} batches, got {}",
2251            batches.len()
2252        );
2253
2254        // Inner join output is expected to preserve both inputs order
2255        allow_duplicates! {
2256            assert_snapshot!(batches_to_string(&batches), @r"
2257            +----+----+----+----+----+----+
2258            | a1 | b1 | c1 | a2 | b1 | c2 |
2259            +----+----+----+----+----+----+
2260            | 1  | 4  | 7  | 10 | 4  | 70 |
2261            +----+----+----+----+----+----+
2262            ");
2263        }
2264
2265        // second part
2266        let stream = join.execute(1, Arc::clone(&task_ctx))?;
2267        let batches = common::collect(stream).await?;
2268
2269        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2270            // Expected number of hash table matches for second right batch = 2
2271            div_ceil(2, batch_size)
2272        } else {
            // With hash collisions enabled, all records will match each other
            // and be filtered later.
2275            div_ceil(3, batch_size)
2276        };
2277        // With batch coalescing, we may have fewer batches than expected
2278        assert!(
2279            batches.len() <= expected_batch_count,
2280            "expected at most {expected_batch_count} batches, got {}",
2281            batches.len()
2282        );
2283
2284        // Inner join output is expected to preserve both inputs order
2285        allow_duplicates! {
2286            assert_snapshot!(batches_to_string(&batches), @r"
2287            +----+----+----+----+----+----+
2288            | a1 | b1 | c1 | a2 | b1 | c2 |
2289            +----+----+----+----+----+----+
2290            | 2  | 5  | 8  | 30 | 5  | 90 |
2291            | 3  | 5  | 9  | 30 | 5  | 90 |
2292            +----+----+----+----+----+----+
2293            ");
2294        }
2295
2296        Ok(())
2297    }
2298
2299    fn build_table_two_batches(
2300        a: (&str, &Vec<i32>),
2301        b: (&str, &Vec<i32>),
2302        c: (&str, &Vec<i32>),
2303    ) -> Arc<dyn ExecutionPlan> {
2304        let batch = build_table_i32(a, b, c);
2305        let schema = batch.schema();
2306        TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2307    }
2308
2309    #[apply(batch_sizes)]
2310    #[tokio::test]
2311    async fn join_left_multi_batch(batch_size: usize) {
2312        let task_ctx = prepare_task_ctx(batch_size);
2313        let left = build_table(
2314            ("a1", &vec![1, 2, 3]),
2315            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2316            ("c1", &vec![7, 8, 9]),
2317        );
2318        let right = build_table_two_batches(
2319            ("a2", &vec![10, 20, 30]),
2320            ("b1", &vec![4, 5, 6]),
2321            ("c2", &vec![70, 80, 90]),
2322        );
2323        let on = vec![(
2324            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2325            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2326        )];
2327
2328        let join = join(
2329            left,
2330            right,
2331            on,
2332            &JoinType::Left,
2333            NullEquality::NullEqualsNothing,
2334        )
2335        .unwrap();
2336
2337        let columns = columns(&join.schema());
2338        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2339
2340        let stream = join.execute(0, task_ctx).unwrap();
2341        let batches = common::collect(stream).await.unwrap();
2342
2343        allow_duplicates! {
2344            assert_snapshot!(batches_to_sort_string(&batches), @r"
2345            +----+----+----+----+----+----+
2346            | a1 | b1 | c1 | a2 | b1 | c2 |
2347            +----+----+----+----+----+----+
2348            | 1  | 4  | 7  | 10 | 4  | 70 |
2349            | 1  | 4  | 7  | 10 | 4  | 70 |
2350            | 2  | 5  | 8  | 20 | 5  | 80 |
2351            | 2  | 5  | 8  | 20 | 5  | 80 |
2352            | 3  | 7  | 9  |    |    |    |
2353            +----+----+----+----+----+----+
2354            ");
2355        }
2356    }
2357
2358    #[apply(batch_sizes)]
2359    #[tokio::test]
2360    async fn join_full_multi_batch(batch_size: usize) {
2361        let task_ctx = prepare_task_ctx(batch_size);
2362        let left = build_table(
2363            ("a1", &vec![1, 2, 3]),
2364            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2365            ("c1", &vec![7, 8, 9]),
2366        );
2367        // create two identical batches for the right side
2368        let right = build_table_two_batches(
2369            ("a2", &vec![10, 20, 30]),
2370            ("b2", &vec![4, 5, 6]),
2371            ("c2", &vec![70, 80, 90]),
2372        );
2373        let on = vec![(
2374            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2375            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2376        )];
2377
2378        let join = join(
2379            left,
2380            right,
2381            on,
2382            &JoinType::Full,
2383            NullEquality::NullEqualsNothing,
2384        )
2385        .unwrap();
2386
2387        let columns = columns(&join.schema());
2388        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2389
2390        let stream = join.execute(0, task_ctx).unwrap();
2391        let batches = common::collect(stream).await.unwrap();
2392
2393        allow_duplicates! {
2394            assert_snapshot!(batches_to_sort_string(&batches), @r"
2395            +----+----+----+----+----+----+
2396            | a1 | b1 | c1 | a2 | b2 | c2 |
2397            +----+----+----+----+----+----+
2398            |    |    |    | 30 | 6  | 90 |
2399            |    |    |    | 30 | 6  | 90 |
2400            | 1  | 4  | 7  | 10 | 4  | 70 |
2401            | 1  | 4  | 7  | 10 | 4  | 70 |
2402            | 2  | 5  | 8  | 20 | 5  | 80 |
2403            | 2  | 5  | 8  | 20 | 5  | 80 |
2404            | 3  | 7  | 9  |    |    |    |
2405            +----+----+----+----+----+----+
2406            ");
2407        }
2408    }
2409
2410    #[apply(batch_sizes)]
2411    #[tokio::test]
2412    async fn join_left_empty_right(batch_size: usize) {
2413        let task_ctx = prepare_task_ctx(batch_size);
2414        let left = build_table(
2415            ("a1", &vec![1, 2, 3]),
2416            ("b1", &vec![4, 5, 7]),
2417            ("c1", &vec![7, 8, 9]),
2418        );
2419        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2420        let on = vec![(
2421            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2422            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2423        )];
2424        let schema = right.schema();
2425        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2426        let join = join(
2427            left,
2428            right,
2429            on,
2430            &JoinType::Left,
2431            NullEquality::NullEqualsNothing,
2432        )
2433        .unwrap();
2434
2435        let columns = columns(&join.schema());
2436        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2437
2438        let stream = join.execute(0, task_ctx).unwrap();
2439        let batches = common::collect(stream).await.unwrap();
2440
2441        allow_duplicates! {
2442            assert_snapshot!(batches_to_sort_string(&batches), @r"
2443            +----+----+----+----+----+----+
2444            | a1 | b1 | c1 | a2 | b1 | c2 |
2445            +----+----+----+----+----+----+
2446            | 1  | 4  | 7  |    |    |    |
2447            | 2  | 5  | 8  |    |    |    |
2448            | 3  | 7  | 9  |    |    |    |
2449            +----+----+----+----+----+----+
2450            ");
2451        }
2452    }
2453
2454    #[apply(batch_sizes)]
2455    #[tokio::test]
2456    async fn join_full_empty_right(batch_size: usize) {
2457        let task_ctx = prepare_task_ctx(batch_size);
2458        let left = build_table(
2459            ("a1", &vec![1, 2, 3]),
2460            ("b1", &vec![4, 5, 7]),
2461            ("c1", &vec![7, 8, 9]),
2462        );
2463        let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
2464        let on = vec![(
2465            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2466            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2467        )];
2468        let schema = right.schema();
2469        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2470        let join = join(
2471            left,
2472            right,
2473            on,
2474            &JoinType::Full,
2475            NullEquality::NullEqualsNothing,
2476        )
2477        .unwrap();
2478
2479        let columns = columns(&join.schema());
2480        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2481
2482        let stream = join.execute(0, task_ctx).unwrap();
2483        let batches = common::collect(stream).await.unwrap();
2484
2485        allow_duplicates! {
2486            assert_snapshot!(batches_to_sort_string(&batches), @r"
2487            +----+----+----+----+----+----+
2488            | a1 | b1 | c1 | a2 | b2 | c2 |
2489            +----+----+----+----+----+----+
2490            | 1  | 4  | 7  |    |    |    |
2491            | 2  | 5  | 8  |    |    |    |
2492            | 3  | 7  | 9  |    |    |    |
2493            +----+----+----+----+----+----+
2494            ");
2495        }
2496    }
2497
2498    #[apply(batch_sizes)]
2499    #[tokio::test]
2500    async fn join_left_one(batch_size: usize) -> Result<()> {
2501        let task_ctx = prepare_task_ctx(batch_size);
2502        let left = build_table(
2503            ("a1", &vec![1, 2, 3]),
2504            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2505            ("c1", &vec![7, 8, 9]),
2506        );
2507        let right = build_table(
2508            ("a2", &vec![10, 20, 30]),
2509            ("b1", &vec![4, 5, 6]),
2510            ("c2", &vec![70, 80, 90]),
2511        );
2512        let on = vec![(
2513            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2514            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2515        )];
2516
2517        let (columns, batches, metrics) = join_collect(
2518            Arc::clone(&left),
2519            Arc::clone(&right),
2520            on.clone(),
2521            &JoinType::Left,
2522            NullEquality::NullEqualsNothing,
2523            task_ctx,
2524        )
2525        .await?;
2526
2527        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2528
2529        allow_duplicates! {
2530            assert_snapshot!(batches_to_sort_string(&batches), @r"
2531            +----+----+----+----+----+----+
2532            | a1 | b1 | c1 | a2 | b1 | c2 |
2533            +----+----+----+----+----+----+
2534            | 1  | 4  | 7  | 10 | 4  | 70 |
2535            | 2  | 5  | 8  | 20 | 5  | 80 |
2536            | 3  | 7  | 9  |    |    |    |
2537            +----+----+----+----+----+----+
2538            ");
2539        }
2540
2541        assert_join_metrics!(metrics, 3);
2542
2543        Ok(())
2544    }
2545
2546    #[apply(batch_sizes)]
2547    #[tokio::test]
2548    async fn partitioned_join_left_one(batch_size: usize) -> Result<()> {
2549        let task_ctx = prepare_task_ctx(batch_size);
2550        let left = build_table(
2551            ("a1", &vec![1, 2, 3]),
2552            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
2553            ("c1", &vec![7, 8, 9]),
2554        );
2555        let right = build_table(
2556            ("a2", &vec![10, 20, 30]),
2557            ("b1", &vec![4, 5, 6]),
2558            ("c2", &vec![70, 80, 90]),
2559        );
2560        let on = vec![(
2561            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2562            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2563        )];
2564
2565        let (columns, batches, metrics) = partitioned_join_collect(
2566            Arc::clone(&left),
2567            Arc::clone(&right),
2568            on.clone(),
2569            &JoinType::Left,
2570            NullEquality::NullEqualsNothing,
2571            task_ctx,
2572        )
2573        .await?;
2574
2575        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2576
2577        allow_duplicates! {
2578            assert_snapshot!(batches_to_sort_string(&batches), @r"
2579            +----+----+----+----+----+----+
2580            | a1 | b1 | c1 | a2 | b1 | c2 |
2581            +----+----+----+----+----+----+
2582            | 1  | 4  | 7  | 10 | 4  | 70 |
2583            | 2  | 5  | 8  | 20 | 5  | 80 |
2584            | 3  | 7  | 9  |    |    |    |
2585            +----+----+----+----+----+----+
2586            ");
2587        }
2588
2589        assert_join_metrics!(metrics, 3);
2590
2591        Ok(())
2592    }
2593
2594    fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
        // rows where b1 = 8 or b1 = 10 have matches in the right table
2597        build_table(
2598            ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
2599            ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
2600            ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
2601        )
2602    }
2603
2604    fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
        // rows where b2 = 8 or b2 = 10 have matches in the left table
2607        build_table(
2608            ("a2", &vec![8, 12, 6, 2, 10, 4]),
2609            ("b2", &vec![8, 10, 6, 2, 10, 4]),
2610            ("c2", &vec![20, 40, 60, 80, 100, 120]),
2611        )
2612    }
2613
2614    #[apply(batch_sizes)]
2615    #[tokio::test]
2616    async fn join_left_semi(batch_size: usize) -> Result<()> {
2617        let task_ctx = prepare_task_ctx(batch_size);
2618        let left = build_semi_anti_left_table();
2619        let right = build_semi_anti_right_table();
2620        // left_table left semi join right_table on left_table.b1 = right_table.b2
2621        let on = vec![(
2622            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2623            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2624        )];
2625
2626        let join = join(
2627            left,
2628            right,
2629            on,
2630            &JoinType::LeftSemi,
2631            NullEquality::NullEqualsNothing,
2632        )?;
2633
2634        let columns = columns(&join.schema());
2635        assert_eq!(columns, vec!["a1", "b1", "c1"]);
2636
2637        let stream = join.execute(0, task_ctx)?;
2638        let batches = common::collect(stream).await?;
2639
2640        // ignore the order
2641        allow_duplicates! {
2642            assert_snapshot!(batches_to_sort_string(&batches), @r"
2643            +----+----+-----+
2644            | a1 | b1 | c1  |
2645            +----+----+-----+
2646            | 11 | 8  | 110 |
2647            | 13 | 10 | 130 |
2648            | 9  | 8  | 90  |
2649            +----+----+-----+
2650            ");
2651        }
2652
2653        Ok(())
2654    }
2655
2656    #[apply(batch_sizes)]
2657    #[tokio::test]
2658    async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> {
2659        let task_ctx = prepare_task_ctx(batch_size);
2660        let left = build_semi_anti_left_table();
2661        let right = build_semi_anti_right_table();
2662
2663        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 10
2664        let on = vec![(
2665            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2666            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2667        )];
2668
2669        let column_indices = vec![ColumnIndex {
2670            index: 0,
2671            side: JoinSide::Right,
2672        }];
2673        let intermediate_schema =
2674            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2675
2676        let filter_expression = Arc::new(BinaryExpr::new(
2677            Arc::new(Column::new("x", 0)),
2678            Operator::NotEq,
2679            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2680        )) as Arc<dyn PhysicalExpr>;
2681
2682        let filter = JoinFilter::new(
2683            filter_expression,
2684            column_indices.clone(),
2685            Arc::new(intermediate_schema.clone()),
2686        );
2687
2688        let join = join_with_filter(
2689            Arc::clone(&left),
2690            Arc::clone(&right),
2691            on.clone(),
2692            filter,
2693            &JoinType::LeftSemi,
2694            NullEquality::NullEqualsNothing,
2695        )?;
2696
2697        let columns_header = columns(&join.schema());
2698        assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
2699
2700        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2701        let batches = common::collect(stream).await?;
2702
2703        allow_duplicates! {
2704            assert_snapshot!(batches_to_sort_string(&batches), @r"
2705            +----+----+-----+
2706            | a1 | b1 | c1  |
2707            +----+----+-----+
2708            | 11 | 8  | 110 |
2709            | 13 | 10 | 130 |
2710            | 9  | 8  | 90  |
2711            +----+----+-----+
2712            ");
2713        }
2714
2715        // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 > 10
2716        let filter_expression = Arc::new(BinaryExpr::new(
2717            Arc::new(Column::new("x", 0)),
2718            Operator::Gt,
2719            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2720        )) as Arc<dyn PhysicalExpr>;
2721        let filter = JoinFilter::new(
2722            filter_expression,
2723            column_indices,
2724            Arc::new(intermediate_schema),
2725        );
2726
2727        let join = join_with_filter(
2728            left,
2729            right,
2730            on,
2731            filter,
2732            &JoinType::LeftSemi,
2733            NullEquality::NullEqualsNothing,
2734        )?;
2735
2736        let columns_header = columns(&join.schema());
2737        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2738
2739        let stream = join.execute(0, task_ctx)?;
2740        let batches = common::collect(stream).await?;
2741
2742        allow_duplicates! {
2743            assert_snapshot!(batches_to_sort_string(&batches), @r"
2744            +----+----+-----+
2745            | a1 | b1 | c1  |
2746            +----+----+-----+
2747            | 13 | 10 | 130 |
2748            +----+----+-----+
2749            ");
2750        }
2751
2752        Ok(())
2753    }
2754
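    // RightSemi join: emits right-side rows that have at least one match on the left.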
2755    #[apply(batch_sizes)]
2756    #[tokio::test]
2757    async fn join_right_semi(batch_size: usize) -> Result<()> {
2758        let task_ctx = prepare_task_ctx(batch_size);
2759        let left = build_semi_anti_left_table();
2760        let right = build_semi_anti_right_table();
2761
2762        // left_table right semi join right_table on left_table.b1 = right_table.b2
2763        let on = vec![(
2764            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2765            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2766        )];
2767
2768        let join = join(
2769            left,
2770            right,
2771            on,
2772            &JoinType::RightSemi,
2773            NullEquality::NullEqualsNothing,
2774        )?;
2775
2776        let columns = columns(&join.schema());
2777        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2778
2779        let stream = join.execute(0, task_ctx)?;
2780        let batches = common::collect(stream).await?;
2781
2782        // RightSemi join output is expected to preserve right input order
2783        allow_duplicates! {
2784            assert_snapshot!(batches_to_string(&batches), @r"
2785            +----+----+-----+
2786            | a2 | b2 | c2  |
2787            +----+----+-----+
2788            | 8  | 8  | 20  |
2789            | 12 | 10 | 40  |
2790            | 10 | 10 | 100 |
2791            +----+----+-----+
2792            ");
2793        }
2794
2795        Ok(())
2796    }
2797
2798    #[apply(batch_sizes)]
2799    #[tokio::test]
2800    async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> {
2801        let task_ctx = prepare_task_ctx(batch_size);
2802        let left = build_semi_anti_left_table();
2803        let right = build_semi_anti_right_table();
2804
        // left_table right semi join right_table on left_table.b1 = right_table.b2 and left_table.a1 != 9
2806        let on = vec![(
2807            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2808            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2809        )];
2810
2811        let column_indices = vec![ColumnIndex {
2812            index: 0,
2813            side: JoinSide::Left,
2814        }];
2815        let intermediate_schema =
2816            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2817
2818        let filter_expression = Arc::new(BinaryExpr::new(
2819            Arc::new(Column::new("x", 0)),
2820            Operator::NotEq,
2821            Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
2822        )) as Arc<dyn PhysicalExpr>;
2823
2824        let filter = JoinFilter::new(
2825            filter_expression,
2826            column_indices.clone(),
2827            Arc::new(intermediate_schema.clone()),
2828        );
2829
2830        let join = join_with_filter(
2831            Arc::clone(&left),
2832            Arc::clone(&right),
2833            on.clone(),
2834            filter,
2835            &JoinType::RightSemi,
2836            NullEquality::NullEqualsNothing,
2837        )?;
2838
2839        let columns = columns(&join.schema());
2840        assert_eq!(columns, vec!["a2", "b2", "c2"]);
2841
2842        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2843        let batches = common::collect(stream).await?;
2844
2845        // RightSemi join output is expected to preserve right input order
2846        allow_duplicates! {
2847            assert_snapshot!(batches_to_string(&batches), @r"
2848            +----+----+-----+
2849            | a2 | b2 | c2  |
2850            +----+----+-----+
2851            | 8  | 8  | 20  |
2852            | 12 | 10 | 40  |
2853            | 10 | 10 | 100 |
2854            +----+----+-----+
2855            ");
2856        }
2857
        // left_table right semi join right_table on left_table.b1 = right_table.b2 and left_table.a1 > 11
2859        let filter_expression = Arc::new(BinaryExpr::new(
2860            Arc::new(Column::new("x", 0)),
2861            Operator::Gt,
2862            Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
2863        )) as Arc<dyn PhysicalExpr>;
2864
2865        let filter = JoinFilter::new(
2866            filter_expression,
2867            column_indices,
2868            Arc::new(intermediate_schema.clone()),
2869        );
2870
2871        let join = join_with_filter(
2872            left,
2873            right,
2874            on,
2875            filter,
2876            &JoinType::RightSemi,
2877            NullEquality::NullEqualsNothing,
2878        )?;
2879        let stream = join.execute(0, task_ctx)?;
2880        let batches = common::collect(stream).await?;
2881
2882        // RightSemi join output is expected to preserve right input order
2883        allow_duplicates! {
2884            assert_snapshot!(batches_to_string(&batches), @r"
2885            +----+----+-----+
2886            | a2 | b2 | c2  |
2887            +----+----+-----+
2888            | 12 | 10 | 40  |
2889            | 10 | 10 | 100 |
2890            +----+----+-----+
2891            ");
2892        }
2893
2894        Ok(())
2895    }
2896
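    // LeftAnti join: emits left-side rows that have no match on the right.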
2897    #[apply(batch_sizes)]
2898    #[tokio::test]
2899    async fn join_left_anti(batch_size: usize) -> Result<()> {
2900        let task_ctx = prepare_task_ctx(batch_size);
2901        let left = build_semi_anti_left_table();
2902        let right = build_semi_anti_right_table();
2903        // left_table left anti join right_table on left_table.b1 = right_table.b2
2904        let on = vec![(
2905            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2906            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2907        )];
2908
2909        let join = join(
2910            left,
2911            right,
2912            on,
2913            &JoinType::LeftAnti,
2914            NullEquality::NullEqualsNothing,
2915        )?;
2916
2917        let columns = columns(&join.schema());
2918        assert_eq!(columns, vec!["a1", "b1", "c1"]);
2919
2920        let stream = join.execute(0, task_ctx)?;
2921        let batches = common::collect(stream).await?;
2922
2923        allow_duplicates! {
2924            assert_snapshot!(batches_to_sort_string(&batches), @r"
2925            +----+----+----+
2926            | a1 | b1 | c1 |
2927            +----+----+----+
2928            | 1  | 1  | 10 |
2929            | 3  | 3  | 30 |
2930            | 5  | 5  | 50 |
2931            | 7  | 7  | 70 |
2932            +----+----+----+
2933            ");
2934        }
2935        Ok(())
2936    }
2937
2938    #[apply(batch_sizes)]
2939    #[tokio::test]
2940    async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> {
2941        let task_ctx = prepare_task_ctx(batch_size);
2942        let left = build_semi_anti_left_table();
2943        let right = build_semi_anti_right_table();
        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 8
2945        let on = vec![(
2946            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2947            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2948        )];
2949
2950        let column_indices = vec![ColumnIndex {
2951            index: 0,
2952            side: JoinSide::Right,
2953        }];
2954        let intermediate_schema =
2955            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2956        let filter_expression = Arc::new(BinaryExpr::new(
2957            Arc::new(Column::new("x", 0)),
2958            Operator::NotEq,
2959            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2960        )) as Arc<dyn PhysicalExpr>;
2961
2962        let filter = JoinFilter::new(
2963            filter_expression,
2964            column_indices.clone(),
2965            Arc::new(intermediate_schema.clone()),
2966        );
2967
2968        let join = join_with_filter(
2969            Arc::clone(&left),
2970            Arc::clone(&right),
2971            on.clone(),
2972            filter,
2973            &JoinType::LeftAnti,
2974            NullEquality::NullEqualsNothing,
2975        )?;
2976
2977        let columns_header = columns(&join.schema());
2978        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2979
2980        let stream = join.execute(0, Arc::clone(&task_ctx))?;
2981        let batches = common::collect(stream).await?;
2982
2983        allow_duplicates! {
2984            assert_snapshot!(batches_to_sort_string(&batches), @r"
2985            +----+----+-----+
2986            | a1 | b1 | c1  |
2987            +----+----+-----+
2988            | 1  | 1  | 10  |
2989            | 11 | 8  | 110 |
2990            | 3  | 3  | 30  |
2991            | 5  | 5  | 50  |
2992            | 7  | 7  | 70  |
2993            | 9  | 8  | 90  |
2994            +----+----+-----+
2995            ");
2996        }
2997
        // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 8
2999        let filter_expression = Arc::new(BinaryExpr::new(
3000            Arc::new(Column::new("x", 0)),
3001            Operator::NotEq,
3002            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3003        )) as Arc<dyn PhysicalExpr>;
3004
3005        let filter = JoinFilter::new(
3006            filter_expression,
3007            column_indices,
3008            Arc::new(intermediate_schema),
3009        );
3010
3011        let join = join_with_filter(
3012            left,
3013            right,
3014            on,
3015            filter,
3016            &JoinType::LeftAnti,
3017            NullEquality::NullEqualsNothing,
3018        )?;
3019
3020        let columns_header = columns(&join.schema());
3021        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
3022
3023        let stream = join.execute(0, task_ctx)?;
3024        let batches = common::collect(stream).await?;
3025
3026        allow_duplicates! {
3027            assert_snapshot!(batches_to_sort_string(&batches), @r"
3028            +----+----+-----+
3029            | a1 | b1 | c1  |
3030            +----+----+-----+
3031            | 1  | 1  | 10  |
3032            | 11 | 8  | 110 |
3033            | 3  | 3  | 30  |
3034            | 5  | 5  | 50  |
3035            | 7  | 7  | 70  |
3036            | 9  | 8  | 90  |
3037            +----+----+-----+
3038            ");
3039        }
3040
3041        Ok(())
3042    }
3043
3044    #[apply(batch_sizes)]
3045    #[tokio::test]
3046    async fn join_right_anti(batch_size: usize) -> Result<()> {
3047        let task_ctx = prepare_task_ctx(batch_size);
3048        let left = build_semi_anti_left_table();
3049        let right = build_semi_anti_right_table();
3050        let on = vec![(
3051            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3052            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3053        )];
3054
3055        let join = join(
3056            left,
3057            right,
3058            on,
3059            &JoinType::RightAnti,
3060            NullEquality::NullEqualsNothing,
3061        )?;
3062
3063        let columns = columns(&join.schema());
3064        assert_eq!(columns, vec!["a2", "b2", "c2"]);
3065
3066        let stream = join.execute(0, task_ctx)?;
3067        let batches = common::collect(stream).await?;
3068
3069        // RightAnti join output is expected to preserve right input order
3070        allow_duplicates! {
3071            assert_snapshot!(batches_to_string(&batches), @r"
3072            +----+----+-----+
3073            | a2 | b2 | c2  |
3074            +----+----+-----+
3075            | 6  | 6  | 60  |
3076            | 2  | 2  | 80  |
3077            | 4  | 4  | 120 |
3078            +----+----+-----+
3079            ");
3080        }
3081        Ok(())
3082    }
3083
3084    #[apply(batch_sizes)]
3085    #[tokio::test]
3086    async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> {
3087        let task_ctx = prepare_task_ctx(batch_size);
3088        let left = build_semi_anti_left_table();
3089        let right = build_semi_anti_right_table();
        // left_table right anti join right_table on left_table.b1 = right_table.b2 and left_table.a1 != 13
3091        let on = vec![(
3092            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3093            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3094        )];
3095
3096        let column_indices = vec![ColumnIndex {
3097            index: 0,
3098            side: JoinSide::Left,
3099        }];
3100        let intermediate_schema =
3101            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3102
3103        let filter_expression = Arc::new(BinaryExpr::new(
3104            Arc::new(Column::new("x", 0)),
3105            Operator::NotEq,
3106            Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
3107        )) as Arc<dyn PhysicalExpr>;
3108
3109        let filter = JoinFilter::new(
3110            filter_expression,
3111            column_indices,
3112            Arc::new(intermediate_schema.clone()),
3113        );
3114
3115        let join = join_with_filter(
3116            Arc::clone(&left),
3117            Arc::clone(&right),
3118            on.clone(),
3119            filter,
3120            &JoinType::RightAnti,
3121            NullEquality::NullEqualsNothing,
3122        )?;
3123
3124        let columns_header = columns(&join.schema());
3125        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3126
3127        let stream = join.execute(0, Arc::clone(&task_ctx))?;
3128        let batches = common::collect(stream).await?;
3129
3130        // RightAnti join output is expected to preserve right input order
3131        allow_duplicates! {
3132            assert_snapshot!(batches_to_string(&batches), @r"
3133            +----+----+-----+
3134            | a2 | b2 | c2  |
3135            +----+----+-----+
3136            | 12 | 10 | 40  |
3137            | 6  | 6  | 60  |
3138            | 2  | 2  | 80  |
3139            | 10 | 10 | 100 |
3140            | 4  | 4  | 120 |
3141            +----+----+-----+
3142            ");
3143        }
3144
        // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2 != 8
3146        let column_indices = vec![ColumnIndex {
3147            index: 1,
3148            side: JoinSide::Right,
3149        }];
3150        let filter_expression = Arc::new(BinaryExpr::new(
3151            Arc::new(Column::new("x", 0)),
3152            Operator::NotEq,
3153            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3154        )) as Arc<dyn PhysicalExpr>;
3155
3156        let filter = JoinFilter::new(
3157            filter_expression,
3158            column_indices,
3159            Arc::new(intermediate_schema),
3160        );
3161
3162        let join = join_with_filter(
3163            left,
3164            right,
3165            on,
3166            filter,
3167            &JoinType::RightAnti,
3168            NullEquality::NullEqualsNothing,
3169        )?;
3170
3171        let columns_header = columns(&join.schema());
3172        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3173
3174        let stream = join.execute(0, task_ctx)?;
3175        let batches = common::collect(stream).await?;
3176
3177        // RightAnti join output is expected to preserve right input order
3178        allow_duplicates! {
3179            assert_snapshot!(batches_to_string(&batches), @r"
3180            +----+----+-----+
3181            | a2 | b2 | c2  |
3182            +----+----+-----+
3183            | 8  | 8  | 20  |
3184            | 6  | 6  | 60  |
3185            | 2  | 2  | 80  |
3186            | 4  | 4  | 120 |
3187            +----+----+-----+
3188            ");
3189        }
3190
3191        Ok(())
3192    }
3193
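    // Right join: the unmatched right row (b1 = 6) must be emitted with nulls
    // for the left-side columns.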
3194    #[apply(batch_sizes)]
3195    #[tokio::test]
3196    async fn join_right_one(batch_size: usize) -> Result<()> {
3197        let task_ctx = prepare_task_ctx(batch_size);
3198        let left = build_table(
3199            ("a1", &vec![1, 2, 3]),
3200            ("b1", &vec![4, 5, 7]),
3201            ("c1", &vec![7, 8, 9]),
3202        );
3203        let right = build_table(
3204            ("a2", &vec![10, 20, 30]),
3205            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
3206            ("c2", &vec![70, 80, 90]),
3207        );
3208        let on = vec![(
3209            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3210            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3211        )];
3212
3213        let (columns, batches, metrics) = join_collect(
3214            left,
3215            right,
3216            on,
3217            &JoinType::Right,
3218            NullEquality::NullEqualsNothing,
3219            task_ctx,
3220        )
3221        .await?;
3222
3223        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3224
3225        allow_duplicates! {
3226            assert_snapshot!(batches_to_sort_string(&batches), @r"
3227            +----+----+----+----+----+----+
3228            | a1 | b1 | c1 | a2 | b1 | c2 |
3229            +----+----+----+----+----+----+
3230            |    |    |    | 30 | 6  | 90 |
3231            | 1  | 4  | 7  | 10 | 4  | 70 |
3232            | 2  | 5  | 8  | 20 | 5  | 80 |
3233            +----+----+----+----+----+----+
3234            ");
3235        }
3236
3237        assert_join_metrics!(metrics, 3);
3238
3239        Ok(())
3240    }
3241
3242    #[apply(batch_sizes)]
3243    #[tokio::test]
3244    async fn partitioned_join_right_one(batch_size: usize) -> Result<()> {
3245        let task_ctx = prepare_task_ctx(batch_size);
3246        let left = build_table(
3247            ("a1", &vec![1, 2, 3]),
3248            ("b1", &vec![4, 5, 7]),
3249            ("c1", &vec![7, 8, 9]),
3250        );
3251        let right = build_table(
3252            ("a2", &vec![10, 20, 30]),
3253            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
3254            ("c2", &vec![70, 80, 90]),
3255        );
3256        let on = vec![(
3257            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3258            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3259        )];
3260
3261        let (columns, batches, metrics) = partitioned_join_collect(
3262            left,
3263            right,
3264            on,
3265            &JoinType::Right,
3266            NullEquality::NullEqualsNothing,
3267            task_ctx,
3268        )
3269        .await?;
3270
3271        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3272
3273        allow_duplicates! {
3274            assert_snapshot!(batches_to_sort_string(&batches), @r"
3275            +----+----+----+----+----+----+
3276            | a1 | b1 | c1 | a2 | b1 | c2 |
3277            +----+----+----+----+----+----+
3278            |    |    |    | 30 | 6  | 90 |
3279            | 1  | 4  | 7  | 10 | 4  | 70 |
3280            | 2  | 5  | 8  | 20 | 5  | 80 |
3281            +----+----+----+----+----+----+
3282            ");
3283        }
3284
3285        assert_join_metrics!(metrics, 3);
3286
3287        Ok(())
3288    }
3289
3290    #[apply(batch_sizes)]
3291    #[tokio::test]
3292    async fn join_full_one(batch_size: usize) -> Result<()> {
3293        let task_ctx = prepare_task_ctx(batch_size);
3294        let left = build_table(
3295            ("a1", &vec![1, 2, 3]),
3296            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3297            ("c1", &vec![7, 8, 9]),
3298        );
3299        let right = build_table(
3300            ("a2", &vec![10, 20, 30]),
3301            ("b2", &vec![4, 5, 6]),
3302            ("c2", &vec![70, 80, 90]),
3303        );
3304        let on = vec![(
3305            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3306            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3307        )];
3308
3309        let join = join(
3310            left,
3311            right,
3312            on,
3313            &JoinType::Full,
3314            NullEquality::NullEqualsNothing,
3315        )?;
3316
3317        let columns = columns(&join.schema());
3318        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3319
3320        let stream = join.execute(0, task_ctx)?;
3321        let batches = common::collect(stream).await?;
3322
3323        allow_duplicates! {
3324            assert_snapshot!(batches_to_sort_string(&batches), @r"
3325            +----+----+----+----+----+----+
3326            | a1 | b1 | c1 | a2 | b2 | c2 |
3327            +----+----+----+----+----+----+
3328            |    |    |    | 30 | 6  | 90 |
3329            | 1  | 4  | 7  | 10 | 4  | 70 |
3330            | 2  | 5  | 8  | 20 | 5  | 80 |
3331            | 3  | 7  | 9  |    |    |    |
3332            +----+----+----+----+----+----+
3333            ");
3334        }
3335
3336        Ok(())
3337    }
3338
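    // LeftMark join: returns every left row plus a boolean `mark` column that is
    // true when the row has at least one match on the right.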
3339    #[apply(batch_sizes)]
3340    #[tokio::test]
3341    async fn join_left_mark(batch_size: usize) -> Result<()> {
3342        let task_ctx = prepare_task_ctx(batch_size);
3343        let left = build_table(
3344            ("a1", &vec![1, 2, 3]),
3345            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3346            ("c1", &vec![7, 8, 9]),
3347        );
3348        let right = build_table(
3349            ("a2", &vec![10, 20, 30]),
3350            ("b1", &vec![4, 5, 6]),
3351            ("c2", &vec![70, 80, 90]),
3352        );
3353        let on = vec![(
3354            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3355            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3356        )];
3357
3358        let (columns, batches, metrics) = join_collect(
3359            Arc::clone(&left),
3360            Arc::clone(&right),
3361            on.clone(),
3362            &JoinType::LeftMark,
3363            NullEquality::NullEqualsNothing,
3364            task_ctx,
3365        )
3366        .await?;
3367
3368        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3369
3370        allow_duplicates! {
3371            assert_snapshot!(batches_to_sort_string(&batches), @r"
3372            +----+----+----+-------+
3373            | a1 | b1 | c1 | mark  |
3374            +----+----+----+-------+
3375            | 1  | 4  | 7  | true  |
3376            | 2  | 5  | 8  | true  |
3377            | 3  | 7  | 9  | false |
3378            +----+----+----+-------+
3379            ");
3380        }
3381
3382        assert_join_metrics!(metrics, 3);
3383
3384        Ok(())
3385    }
3386
3387    #[apply(batch_sizes)]
3388    #[tokio::test]
3389    async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> {
3390        let task_ctx = prepare_task_ctx(batch_size);
3391        let left = build_table(
3392            ("a1", &vec![1, 2, 3]),
3393            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3394            ("c1", &vec![7, 8, 9]),
3395        );
3396        let right = build_table(
3397            ("a2", &vec![10, 20, 30, 40]),
3398            ("b1", &vec![4, 4, 5, 6]),
3399            ("c2", &vec![60, 70, 80, 90]),
3400        );
3401        let on = vec![(
3402            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3403            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3404        )];
3405
3406        let (columns, batches, metrics) = partitioned_join_collect(
3407            Arc::clone(&left),
3408            Arc::clone(&right),
3409            on.clone(),
3410            &JoinType::LeftMark,
3411            NullEquality::NullEqualsNothing,
3412            task_ctx,
3413        )
3414        .await?;
3415
3416        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3417
3418        allow_duplicates! {
3419            assert_snapshot!(batches_to_sort_string(&batches), @r"
3420            +----+----+----+-------+
3421            | a1 | b1 | c1 | mark  |
3422            +----+----+----+-------+
3423            | 1  | 4  | 7  | true  |
3424            | 2  | 5  | 8  | true  |
3425            | 3  | 7  | 9  | false |
3426            +----+----+----+-------+
3427            ");
3428        }
3429
3430        assert_join_metrics!(metrics, 3);
3431
3432        Ok(())
3433    }
3434
3435    #[apply(batch_sizes)]
3436    #[tokio::test]
3437    async fn join_right_mark(batch_size: usize) -> Result<()> {
3438        let task_ctx = prepare_task_ctx(batch_size);
3439        let left = build_table(
3440            ("a1", &vec![1, 2, 3]),
3441            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3442            ("c1", &vec![7, 8, 9]),
3443        );
3444        let right = build_table(
3445            ("a2", &vec![10, 20, 30]),
3446            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
3447            ("c2", &vec![70, 80, 90]),
3448        );
3449        let on = vec![(
3450            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3451            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3452        )];
3453
3454        let (columns, batches, metrics) = join_collect(
3455            Arc::clone(&left),
3456            Arc::clone(&right),
3457            on.clone(),
3458            &JoinType::RightMark,
3459            NullEquality::NullEqualsNothing,
3460            task_ctx,
3461        )
3462        .await?;
3463
3464        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3465
3466        let expected = [
3467            "+----+----+----+-------+",
3468            "| a2 | b1 | c2 | mark  |",
3469            "+----+----+----+-------+",
3470            "| 10 | 4  | 70 | true  |",
3471            "| 20 | 5  | 80 | true  |",
3472            "| 30 | 6  | 90 | false |",
3473            "+----+----+----+-------+",
3474        ];
3475        assert_batches_sorted_eq!(expected, &batches);
3476
3477        assert_join_metrics!(metrics, 3);
3478
3479        Ok(())
3480    }
3481
3482    #[apply(batch_sizes)]
3483    #[tokio::test]
3484    async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> {
3485        let task_ctx = prepare_task_ctx(batch_size);
3486        let left = build_table(
3487            ("a1", &vec![1, 2, 3]),
3488            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
3489            ("c1", &vec![7, 8, 9]),
3490        );
3491        let right = build_table(
3492            ("a2", &vec![10, 20, 30, 40]),
3493            ("b1", &vec![4, 4, 5, 6]), // 6 does not exist on the left
3494            ("c2", &vec![60, 70, 80, 90]),
3495        );
3496        let on = vec![(
3497            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3498            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3499        )];
3500
3501        let (columns, batches, metrics) = partitioned_join_collect(
3502            Arc::clone(&left),
3503            Arc::clone(&right),
3504            on.clone(),
3505            &JoinType::RightMark,
3506            NullEquality::NullEqualsNothing,
3507            task_ctx,
3508        )
3509        .await?;
3510
3511        assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3512
3513        let expected = [
3514            "+----+----+----+-------+",
3515            "| a2 | b1 | c2 | mark  |",
3516            "+----+----+----+-------+",
3517            "| 10 | 4  | 60 | true  |",
3518            "| 20 | 4  | 70 | true  |",
3519            "| 30 | 5  | 80 | true  |",
3520            "| 40 | 6  | 90 | false |",
3521            "+----+----+----+-------+",
3522        ];
3523        assert_batches_sorted_eq!(expected, &batches);
3524
3525        assert_join_metrics!(metrics, 4);
3526
3527        Ok(())
3528    }
3529
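    // Artificially insert colliding entries into the build-side hash table and
    // verify that the key equality check still yields only the correct matches
    // (u64 chain-index variant).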
3530    #[test]
3531    fn join_with_hash_collisions_64() -> Result<()> {
3532        let mut hashmap_left = HashTable::with_capacity(4);
3533        let left = build_table_i32(
3534            ("a", &vec![10, 20]),
3535            ("x", &vec![100, 200]),
3536            ("y", &vec![200, 300]),
3537        );
3538
3539        let random_state = RandomState::with_seeds(0, 0, 0, 0);
3540        let hashes_buff = &mut vec![0; left.num_rows()];
3541        let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
3542
        // Simulate a hash collision: both hash values are inserted with both
        // 1-based build indices (1 and 2, i.e. rows 0 and 1):
        //   hashes[0] -> rows 0 and 1
        //   hashes[1] -> rows 0 and 1
        // The key equality check must then ensure that hashes[0] matches only
        // row 0 and hashes[1] matches only row 1.
3547        hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
3548        hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
3549
3550        hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
3551        hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);
3552
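        // Collision chain: next[i] holds the 1-based index of the next build row
        // sharing the same hash (0 terminates), so row 0 links to row 1.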
3553        let next = vec![2, 0];
3554
3555        let right = build_table_i32(
3556            ("a", &vec![10, 20]),
3557            ("b", &vec![0, 0]),
3558            ("c", &vec![30, 40]),
3559        );
3560
3561        // Join key column for both join sides
3562        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
3563
3564        let join_hash_map = JoinHashMapU64::new(hashmap_left, next);
3565
3566        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
3567        let right_keys_values =
3568            key_column.evaluate(&right)?.into_array(right.num_rows())?;
3569        let mut hashes_buffer = vec![0; right.num_rows()];
3570        create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
3571
3572        let mut probe_indices_buffer = Vec::new();
3573        let mut build_indices_buffer = Vec::new();
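        // Probe the hash map; the trailing arguments are the per-call match
        // limit (8192) and the initial lookup offset ((0, None)).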
3574        let (l, r, _) = lookup_join_hashmap(
3575            &join_hash_map,
3576            &[left_keys_values],
3577            &[right_keys_values],
3578            NullEquality::NullEqualsNothing,
3579            &hashes_buffer,
3580            8192,
3581            (0, None),
3582            &mut probe_indices_buffer,
3583            &mut build_indices_buffer,
3584        )?;
3585
3586        let left_ids: UInt64Array = vec![0, 1].into();
3587
3588        let right_ids: UInt32Array = vec![0, 1].into();
3589
3590        assert_eq!(left_ids, l);
3591
3592        assert_eq!(right_ids, r);
3593
3594        Ok(())
3595    }
3596
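    // Same collision scenario as above, exercised against the u32 chain-index
    // variant of the join hash map.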
3597    #[test]
3598    fn join_with_hash_collisions_u32() -> Result<()> {
3599        let mut hashmap_left = HashTable::with_capacity(4);
3600        let left = build_table_i32(
3601            ("a", &vec![10, 20]),
3602            ("x", &vec![100, 200]),
3603            ("y", &vec![200, 300]),
3604        );
3605
3606        let random_state = RandomState::with_seeds(0, 0, 0, 0);
3607        let hashes_buff = &mut vec![0; left.num_rows()];
3608        let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
3609
3610        hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
3611        hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
3612        hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
3613        hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);
3614
3615        let next: Vec<u32> = vec![2, 0];
3616
3617        let right = build_table_i32(
3618            ("a", &vec![10, 20]),
3619            ("b", &vec![0, 0]),
3620            ("c", &vec![30, 40]),
3621        );
3622
3623        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
3624
3625        let join_hash_map = JoinHashMapU32::new(hashmap_left, next);
3626
3627        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
3628        let right_keys_values =
3629            key_column.evaluate(&right)?.into_array(right.num_rows())?;
3630        let mut hashes_buffer = vec![0; right.num_rows()];
3631        create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
3632
3633        let mut probe_indices_buffer = Vec::new();
3634        let mut build_indices_buffer = Vec::new();
3635        let (l, r, _) = lookup_join_hashmap(
3636            &join_hash_map,
3637            &[left_keys_values],
3638            &[right_keys_values],
3639            NullEquality::NullEqualsNothing,
3640            &hashes_buffer,
3641            8192,
3642            (0, None),
3643            &mut probe_indices_buffer,
3644            &mut build_indices_buffer,
3645        )?;
3646
3647        // We still expect to match rows 0 and 1 on both sides
3648        let left_ids: UInt64Array = vec![0, 1].into();
3649        let right_ids: UInt32Array = vec![0, 1].into();
3650
3651        assert_eq!(left_ids, l);
3652        assert_eq!(right_ids, r);
3653
3654        Ok(())
3655    }
3656
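    // Both inputs use the column names a, b, c; the join output must keep the
    // duplicated names without mixing up the columns.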
3657    #[tokio::test]
3658    async fn join_with_duplicated_column_names() -> Result<()> {
3659        let task_ctx = Arc::new(TaskContext::default());
3660        let left = build_table(
3661            ("a", &vec![1, 2, 3]),
3662            ("b", &vec![4, 5, 7]),
3663            ("c", &vec![7, 8, 9]),
3664        );
3665        let right = build_table(
3666            ("a", &vec![10, 20, 30]),
3667            ("b", &vec![1, 2, 7]),
3668            ("c", &vec![70, 80, 90]),
3669        );
3670        let on = vec![(
            // join on a = b, so the unjoined output columns carry duplicate names
3672            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3673            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3674        )];
3675
3676        let join = join(
3677            left,
3678            right,
3679            on,
3680            &JoinType::Inner,
3681            NullEquality::NullEqualsNothing,
3682        )?;
3683
3684        let columns = columns(&join.schema());
3685        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3686
3687        let stream = join.execute(0, task_ctx)?;
3688        let batches = common::collect(stream).await?;
3689
3690        allow_duplicates! {
3691            assert_snapshot!(batches_to_sort_string(&batches), @r"
3692            +---+---+---+----+---+----+
3693            | a | b | c | a  | b | c  |
3694            +---+---+---+----+---+----+
3695            | 1 | 4 | 7 | 10 | 1 | 70 |
3696            | 2 | 5 | 8 | 20 | 2 | 80 |
3697            +---+---+---+----+---+----+
3698            ");
3699        }
3700
3701        Ok(())
3702    }
3703
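    // Builds a JoinFilter evaluating left.c > right.c over a two-column
    // intermediate schema.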
3704    fn prepare_join_filter() -> JoinFilter {
3705        let column_indices = vec![
3706            ColumnIndex {
3707                index: 2,
3708                side: JoinSide::Left,
3709            },
3710            ColumnIndex {
3711                index: 2,
3712                side: JoinSide::Right,
3713            },
3714        ];
3715        let intermediate_schema = Schema::new(vec![
3716            Field::new("c", DataType::Int32, true),
3717            Field::new("c", DataType::Int32, true),
3718        ]);
3719        let filter_expression = Arc::new(BinaryExpr::new(
3720            Arc::new(Column::new("c", 0)),
3721            Operator::Gt,
3722            Arc::new(Column::new("c", 1)),
3723        )) as Arc<dyn PhysicalExpr>;
3724
3725        JoinFilter::new(
3726            filter_expression,
3727            column_indices,
3728            Arc::new(intermediate_schema),
3729        )
3730    }
3731
3732    #[apply(batch_sizes)]
3733    #[tokio::test]
3734    async fn join_inner_with_filter(batch_size: usize) -> Result<()> {
3735        let task_ctx = prepare_task_ctx(batch_size);
3736        let left = build_table(
3737            ("a", &vec![0, 1, 2, 2]),
3738            ("b", &vec![4, 5, 7, 8]),
3739            ("c", &vec![7, 8, 9, 1]),
3740        );
3741        let right = build_table(
3742            ("a", &vec![10, 20, 30, 40]),
3743            ("b", &vec![2, 2, 3, 4]),
3744            ("c", &vec![7, 5, 6, 4]),
3745        );
3746        let on = vec![(
3747            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3748            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3749        )];
3750        let filter = prepare_join_filter();
3751
3752        let join = join_with_filter(
3753            left,
3754            right,
3755            on,
3756            filter,
3757            &JoinType::Inner,
3758            NullEquality::NullEqualsNothing,
3759        )?;
3760
3761        let columns = columns(&join.schema());
3762        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3763
3764        let stream = join.execute(0, task_ctx)?;
3765        let batches = common::collect(stream).await?;
3766
3767        allow_duplicates! {
3768            assert_snapshot!(batches_to_sort_string(&batches), @r"
3769            +---+---+---+----+---+---+
3770            | a | b | c | a  | b | c |
3771            +---+---+---+----+---+---+
3772            | 2 | 7 | 9 | 10 | 2 | 7 |
3773            | 2 | 7 | 9 | 20 | 2 | 5 |
3774            +---+---+---+----+---+---+
3775            ");
3776        }
3777
3778        Ok(())
3779    }
3780
3781    #[apply(batch_sizes)]
3782    #[tokio::test]
3783    async fn join_left_with_filter(batch_size: usize) -> Result<()> {
3784        let task_ctx = prepare_task_ctx(batch_size);
3785        let left = build_table(
3786            ("a", &vec![0, 1, 2, 2]),
3787            ("b", &vec![4, 5, 7, 8]),
3788            ("c", &vec![7, 8, 9, 1]),
3789        );
3790        let right = build_table(
3791            ("a", &vec![10, 20, 30, 40]),
3792            ("b", &vec![2, 2, 3, 4]),
3793            ("c", &vec![7, 5, 6, 4]),
3794        );
3795        let on = vec![(
3796            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3797            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3798        )];
3799        let filter = prepare_join_filter();
3800
3801        let join = join_with_filter(
3802            left,
3803            right,
3804            on,
3805            filter,
3806            &JoinType::Left,
3807            NullEquality::NullEqualsNothing,
3808        )?;
3809
3810        let columns = columns(&join.schema());
3811        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3812
3813        let stream = join.execute(0, task_ctx)?;
3814        let batches = common::collect(stream).await?;
3815
3816        allow_duplicates! {
3817            assert_snapshot!(batches_to_sort_string(&batches), @r"
3818            +---+---+---+----+---+---+
3819            | a | b | c | a  | b | c |
3820            +---+---+---+----+---+---+
3821            | 0 | 4 | 7 |    |   |   |
3822            | 1 | 5 | 8 |    |   |   |
3823            | 2 | 7 | 9 | 10 | 2 | 7 |
3824            | 2 | 7 | 9 | 20 | 2 | 5 |
3825            | 2 | 8 | 1 |    |   |   |
3826            +---+---+---+----+---+---+
3827            ");
3828        }
3829
3830        Ok(())
3831    }
3832
3833    #[apply(batch_sizes)]
3834    #[tokio::test]
3835    async fn join_right_with_filter(batch_size: usize) -> Result<()> {
3836        let task_ctx = prepare_task_ctx(batch_size);
3837        let left = build_table(
3838            ("a", &vec![0, 1, 2, 2]),
3839            ("b", &vec![4, 5, 7, 8]),
3840            ("c", &vec![7, 8, 9, 1]),
3841        );
3842        let right = build_table(
3843            ("a", &vec![10, 20, 30, 40]),
3844            ("b", &vec![2, 2, 3, 4]),
3845            ("c", &vec![7, 5, 6, 4]),
3846        );
3847        let on = vec![(
3848            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3849            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3850        )];
3851        let filter = prepare_join_filter();
3852
3853        let join = join_with_filter(
3854            left,
3855            right,
3856            on,
3857            filter,
3858            &JoinType::Right,
3859            NullEquality::NullEqualsNothing,
3860        )?;
3861
3862        let columns = columns(&join.schema());
3863        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3864
3865        let stream = join.execute(0, task_ctx)?;
3866        let batches = common::collect(stream).await?;
3867
3868        allow_duplicates! {
3869            assert_snapshot!(batches_to_sort_string(&batches), @r"
3870            +---+---+---+----+---+---+
3871            | a | b | c | a  | b | c |
3872            +---+---+---+----+---+---+
3873            |   |   |   | 30 | 3 | 6 |
3874            |   |   |   | 40 | 4 | 4 |
3875            | 2 | 7 | 9 | 10 | 2 | 7 |
3876            | 2 | 7 | 9 | 20 | 2 | 5 |
3877            +---+---+---+----+---+---+
3878            ");
3879        }
3880
3881        Ok(())
3882    }
3883
3884    #[apply(batch_sizes)]
3885    #[tokio::test]
3886    async fn join_full_with_filter(batch_size: usize) -> Result<()> {
3887        let task_ctx = prepare_task_ctx(batch_size);
3888        let left = build_table(
3889            ("a", &vec![0, 1, 2, 2]),
3890            ("b", &vec![4, 5, 7, 8]),
3891            ("c", &vec![7, 8, 9, 1]),
3892        );
3893        let right = build_table(
3894            ("a", &vec![10, 20, 30, 40]),
3895            ("b", &vec![2, 2, 3, 4]),
3896            ("c", &vec![7, 5, 6, 4]),
3897        );
3898        let on = vec![(
3899            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3900            Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3901        )];
3902        let filter = prepare_join_filter();
3903
3904        let join = join_with_filter(
3905            left,
3906            right,
3907            on,
3908            filter,
3909            &JoinType::Full,
3910            NullEquality::NullEqualsNothing,
3911        )?;
3912
3913        let columns = columns(&join.schema());
3914        assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3915
3916        let stream = join.execute(0, task_ctx)?;
3917        let batches = common::collect(stream).await?;
3918
3919        let expected = [
3920            "+---+---+---+----+---+---+",
3921            "| a | b | c | a  | b | c |",
3922            "+---+---+---+----+---+---+",
3923            "|   |   |   | 30 | 3 | 6 |",
3924            "|   |   |   | 40 | 4 | 4 |",
3925            "| 2 | 7 | 9 | 10 | 2 | 7 |",
3926            "| 2 | 7 | 9 | 20 | 2 | 5 |",
3927            "| 0 | 4 | 7 |    |   |   |",
3928            "| 1 | 5 | 8 |    |   |   |",
3929            "| 2 | 8 | 1 |    |   |   |",
3930            "+---+---+---+----+---+---+",
3931        ];
3932        assert_batches_sorted_eq!(expected, &batches);
3933
3934        // THIS MIGRATION HALTED DUE TO ISSUE #15312
3935        //allow_duplicates! {
3936        //    assert_snapshot!(batches_to_sort_string(&batches), @r#"
3937        //    +---+---+---+----+---+---+
3938        //    | a | b | c | a  | b | c |
3939        //    +---+---+---+----+---+---+
3940        //    |   |   |   | 30 | 3 | 6 |
3941        //    |   |   |   | 40 | 4 | 4 |
3942        //    | 2 | 7 | 9 | 10 | 2 | 7 |
3943        //    | 2 | 7 | 9 | 20 | 2 | 5 |
3944        //    | 0 | 4 | 7 |    |   |   |
3945        //    | 1 | 5 | 8 |    |   |   |
3946        //    | 2 | 8 | 1 |    |   |   |
3947        //    +---+---+---+----+---+---+
3948        //        "#)
3949        //}
3950
3951        Ok(())
3952    }
3953
3954    /// Test for parallelized HashJoinExec with PartitionMode::CollectLeft
3955    #[tokio::test]
3956    async fn test_collect_left_multiple_partitions_join() -> Result<()> {
3957        let task_ctx = Arc::new(TaskContext::default());
3958        let left = build_table(
3959            ("a1", &vec![1, 2, 3]),
3960            ("b1", &vec![4, 5, 7]),
3961            ("c1", &vec![7, 8, 9]),
3962        );
3963        let right = build_table(
3964            ("a2", &vec![10, 20, 30]),
3965            ("b2", &vec![4, 5, 6]),
3966            ("c2", &vec![70, 80, 90]),
3967        );
3968        let on = vec![(
3969            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3970            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3971        )];
3972
3973        let expected_inner = vec![
3974            "+----+----+----+----+----+----+",
3975            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3976            "+----+----+----+----+----+----+",
3977            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3978            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3979            "+----+----+----+----+----+----+",
3980        ];
3981        let expected_left = vec![
3982            "+----+----+----+----+----+----+",
3983            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3984            "+----+----+----+----+----+----+",
3985            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3986            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3987            "| 3  | 7  | 9  |    |    |    |",
3988            "+----+----+----+----+----+----+",
3989        ];
3990        let expected_right = vec![
3991            "+----+----+----+----+----+----+",
3992            "| a1 | b1 | c1 | a2 | b2 | c2 |",
3993            "+----+----+----+----+----+----+",
3994            "|    |    |    | 30 | 6  | 90 |",
3995            "| 1  | 4  | 7  | 10 | 4  | 70 |",
3996            "| 2  | 5  | 8  | 20 | 5  | 80 |",
3997            "+----+----+----+----+----+----+",
3998        ];
3999        let expected_full = vec![
4000            "+----+----+----+----+----+----+",
4001            "| a1 | b1 | c1 | a2 | b2 | c2 |",
4002            "+----+----+----+----+----+----+",
4003            "|    |    |    | 30 | 6  | 90 |",
4004            "| 1  | 4  | 7  | 10 | 4  | 70 |",
4005            "| 2  | 5  | 8  | 20 | 5  | 80 |",
4006            "| 3  | 7  | 9  |    |    |    |",
4007            "+----+----+----+----+----+----+",
4008        ];
4009        let expected_left_semi = vec![
4010            "+----+----+----+",
4011            "| a1 | b1 | c1 |",
4012            "+----+----+----+",
4013            "| 1  | 4  | 7  |",
4014            "| 2  | 5  | 8  |",
4015            "+----+----+----+",
4016        ];
4017        let expected_left_anti = vec![
4018            "+----+----+----+",
4019            "| a1 | b1 | c1 |",
4020            "+----+----+----+",
4021            "| 3  | 7  | 9  |",
4022            "+----+----+----+",
4023        ];
4024        let expected_right_semi = vec![
4025            "+----+----+----+",
4026            "| a2 | b2 | c2 |",
4027            "+----+----+----+",
4028            "| 10 | 4  | 70 |",
4029            "| 20 | 5  | 80 |",
4030            "+----+----+----+",
4031        ];
4032        let expected_right_anti = vec![
4033            "+----+----+----+",
4034            "| a2 | b2 | c2 |",
4035            "+----+----+----+",
4036            "| 30 | 6  | 90 |",
4037            "+----+----+----+",
4038        ];
4039        let expected_left_mark = vec![
4040            "+----+----+----+-------+",
4041            "| a1 | b1 | c1 | mark  |",
4042            "+----+----+----+-------+",
4043            "| 1  | 4  | 7  | true  |",
4044            "| 2  | 5  | 8  | true  |",
4045            "| 3  | 7  | 9  | false |",
4046            "+----+----+----+-------+",
4047        ];
4048        let expected_right_mark = vec![
4049            "+----+----+----+-------+",
4050            "| a2 | b2 | c2 | mark  |",
4051            "+----+----+----+-------+",
4052            "| 10 | 4  | 70 | true  |",
4053            "| 20 | 5  | 80 | true  |",
4054            "| 30 | 6  | 90 | false |",
4055            "+----+----+----+-------+",
4056        ];
4057
4058        let test_cases = vec![
4059            (JoinType::Inner, expected_inner),
4060            (JoinType::Left, expected_left),
4061            (JoinType::Right, expected_right),
4062            (JoinType::Full, expected_full),
4063            (JoinType::LeftSemi, expected_left_semi),
4064            (JoinType::LeftAnti, expected_left_anti),
4065            (JoinType::RightSemi, expected_right_semi),
4066            (JoinType::RightAnti, expected_right_anti),
4067            (JoinType::LeftMark, expected_left_mark),
4068            (JoinType::RightMark, expected_right_mark),
4069        ];
4070
4071        for (join_type, expected) in test_cases {
4072            let (_, batches, metrics) = join_collect_with_partition_mode(
4073                Arc::clone(&left),
4074                Arc::clone(&right),
4075                on.clone(),
4076                &join_type,
4077                PartitionMode::CollectLeft,
4078                NullEquality::NullEqualsNothing,
4079                Arc::clone(&task_ctx),
4080            )
4081            .await?;
4082            assert_batches_sorted_eq!(expected, &batches);
4083            assert_join_metrics!(metrics, expected.len() - 4);
4084        }
4085
4086        Ok(())
4087    }
4088
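    // Join on Date32 keys: 19107..=19109 are days since the Unix epoch,
    // i.e. 2022-04-25 through 2022-04-27.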
4089    #[tokio::test]
4090    async fn join_date32() -> Result<()> {
4091        let schema = Arc::new(Schema::new(vec![
4092            Field::new("date", DataType::Date32, false),
4093            Field::new("n", DataType::Int32, false),
4094        ]));
4095
4096        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
4097        let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
4098        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4099        let left =
4100            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
4101                .unwrap();
4102        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
4103        let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
4104        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4105        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
4106        let on = vec![(
4107            Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
4108            Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
4109        )];
4110
4111        let join = join(
4112            left,
4113            right,
4114            on,
4115            &JoinType::Inner,
4116            NullEquality::NullEqualsNothing,
4117        )?;
4118
4119        let task_ctx = Arc::new(TaskContext::default());
4120        let stream = join.execute(0, task_ctx)?;
4121        let batches = common::collect(stream).await?;
4122
4123        allow_duplicates! {
4124            assert_snapshot!(batches_to_sort_string(&batches), @r"
4125            +------------+---+------------+---+
4126            | date       | n | date       | n |
4127            +------------+---+------------+---+
4128            | 2022-04-26 | 2 | 2022-04-26 | 4 |
4129            | 2022-04-26 | 2 | 2022-04-26 | 5 |
4130            | 2022-04-27 | 3 | 2022-04-27 | 6 |
4131            +------------+---+------------+---+
4132            ");
4133        }
4134
4135        Ok(())
4136    }
4137
4138    #[tokio::test]
4139    async fn join_with_error_right() {
4140        let left = build_table(
4141            ("a1", &vec![1, 2, 3]),
4142            ("b1", &vec![4, 5, 7]),
4143            ("c1", &vec![7, 8, 9]),
4144        );
4145
4146        // right input stream returns one good batch and then one error.
4147        // The error should be returned.
4148        let err = exec_err!("bad data error");
4149        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4150
4151        let on = vec![(
4152            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4153            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
4154        )];
4155        let schema = right.schema();
4156        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4157        let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
4158
4159        let join_types = vec![
4160            JoinType::Inner,
4161            JoinType::Left,
4162            JoinType::Right,
4163            JoinType::Full,
4164            JoinType::LeftSemi,
4165            JoinType::LeftAnti,
4166            JoinType::RightSemi,
4167            JoinType::RightAnti,
4168        ];
4169
4170        for join_type in join_types {
4171            let join = join(
4172                Arc::clone(&left),
4173                Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
4174                on.clone(),
4175                &join_type,
4176                NullEquality::NullEqualsNothing,
4177            )
4178            .unwrap();
4179            let task_ctx = Arc::new(TaskContext::default());
4180
4181            let stream = join.execute(0, task_ctx).unwrap();
4182
4183            // Expect that an error is returned
4184            let result_string = common::collect(stream).await.unwrap_err().to_string();
4185            assert!(
4186                result_string.contains("bad data error"),
4187                "actual: {result_string}"
4188            );
4189        }
4190    }
4191
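    // All rows share a single join key, so the 4 x 5 inner product yields 20
    // result rows; sweep batch_size from 20 down to 1 and check both the output
    // batch count bound and the concatenated result per join type.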
4192    #[tokio::test]
4193    async fn join_split_batch() {
4194        let left = build_table(
4195            ("a1", &vec![1, 2, 3, 4]),
4196            ("b1", &vec![1, 1, 1, 1]),
4197            ("c1", &vec![0, 0, 0, 0]),
4198        );
4199        let right = build_table(
4200            ("a2", &vec![10, 20, 30, 40, 50]),
4201            ("b2", &vec![1, 1, 1, 1, 1]),
4202            ("c2", &vec![0, 0, 0, 0, 0]),
4203        );
4204        let on = vec![(
4205            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4206            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4207        )];
4208
4209        let join_types = vec![
4210            JoinType::Inner,
4211            JoinType::Left,
4212            JoinType::Right,
4213            JoinType::Full,
4214            JoinType::RightSemi,
4215            JoinType::RightAnti,
4216            JoinType::LeftSemi,
4217            JoinType::LeftAnti,
4218        ];
        let expected_resultset_records = 20;
        let common_result = [
            "+----+----+----+----+----+----+",
            "| a1 | b1 | c1 | a2 | b2 | c2 |",
            "+----+----+----+----+----+----+",
            "| 1  | 1  | 0  | 10 | 1  | 0  |",
            "| 2  | 1  | 0  | 10 | 1  | 0  |",
            "| 3  | 1  | 0  | 10 | 1  | 0  |",
            "| 4  | 1  | 0  | 10 | 1  | 0  |",
            "| 1  | 1  | 0  | 20 | 1  | 0  |",
            "| 2  | 1  | 0  | 20 | 1  | 0  |",
            "| 3  | 1  | 0  | 20 | 1  | 0  |",
            "| 4  | 1  | 0  | 20 | 1  | 0  |",
            "| 1  | 1  | 0  | 30 | 1  | 0  |",
            "| 2  | 1  | 0  | 30 | 1  | 0  |",
            "| 3  | 1  | 0  | 30 | 1  | 0  |",
            "| 4  | 1  | 0  | 30 | 1  | 0  |",
            "| 1  | 1  | 0  | 40 | 1  | 0  |",
            "| 2  | 1  | 0  | 40 | 1  | 0  |",
            "| 3  | 1  | 0  | 40 | 1  | 0  |",
            "| 4  | 1  | 0  | 40 | 1  | 0  |",
            "| 1  | 1  | 0  | 50 | 1  | 0  |",
            "| 2  | 1  | 0  | 50 | 1  | 0  |",
            "| 3  | 1  | 0  | 50 | 1  | 0  |",
            "| 4  | 1  | 0  | 50 | 1  | 0  |",
            "+----+----+----+----+----+----+",
        ];
        let left_batch = [
            "+----+----+----+",
            "| a1 | b1 | c1 |",
            "+----+----+----+",
            "| 1  | 1  | 0  |",
            "| 2  | 1  | 0  |",
            "| 3  | 1  | 0  |",
            "| 4  | 1  | 0  |",
            "+----+----+----+",
        ];
        let right_batch = [
            "+----+----+----+",
            "| a2 | b2 | c2 |",
            "+----+----+----+",
            "| 10 | 1  | 0  |",
            "| 20 | 1  | 0  |",
            "| 30 | 1  | 0  |",
            "| 40 | 1  | 0  |",
            "| 50 | 1  | 0  |",
            "+----+----+----+",
        ];
        let right_empty = [
            "+----+----+----+",
            "| a2 | b2 | c2 |",
            "+----+----+----+",
            "+----+----+----+",
        ];
        let left_empty = [
            "+----+----+----+",
            "| a1 | b1 | c1 |",
            "+----+----+----+",
            "+----+----+----+",
        ];

        // Validate partial join result output across different batch_size settings
        for join_type in join_types {
            for batch_size in (1..21).rev() {
                let task_ctx = prepare_task_ctx(batch_size);

                let join = join(
                    Arc::clone(&left),
                    Arc::clone(&right),
                    on.clone(),
                    &join_type,
                    NullEquality::NullEqualsNothing,
                )
                .unwrap();

                let stream = join.execute(0, task_ctx).unwrap();
                let batches = common::collect(stream).await.unwrap();

                // For inner/right joins the expected batch count equals the div_ceil
                // result, as there is no need to append non-joined build-side data.
                // For other join types it is div_ceil + 1 -- one additional batch
                // containing the unvisited build-side rows (empty in this test case).
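                // Worked example: with 20 result records and batch_size = 7,
                // div_ceil(20, 7) = 3 probe-output batches, plus the extra
                // unmatched-rows batch for the remaining join types.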
                let expected_batch_count = match join_type {
                    JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightSemi
                    | JoinType::RightAnti => {
                        div_ceil(expected_resultset_records, batch_size)
                    }
                    _ => div_ceil(expected_resultset_records, batch_size) + 1,
                };
                // With batch coalescing, we may have fewer batches than expected
                assert!(
                    batches.len() <= expected_batch_count,
                    "expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
                    batches.len()
                );

                let expected = match join_type {
                    JoinType::RightSemi => right_batch.to_vec(),
                    JoinType::RightAnti => right_empty.to_vec(),
                    JoinType::LeftSemi => left_batch.to_vec(),
                    JoinType::LeftAnti => left_empty.to_vec(),
                    _ => common_result.to_vec(),
                };
                // For anti joins with empty results, we may get zero batches
                // (with coalescing) instead of one empty batch with schema
                if batches.is_empty() {
                    // Verify this is an expected empty result case
                    assert!(
                        matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
                        "Unexpected empty result for {join_type} join"
                    );
                } else {
                    assert_batches_eq!(expected, &batches);
                }
            }
        }
    }

    #[tokio::test]
    async fn single_partition_join_overallocation() -> Result<()> {
        let left = build_table(
            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        );
        let right = build_table(
            ("a2", &vec![10, 11]),
            ("b2", &vec![12, 13]),
            ("c2", &vec![14, 15]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
            JoinType::RightMark,
        ];

        for join_type in join_types {
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(100, 1.0)
                .build_arc()?;
            let task_ctx = TaskContext::default().with_runtime(runtime);
            let task_ctx = Arc::new(task_ctx);

            let join = join(
                Arc::clone(&left),
                Arc::clone(&right),
                on.clone(),
                &join_type,
                NullEquality::NullEqualsNothing,
            )?;

            let stream = join.execute(0, task_ctx)?;
            let err = common::collect(stream).await.unwrap_err();

            // Assert that the operator-level reservation fails when attempting to overallocate
            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n  HashJoinInput"
            );

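            // The 120 B in the message matches the build-side input size:
            // 10 rows x 3 Int32 columns x 4 bytes = 120 bytes.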
            assert_contains!(
                err.to_string(),
                "Failed to allocate additional 120.0 B for HashJoinInput"
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn partitioned_join_overallocation() -> Result<()> {
        // Prepare partitioned inputs for HashJoinExec
        // No need to adjust partitioning, as execution should fail with a `Resources exhausted` error
        let left_batch = build_table_i32(
            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        );
        let left = TestMemoryExec::try_new_exec(
            &[vec![left_batch.clone()], vec![left_batch.clone()]],
            left_batch.schema(),
            None,
        )
        .unwrap();
        let right_batch = build_table_i32(
            ("a2", &vec![10, 11]),
            ("b2", &vec![12, 13]),
            ("c2", &vec![14, 15]),
        );
        let right = TestMemoryExec::try_new_exec(
            &[vec![right_batch.clone()], vec![right_batch.clone()]],
            right_batch.schema(),
            None,
        )
        .unwrap();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
        ];

        for join_type in join_types {
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(100, 1.0)
                .build_arc()?;
            let session_config = SessionConfig::default().with_batch_size(50);
            let task_ctx = TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime);
            let task_ctx = Arc::new(task_ctx);

            let join = HashJoinExec::try_new(
                Arc::clone(&left) as Arc<dyn ExecutionPlan>,
                Arc::clone(&right) as Arc<dyn ExecutionPlan>,
                on.clone(),
                None,
                &join_type,
                None,
                PartitionMode::Partitioned,
                NullEquality::NullEqualsNothing,
            )?;

            let stream = join.execute(1, task_ctx)?;
            let err = common::collect(stream).await.unwrap_err();

            // Assert that the stream-level reservation fails when attempting to overallocate
            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n  HashJoinInput[1]"
            );

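            // As above, 120 B matches the per-partition build-side batch:
            // 10 rows x 3 Int32 columns x 4 bytes = 120 bytes.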
            assert_contains!(
                err.to_string(),
                "Failed to allocate additional 120.0 B for HashJoinInput[1]"
            );
        }

        Ok(())
    }

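    /// Builds a single-partition `TestMemoryExec` with one struct column named
    /// `struct_name`, containing a single nullable `Int32` field, and an
    /// optional struct-level null buffer.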
    fn build_table_struct(
        struct_name: &str,
        field_name_and_values: (&str, &Vec<Option<i32>>),
        nulls: Option<NullBuffer>,
    ) -> Arc<dyn ExecutionPlan> {
        let (field_name, values) = field_name_and_values;
        let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
        let schema = Schema::new(vec![Field::new(
            struct_name,
            DataType::Struct(inner_fields.clone().into()),
            nulls.is_some(),
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(StructArray::new(
                inner_fields.into(),
                vec![Arc::new(Int32Array::from(values.clone()))],
                nulls,
            ))],
        )
        .unwrap();
        let schema_ref = batch.schema();
        TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
    }

    #[tokio::test]
    async fn join_on_struct() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left =
            build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
        let right =
            build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
        let on = vec![(
            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["n1", "n2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +--------+--------+
            | n1     | n2     |
            +--------+--------+
            | {a: }  | {a: }  |
            | {a: 1} | {a: 1} |
            | {a: 2} | {a: 2} |
            +--------+--------+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }

    #[tokio::test]
    async fn join_on_struct_with_nulls() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left =
            build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
        let right =
            build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
        let on = vec![(
            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
        )];

        let (_, batches_null_eq, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNull,
            Arc::clone(&task_ctx),
        )
        .await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r"
            +----+----+
            | n1 | n2 |
            +----+----+
            |    |    |
            +----+----+
            ");
        }

        assert_join_metrics!(metrics, 1);

        let (_, batches_null_neq, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_join_metrics!(metrics, 0);

        // With batch coalescing, empty results may not emit any batches.
        // Verify that we either get no batches, or an empty batch with the proper schema.
        if batches_null_neq.is_empty() {
            // This is fine - no output rows
        } else {
            let expected_null_neq =
                ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
            assert_batches_eq!(expected_null_neq, &batches_null_neq);
        }

        Ok(())
    }

    /// Returns the column names of the schema
    fn columns(schema: &Schema) -> Vec<String> {
        schema.fields().iter().map(|f| f.name().clone()).collect()
    }

    /// This test verifies that the dynamic filter is marked as complete after
    /// HashJoinExec finishes building the hash table.
    #[tokio::test]
    async fn test_hash_join_marks_filter_complete() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 6]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        // Create a dynamic filter manually
        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
        let dynamic_filter_clone = Arc::clone(&dynamic_filter);

        // Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
        let _consumer = Arc::clone(&dynamic_filter)
            .with_new_children(vec![])
            .unwrap();

        // Create HashJoinExec with the dynamic filter
        let mut join = HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
        )?;
        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
            filter: dynamic_filter,
            build_accumulator: OnceLock::new(),
        });

        // Execute the join
        let stream = join.execute(0, task_ctx)?;
        let _batches = common::collect(stream).await?;

        // After the join completes, the dynamic filter should be marked as complete
        // and wait_complete() should return immediately
        dynamic_filter_clone.wait_complete().await;

        Ok(())
    }

    /// This test verifies that the dynamic filter is marked as complete even
    /// when the build side is empty.
    #[tokio::test]
    async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        // Empty left side (build side)
        let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        // Create a dynamic filter manually
        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
        let dynamic_filter_clone = Arc::clone(&dynamic_filter);

        // Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
        let _consumer = Arc::clone(&dynamic_filter)
            .with_new_children(vec![])
            .unwrap();

        // Create HashJoinExec with the dynamic filter
        let mut join = HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
        )?;
        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
            filter: dynamic_filter,
            build_accumulator: OnceLock::new(),
        });

        // Execute the join
        let stream = join.execute(0, task_ctx)?;
        let _batches = common::collect(stream).await?;

        // Even with an empty build side, the dynamic filter should be marked as
        // complete and wait_complete() should return immediately
        dynamic_filter_clone.wait_complete().await;

        Ok(())
    }
}