// datafusion_physical_plan/joins/symmetric_hash_join.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This file implements the symmetric hash join algorithm with range-based
//! data pruning to join two (potentially infinite) streams.
//!
//! A [`SymmetricHashJoinExec`] plan takes two children plan (with appropriate
//! output ordering) and produces the join output according to the given join
//! type and other options.
//!
//! This plan uses the [`OneSideHashJoiner`] object to facilitate join calculations
//! for both its children.
28use std::any::Any;
29use std::fmt::{self, Debug};
30use std::mem::{size_of, size_of_val};
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use std::vec;
34
35use crate::check_if_same_properties;
36use crate::common::SharedMemoryReservation;
37use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
38use crate::joins::stream_join_utils::{
39    PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
40    calculate_filter_expr_intervals, combine_two_batches,
41    convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
42    get_pruning_semi_indices, prepare_sorted_exprs, record_visited_indices,
43};
44use crate::joins::utils::{
45    BatchSplitter, BatchTransformer, ColumnIndex, JoinFilter, JoinHashMapType, JoinOn,
46    JoinOnRef, NoopBatchTransformer, StatefulStreamResult, apply_join_filter_to_indices,
47    build_batch_from_indices, build_join_schema, check_join_is_valid, equal_rows_arr,
48    symmetric_join_output_partitioning, update_hash,
49};
50use crate::projection::{
51    ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children,
52    physical_to_column_exprs, update_join_filter, update_join_on,
53};
54use crate::{
55    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
56    PlanProperties, RecordBatchStream, SendableRecordBatchStream,
57    joins::StreamJoinPartitionMode,
58    metrics::{ExecutionPlanMetricsSet, MetricsSet},
59};
60
61use arrow::array::{
62    ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder, UInt32Array,
63    UInt64Array,
64};
65use arrow::compute::concat_batches;
66use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
67use arrow::record_batch::RecordBatch;
68use datafusion_common::hash_utils::create_hashes;
69use datafusion_common::utils::bisect;
70use datafusion_common::{
71    HashSet, JoinSide, JoinType, NullEquality, Result, assert_eq_or_internal_err,
72    plan_err,
73};
74use datafusion_execution::TaskContext;
75use datafusion_execution::memory_pool::MemoryConsumer;
76use datafusion_expr::interval_arithmetic::Interval;
77use datafusion_physical_expr::equivalence::join_equivalence_properties;
78use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
79use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
80use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};
81
82use ahash::RandomState;
83use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
84use futures::{Stream, StreamExt, ready};
85use parking_lot::Mutex;
86
/// Scale factor applied when deciding whether to shrink a join hash map after
/// pruning has deleted entries.
// NOTE(review): the consumer is not visible in this chunk — presumably used by
// `PruningJoinHashMap`'s shrink logic; confirm against stream_join_utils.
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
88
/// A symmetric hash join with range conditions is when both streams are hashed on the
/// join key and the resulting hash tables are used to join the streams.
/// The join is considered symmetric because the hash table is built on the join keys from both
/// streams, and the matching of rows is based on the values of the join keys in both streams.
/// This type of join is efficient in streaming context as it allows for fast lookups in the hash
/// table, rather than having to scan through one or both of the streams to find matching rows, also it
/// only considers the elements from the stream that fall within a certain sliding window (w/ range conditions),
/// making it more efficient and less likely to store stale data. This enables operating on unbounded streaming
/// data without any memory issues.
///
/// For each input stream, create a hash table.
///   - For each new [RecordBatch] in build side, hash and insert into inputs hash table. Update offsets.
///   - Test if input is equal to a predefined set of other inputs.
///   - If so record the visited rows. If the matched row results must be produced (INNER, LEFT), output the [RecordBatch].
///   - Try to prune other side (probe) with new [RecordBatch].
///   - If the join type indicates that the unmatched rows results must be produced (LEFT, FULL etc.),
///     output the [RecordBatch] when a pruning happens or at the end of the data.
///
///
/// ``` text
///                        +-------------------------+
///                        |                         |
///   left stream ---------|  Left OneSideHashJoiner |---+
///                        |                         |   |
///                        +-------------------------+   |
///                                                      |
///                                                      |--------- Joined output
///                                                      |
///                        +-------------------------+   |
///                        |                         |   |
///  right stream ---------| Right OneSideHashJoiner |---+
///                        |                         |
///                        +-------------------------+
///
/// Prune build side when the new RecordBatch comes to the probe side. We utilize interval arithmetic
/// on JoinFilter's sorted PhysicalExprs to calculate the joinable range.
///
///
///               PROBE SIDE          BUILD SIDE
///                 BUFFER              BUFFER
///             +-------------+     +------------+
///             |             |     |            |    Unjoinable
///             |             |     |            |    Range
///             |             |     |            |
///             |             |  |---------------------------------
///             |             |  |  |            |
///             |             |  |  |            |
///             |             | /   |            |
///             |             | |   |            |
///             |             | |   |            |
///             |             | |   |            |
///             |             | |   |            |
///             |             | |   |            |    Joinable
///             |             |/    |            |    Range
///             |             ||    |            |
///             |+-----------+||    |            |
///             || Record    ||     |            |
///             || Batch     ||     |            |
///             |+-----------+||    |            |
///             +-------------+\    +------------+
///                             |
///                             \
///                              |---------------------------------
///
///  This happens when range conditions are provided on sorted columns. E.g.
///
///        SELECT * FROM left_table, right_table
///        ON
///          left_key = right_key AND
///          left_time > right_time - INTERVAL 12 MINUTES AND left_time < right_time + INTERVAL 2 HOUR
///
/// or
///       SELECT * FROM left_table, right_table
///        ON
///          left_key = right_key AND
///          left_sorted > right_sorted - 3 AND left_sorted < right_sorted + 10
///
/// For general purpose, in the second scenario, when the new data comes to probe side, the conditions can be used to
/// determine a specific threshold for discarding rows from the inner buffer. For example, if the sort order the
/// two columns ("left_sorted" and "right_sorted") are ascending (it can be different in another scenarios)
/// and the join condition is "left_sorted > right_sorted - 3" and the latest value on the right input is 1234, meaning
/// that the left side buffer must only keep rows where "leftTime > rightTime - 3 > 1234 - 3 > 1231" ,
/// making the smallest value in 'left_sorted' 1231 and any rows below (since ascending)
/// than that can be dropped from the inner buffer.
/// ```
#[derive(Debug, Clone)]
pub struct SymmetricHashJoinExec {
    /// Left side stream
    pub(crate) left: Arc<dyn ExecutionPlan>,
    /// Right side stream
    pub(crate) right: Arc<dyn ExecutionPlan>,
    /// Set of common columns used to join on
    pub(crate) on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Filters applied when finding matching rows
    pub(crate) filter: Option<JoinFilter>,
    /// How the join is performed
    pub(crate) join_type: JoinType,
    /// Shares the `RandomState` for the hashing algorithm
    random_state: RandomState,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Information of index and left / right placement of columns
    column_indices: Vec<ColumnIndex>,
    /// Defines the null equality for the join.
    pub(crate) null_equality: NullEquality,
    /// Left side sort expression(s); enables range-based pruning when present
    pub(crate) left_sort_exprs: Option<LexOrdering>,
    /// Right side sort expression(s); enables range-based pruning when present
    pub(crate) right_sort_exprs: Option<LexOrdering>,
    /// Partition Mode
    mode: StreamJoinPartitionMode,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: Arc<PlanProperties>,
}
203
impl SymmetricHashJoinExec {
    /// Tries to create a new [SymmetricHashJoinExec].
    /// # Error
    /// This function errors when:
    /// - It is not possible to join the left and right sides on keys `on`, or
    /// - It fails to construct `SortedFilterExpr`s, or
    /// - It fails to create the [ExprIntervalGraph].
    #[expect(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        null_equality: NullEquality,
        left_sort_exprs: Option<LexOrdering>,
        right_sort_exprs: Option<LexOrdering>,
        mode: StreamJoinPartitionMode,
    ) -> Result<Self> {
        let left_schema = left.schema();
        let right_schema = right.schema();

        // Error out if no "on" constraints are given:
        if on.is_empty() {
            return plan_err!(
                "On constraints in SymmetricHashJoinExec should be non-empty"
            );
        }

        // Check if the join is valid with the given on constraints:
        check_join_is_valid(&left_schema, &right_schema, &on)?;

        // Build the join schema from the left and right schemas:
        let (schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, join_type);

        // Initialize the random state for the join operation; constant seeds
        // make the hash function identical on both sides and across partitions:
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let schema = Arc::new(schema);
        let cache = Self::compute_properties(&left, &right, schema, *join_type, &on)?;
        Ok(SymmetricHashJoinExec {
            left,
            right,
            on,
            filter,
            join_type: *join_type,
            random_state,
            metrics: ExecutionPlanMetricsSet::new(),
            column_indices,
            null_equality,
            left_sort_exprs,
            right_sort_exprs,
            mode,
            cache: Arc::new(cache),
        })
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
        join_type: JoinType,
        join_on: JoinOnRef,
    ) -> Result<PlanProperties> {
        // Calculate equivalence properties:
        let eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            schema,
            &[false, false],
            // Has alternating probe side
            None,
            join_on,
        )?;

        let output_partitioning =
            symmetric_join_output_partitioning(left, right, &join_type)?;

        // Emission type and boundedness are both derived from the children:
        // the join can stream output as long as its inputs can.
        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type_from_children([left, right]),
            boundedness_from_children([left, right]),
        ))
    }

    /// left stream
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// right stream
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// Set of common columns used to join on
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    /// Filters applied before join output
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// How the join is performed
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// Get null_equality
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    /// Get partition mode
    pub fn partition_mode(&self) -> StreamJoinPartitionMode {
        self.mode
    }

    /// Get left_sort_exprs
    pub fn left_sort_exprs(&self) -> Option<&LexOrdering> {
        self.left_sort_exprs.as_ref()
    }

    /// Get right_sort_exprs
    pub fn right_sort_exprs(&self) -> Option<&LexOrdering> {
        self.right_sort_exprs.as_ref()
    }

    /// Check if order information covers every column in the filter expression.
    ///
    /// Returns `true` only when a filter is present, both children report an
    /// output ordering, and the *leading* sort expression of each side is
    /// convertible against the filter schema.
    pub fn check_if_order_information_available(&self) -> Result<bool> {
        if let Some(filter) = self.filter() {
            let left = self.left();
            if let Some(left_ordering) = left.output_ordering() {
                let right = self.right();
                if let Some(right_ordering) = right.output_ordering() {
                    // Only the first (leading) sort expression of each side
                    // is examined here.
                    let left_convertible = convert_sort_expr_with_filter_schema(
                        &JoinSide::Left,
                        filter,
                        &left.schema(),
                        &left_ordering[0],
                    )?
                    .is_some();
                    let right_convertible = convert_sort_expr_with_filter_schema(
                        &JoinSide::Right,
                        filter,
                        &right.schema(),
                        &right_ordering[0],
                    )?
                    .is_some();
                    return Ok(left_convertible && right_convertible);
                }
            }
        }
        Ok(false)
    }

    /// Rebuilds this node with new children, reusing every other field
    /// (including the cached plan properties) unchanged.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        // First swap_remove(0) yields the left child; the former last element
        // (the right child) moves to index 0 and is taken by the second call.
        let left = children.swap_remove(0);
        let right = children.swap_remove(0);
        Self {
            left,
            right,
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
379
380impl DisplayAs for SymmetricHashJoinExec {
381    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
382        match t {
383            DisplayFormatType::Default | DisplayFormatType::Verbose => {
384                let display_filter = self.filter.as_ref().map_or_else(
385                    || "".to_string(),
386                    |f| format!(", filter={}", f.expression()),
387                );
388                let on = self
389                    .on
390                    .iter()
391                    .map(|(c1, c2)| format!("({c1}, {c2})"))
392                    .collect::<Vec<String>>()
393                    .join(", ");
394                write!(
395                    f,
396                    "SymmetricHashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
397                    self.mode, self.join_type, on, display_filter
398                )
399            }
400            DisplayFormatType::TreeRender => {
401                let on = self
402                    .on
403                    .iter()
404                    .map(|(c1, c2)| {
405                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
406                    })
407                    .collect::<Vec<String>>()
408                    .join(", ");
409
410                writeln!(f, "mode={:?}", self.mode)?;
411                if *self.join_type() != JoinType::Inner {
412                    writeln!(f, "join_type={:?}", self.join_type)?;
413                }
414                writeln!(f, "on={on}")
415            }
416        }
417    }
418}
419
impl ExecutionPlan for SymmetricHashJoinExec {
    fn name(&self) -> &'static str {
        "SymmetricHashJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// In `Partitioned` mode both children must be hash-partitioned on their
    /// respective join keys; in `SinglePartition` mode both must be collapsed
    /// into a single partition.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            StreamJoinPartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
            StreamJoinPartitionMode::SinglePartition => {
                vec![Distribution::SinglePartition, Distribution::SinglePartition]
            }
        }
    }

    /// Each side requires its configured sort ordering (if any); these
    /// orderings are what make range-based pruning possible.
    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        vec![
            self.left_sort_exprs
                .as_ref()
                .map(|e| OrderingRequirements::from(e.clone())),
            self.right_sort_exprs
                .as_ref()
                .map(|e| OrderingRequirements::from(e.clone())),
        ]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // NOTE(review): presumably short-circuits via
        // `with_new_children_and_same_properties` when the new children's
        // properties match the old ones — confirm the macro's semantics.
        check_if_same_properties!(self, children);
        Ok(Arc::new(SymmetricHashJoinExec::try_new(
            Arc::clone(&children[0]),
            Arc::clone(&children[1]),
            self.on.clone(),
            self.filter.clone(),
            &self.join_type,
            self.null_equality,
            self.left_sort_exprs.clone(),
            self.right_sort_exprs.clone(),
            self.mode,
        )?))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Starts execution of the given `partition`, returning a stream that
    /// drives both children and yields joined output batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Both sides must have the same partition count, since partition `i`
        // of the left is joined with partition `i` of the right.
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();
        assert_eq_or_internal_err!(
            left_partitions,
            right_partitions,
            "Invalid SymmetricHashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
                 consider using RepartitionExec"
        );
        // If `filter_state` and `filter` are both present, then calculate sorted
        // filter expressions for both sides, and build an expression graph.
        let (left_sorted_filter_expr, right_sorted_filter_expr, graph) = match (
            self.left_sort_exprs(),
            self.right_sort_exprs(),
            &self.filter,
        ) {
            (Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
                let (left, right, graph) = prepare_sorted_exprs(
                    filter,
                    &self.left,
                    &self.right,
                    left_sort_exprs,
                    right_sort_exprs,
                )?;
                (Some(left), Some(right), Some(graph))
            }
            // If `filter_state` or `filter` is not present, then return None
            // for all three values:
            _ => (None, None, None),
        };

        let (on_left, on_right) = self.on.iter().cloned().unzip();

        let left_side_joiner =
            OneSideHashJoiner::new(JoinSide::Left, on_left, self.left.schema());
        let right_side_joiner =
            OneSideHashJoiner::new(JoinSide::Right, on_right, self.right.schema());

        let left_stream = self.left.execute(partition, Arc::clone(&context))?;

        let right_stream = self.right.execute(partition, Arc::clone(&context))?;

        let batch_size = context.session_config().batch_size();
        let enforce_batch_size_in_joins =
            context.session_config().enforce_batch_size_in_joins();

        // Register a per-partition memory reservation; the interval graph (if
        // any) is accounted for up front.
        let reservation = Arc::new(Mutex::new(
            MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
                .register(context.memory_pool()),
        ));
        if let Some(g) = graph.as_ref() {
            reservation.lock().try_grow(g.size())?;
        }

        // The two branches below are identical except for the batch
        // transformer; they cannot be merged because each produces a different
        // generic instantiation of `SymmetricHashJoinStream<T>`.
        if enforce_batch_size_in_joins {
            Ok(Box::pin(SymmetricHashJoinStream {
                left_stream,
                right_stream,
                schema: self.schema(),
                filter: self.filter.clone(),
                join_type: self.join_type,
                random_state: self.random_state.clone(),
                left: left_side_joiner,
                right: right_side_joiner,
                column_indices: self.column_indices.clone(),
                metrics: StreamJoinMetrics::new(partition, &self.metrics),
                graph,
                left_sorted_filter_expr,
                right_sorted_filter_expr,
                null_equality: self.null_equality,
                state: SHJStreamState::PullRight,
                reservation,
                batch_transformer: BatchSplitter::new(batch_size),
            }))
        } else {
            Ok(Box::pin(SymmetricHashJoinStream {
                left_stream,
                right_stream,
                schema: self.schema(),
                filter: self.filter.clone(),
                join_type: self.join_type,
                random_state: self.random_state.clone(),
                left: left_side_joiner,
                right: right_side_joiner,
                column_indices: self.column_indices.clone(),
                metrics: StreamJoinMetrics::new(partition, &self.metrics),
                graph,
                left_sorted_filter_expr,
                right_sorted_filter_expr,
                null_equality: self.null_equality,
                state: SHJStreamState::PullRight,
                reservation,
                batch_transformer: NoopBatchTransformer::new(),
            }))
        }
    }

    /// Tries to swap the projection with its input [`SymmetricHashJoinExec`]. If it can be done,
    /// it returns the new swapped version having the [`SymmetricHashJoinExec`] as the top plan.
    /// Otherwise, it returns None.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Convert projected PhysicalExpr's to columns. If not possible, we cannot proceed.
        let Some(projection_as_columns) = physical_to_column_exprs(projection.expr())
        else {
            return Ok(None);
        };

        let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
            self.left().schema().fields().len(),
            &projection_as_columns,
        );

        if !join_allows_pushdown(
            &projection_as_columns,
            &self.schema(),
            far_right_left_col_ind,
            far_left_right_col_ind,
        ) {
            return Ok(None);
        }

        let Some(new_on) = update_join_on(
            &projection_as_columns[0..=far_right_left_col_ind as _],
            &projection_as_columns[far_left_right_col_ind as _..],
            self.on(),
            self.left().schema().fields().len(),
        ) else {
            return Ok(None);
        };

        let new_filter = if let Some(filter) = self.filter() {
            match update_join_filter(
                &projection_as_columns[0..=far_right_left_col_ind as _],
                &projection_as_columns[far_left_right_col_ind as _..],
                filter,
                self.left().schema().fields().len(),
            ) {
                Some(updated_filter) => Some(updated_filter),
                None => return Ok(None),
            }
        } else {
            None
        };

        let (new_left, new_right) = new_join_children(
            &projection_as_columns,
            far_right_left_col_ind,
            far_left_right_col_ind,
            self.left(),
            self.right(),
        )?;

        // NOTE(review): `new_left`/`new_right` keep their original sides, yet
        // the *right* child's ordering is passed as `left_sort_exprs` and the
        // *left* child's as `right_sort_exprs` — this looks swapped relative
        // to `try_new`'s parameter order; confirm intent upstream.
        SymmetricHashJoinExec::try_new(
            Arc::new(new_left),
            Arc::new(new_right),
            new_on,
            new_filter,
            self.join_type(),
            self.null_equality(),
            self.right().output_ordering().cloned(),
            self.left().output_ordering().cloned(),
            self.partition_mode(),
        )
        .map(|e| Some(Arc::new(e) as _))
    }
}
662
/// The stream returned by [`SymmetricHashJoinExec::execute`]: a state machine
/// that incrementally pulls batches from both inputs and emits joined output.
struct SymmetricHashJoinStream<T> {
    /// Input streams
    left_stream: SendableRecordBatchStream,
    right_stream: SendableRecordBatchStream,
    /// Schema of the batches this stream produces (the join output schema)
    schema: Arc<Schema>,
    /// join filter
    filter: Option<JoinFilter>,
    /// type of the join
    join_type: JoinType,
    /// left hash joiner
    left: OneSideHashJoiner,
    /// right hash joiner
    right: OneSideHashJoiner,
    /// Information of index and left / right placement of columns
    column_indices: Vec<ColumnIndex>,
    /// Expression graph for range pruning.
    graph: Option<ExprIntervalGraph>,
    /// Left globally sorted filter expr
    left_sorted_filter_expr: Option<SortedFilterExpr>,
    /// Right globally sorted filter expr
    right_sorted_filter_expr: Option<SortedFilterExpr>,
    /// Random state used for hashing initialization
    random_state: RandomState,
    /// Defines the null equality for the join.
    null_equality: NullEquality,
    /// Metrics
    metrics: StreamJoinMetrics,
    /// Memory reservation
    reservation: SharedMemoryReservation,
    /// State machine for input execution
    state: SHJStreamState,
    /// Transforms the output batch before returning.
    batch_transformer: T,
}
699
700impl<T: BatchTransformer + Unpin + Send> RecordBatchStream
701    for SymmetricHashJoinStream<T>
702{
703    fn schema(&self) -> SchemaRef {
704        Arc::clone(&self.schema)
705    }
706}
707
impl<T: BatchTransformer + Unpin + Send> Stream for SymmetricHashJoinStream<T> {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Delegate to the state-machine driver (defined outside this chunk).
        self.poll_next_impl(cx)
    }
}
718
719/// Determine the pruning length for `buffer`.
720///
721/// This function evaluates the build side filter expression, converts the
722/// result into an array and determines the pruning length by performing a
723/// binary search on the array.
724///
725/// # Arguments
726///
727/// * `buffer`: The record batch to be pruned.
728/// * `build_side_filter_expr`: The filter expression on the build side used
729///   to determine the pruning length.
730///
731/// # Returns
732///
733/// A [Result] object that contains the pruning length. The function will return
734/// an error if
735/// - there is an issue evaluating the build side filter expression;
736/// - there is an issue converting the build side filter expression into an array
737fn determine_prune_length(
738    buffer: &RecordBatch,
739    build_side_filter_expr: &SortedFilterExpr,
740) -> Result<usize> {
741    let origin_sorted_expr = build_side_filter_expr.origin_sorted_expr();
742    let interval = build_side_filter_expr.interval();
743    // Evaluate the build side filter expression and convert it into an array
744    let batch_arr = origin_sorted_expr
745        .expr
746        .evaluate(buffer)?
747        .into_array(buffer.num_rows())?;
748
749    // Get the lower or upper interval based on the sort direction
750    let target = if origin_sorted_expr.options.descending {
751        interval.upper().clone()
752    } else {
753        interval.lower().clone()
754    };
755
756    // Perform binary search on the array to determine the length of the record batch to be pruned
757    bisect::<true>(&[batch_arr], &[target], &[origin_sorted_expr.options])
758}
759
760/// This method determines if the result of the join should be produced in the final step or not.
761///
762/// # Arguments
763///
764/// * `build_side` - Enum indicating the side of the join used as the build side.
765/// * `join_type` - Enum indicating the type of join to be performed.
766///
767/// # Returns
768///
769/// A boolean indicating whether the result of the join should be produced in the final step or not.
770/// The result will be true if the build side is JoinSide::Left and the join type is one of
771/// JoinType::Left, JoinType::LeftAnti, JoinType::Full or JoinType::LeftSemi.
772/// If the build side is JoinSide::Right, the result will be true if the join type
773/// is one of JoinType::Right, JoinType::RightAnti, JoinType::Full, or JoinType::RightSemi.
774fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) -> bool {
775    if build_side == JoinSide::Left {
776        matches!(
777            join_type,
778            JoinType::Left
779                | JoinType::LeftAnti
780                | JoinType::Full
781                | JoinType::LeftSemi
782                | JoinType::LeftMark
783        )
784    } else {
785        matches!(
786            join_type,
787            JoinType::Right
788                | JoinType::RightAnti
789                | JoinType::Full
790                | JoinType::RightSemi
791                | JoinType::RightMark
792        )
793    }
794}
795
796/// Calculate indices by join type.
797///
798/// This method returns a tuple of two arrays: build and probe indices.
799/// The length of both arrays will be the same.
800///
801/// # Arguments
802///
803/// * `build_side`: Join side which defines the build side.
804/// * `prune_length`: Length of the prune data.
805/// * `visited_rows`: Hash set of visited rows of the build side.
806/// * `deleted_offset`: Deleted offset of the build side.
807/// * `join_type`: The type of join to be performed.
808///
809/// # Returns
810///
811/// A tuple of two arrays of primitive types representing the build and probe indices.
812fn calculate_indices_by_join_type<L: ArrowPrimitiveType, R: ArrowPrimitiveType>(
813    build_side: JoinSide,
814    prune_length: usize,
815    visited_rows: &HashSet<usize>,
816    deleted_offset: usize,
817    join_type: JoinType,
818) -> Result<(PrimitiveArray<L>, PrimitiveArray<R>)>
819where
820    NativeAdapter<L>: From<<L as ArrowPrimitiveType>::Native>,
821{
822    // Store the result in a tuple
823    let result = match (build_side, join_type) {
824        // For a mark join we “mark” each build‐side row with a dummy 0 in the probe‐side index
825        // if it ever matched. For example, if
826        //
827        // prune_length = 5
828        // deleted_offset = 0
829        // visited_rows = {1, 3}
830        //
831        // then we produce:
832        //
833        // build_indices = [0, 1, 2, 3, 4]
834        // probe_indices = [None, Some(0), None, Some(0), None]
835        //
836        // Example: for each build row i in [0..5):
837        //   – We always output its own index i in `build_indices`
838        //   – We output `Some(0)` in `probe_indices[i]` if row i was ever visited, else `None`
839        (JoinSide::Left, JoinType::LeftMark) => {
840            let build_indices = (0..prune_length)
841                .map(L::Native::from_usize)
842                .collect::<PrimitiveArray<L>>();
843            let probe_indices = (0..prune_length)
844                .map(|idx| {
845                    // For mark join we output a dummy index 0 to indicate the row had a match
846                    visited_rows
847                        .contains(&(idx + deleted_offset))
848                        .then_some(R::Native::from_usize(0).unwrap())
849                })
850                .collect();
851            (build_indices, probe_indices)
852        }
853        (JoinSide::Right, JoinType::RightMark) => {
854            let build_indices = (0..prune_length)
855                .map(L::Native::from_usize)
856                .collect::<PrimitiveArray<L>>();
857            let probe_indices = (0..prune_length)
858                .map(|idx| {
859                    // For mark join we output a dummy index 0 to indicate the row had a match
860                    visited_rows
861                        .contains(&(idx + deleted_offset))
862                        .then_some(R::Native::from_usize(0).unwrap())
863                })
864                .collect();
865            (build_indices, probe_indices)
866        }
867        // In the case of `Left` or `Right` join, or `Full` join, get the anti indices
868        (JoinSide::Left, JoinType::Left | JoinType::LeftAnti)
869        | (JoinSide::Right, JoinType::Right | JoinType::RightAnti)
870        | (_, JoinType::Full) => {
871            let build_unmatched_indices =
872                get_pruning_anti_indices(prune_length, deleted_offset, visited_rows);
873            let mut builder =
874                PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
875            builder.append_nulls(build_unmatched_indices.len());
876            let probe_indices = builder.finish();
877            (build_unmatched_indices, probe_indices)
878        }
879        // In the case of `LeftSemi` or `RightSemi` join, get the semi indices
880        (JoinSide::Left, JoinType::LeftSemi) | (JoinSide::Right, JoinType::RightSemi) => {
881            let build_unmatched_indices =
882                get_pruning_semi_indices(prune_length, deleted_offset, visited_rows);
883            let mut builder =
884                PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
885            builder.append_nulls(build_unmatched_indices.len());
886            let probe_indices = builder.finish();
887            (build_unmatched_indices, probe_indices)
888        }
889        // The case of other join types is not considered
890        _ => unreachable!(),
891    };
892    Ok(result)
893}
894
895/// This function produces unmatched record results based on the build side,
896/// join type and other parameters.
897///
898/// The method uses first `prune_length` rows from the build side input buffer
899/// to produce results.
900///
901/// # Arguments
902///
/// * `build_hash_joiner` - The build-side hash joiner whose buffered rows are used.
/// * `output_schema` - The schema of the final output record batch.
/// * `prune_length` - The number of rows from the build side buffer to produce results for.
905/// * `probe_schema` - The schema of the probe [RecordBatch].
906/// * `join_type` - The type of join to be performed.
907/// * `column_indices` - Indices of columns that are being joined.
908///
909/// # Returns
910///
911/// * `Option<RecordBatch>` - The final output record batch if required, otherwise [None].
912pub(crate) fn build_side_determined_results(
913    build_hash_joiner: &OneSideHashJoiner,
914    output_schema: &SchemaRef,
915    prune_length: usize,
916    probe_schema: SchemaRef,
917    join_type: JoinType,
918    column_indices: &[ColumnIndex],
919) -> Result<Option<RecordBatch>> {
920    // Check if we need to produce a result in the final output:
921    if prune_length > 0
922        && need_to_produce_result_in_final(build_hash_joiner.build_side, join_type)
923    {
924        // Calculate the indices for build and probe sides based on join type and build side:
925        let (build_indices, probe_indices) = calculate_indices_by_join_type(
926            build_hash_joiner.build_side,
927            prune_length,
928            &build_hash_joiner.visited_rows,
929            build_hash_joiner.deleted_offset,
930            join_type,
931        )?;
932
933        // Create an empty probe record batch:
934        let empty_probe_batch = RecordBatch::new_empty(probe_schema);
935        // Build the final result from the indices of build and probe sides:
936        build_batch_from_indices(
937            output_schema.as_ref(),
938            &build_hash_joiner.input_buffer,
939            &empty_probe_batch,
940            &build_indices,
941            &probe_indices,
942            column_indices,
943            build_hash_joiner.build_side,
944            join_type,
945        )
946        .map(|batch| (batch.num_rows() > 0).then_some(batch))
947    } else {
948        // If we don't need to produce a result, return None
949        Ok(None)
950    }
951}
952
953/// This method performs a join between the build side input buffer and the probe side batch.
954///
955/// # Arguments
956///
957/// * `build_hash_joiner` - Build side hash joiner
958/// * `probe_hash_joiner` - Probe side hash joiner
959/// * `schema` - A reference to the schema of the output record batch.
960/// * `join_type` - The type of join to be performed.
961/// * `on_probe` - An array of columns on which the join will be performed. The columns are from the probe side of the join.
962/// * `filter` - An optional filter on the join condition.
963/// * `probe_batch` - The second record batch to be joined.
964/// * `column_indices` - An array of columns to be selected for the result of the join.
965/// * `random_state` - The random state for the join.
966/// * `null_equality` - Indicates whether NULL values should be treated as equal when joining.
967///
968/// # Returns
969///
/// A [Result] containing an optional record batch if the join type is not one of `LeftAnti`,
/// `RightAnti`, `LeftSemi`, `RightSemi`, `LeftMark` or `RightMark`.
/// If the join type is one of the above six, the function will return [None].
972#[expect(clippy::too_many_arguments)]
pub(crate) fn join_with_probe_batch(
    build_hash_joiner: &mut OneSideHashJoiner,
    probe_hash_joiner: &mut OneSideHashJoiner,
    schema: &SchemaRef,
    join_type: JoinType,
    filter: Option<&JoinFilter>,
    probe_batch: &RecordBatch,
    column_indices: &[ColumnIndex],
    random_state: &RandomState,
    null_equality: NullEquality,
) -> Result<Option<RecordBatch>> {
    // No output is possible while either side has no rows:
    if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
        return Ok(None);
    }
    // Find candidate (build, probe) index pairs via the build-side hashmap;
    // `lookup_join_hashmap` also re-checks key equality to discard hash
    // collisions:
    let (build_indices, probe_indices) = lookup_join_hashmap(
        &build_hash_joiner.hashmap,
        &build_hash_joiner.input_buffer,
        probe_batch,
        &build_hash_joiner.on,
        &probe_hash_joiner.on,
        random_state,
        null_equality,
        &mut build_hash_joiner.hashes_buffer,
        Some(build_hash_joiner.deleted_offset),
    )?;

    // If a join filter is present, retain only the index pairs satisfying it:
    let (build_indices, probe_indices) = if let Some(filter) = filter {
        apply_join_filter_to_indices(
            &build_hash_joiner.input_buffer,
            probe_batch,
            build_indices,
            probe_indices,
            filter,
            build_hash_joiner.build_side,
            None,
            join_type,
        )?
    } else {
        (build_indices, probe_indices)
    };

    // Record matched build-side rows when this join type must emit build-side
    // results in the final step (outer/anti/semi/mark semantics):
    if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
        record_visited_indices(
            &mut build_hash_joiner.visited_rows,
            build_hash_joiner.deleted_offset,
            &build_indices,
        );
    }
    // Likewise record matched probe-side rows when the opposite side must emit
    // results in the final step:
    if need_to_produce_result_in_final(build_hash_joiner.build_side.negate(), join_type) {
        record_visited_indices(
            &mut probe_hash_joiner.visited_rows,
            probe_hash_joiner.offset,
            &probe_indices,
        );
    }
    // Anti/semi/mark joins produce their output only during finalization, so
    // nothing is emitted here:
    if matches!(
        join_type,
        JoinType::LeftAnti
            | JoinType::RightAnti
            | JoinType::LeftSemi
            | JoinType::LeftMark
            | JoinType::RightSemi
            | JoinType::RightMark
    ) {
        Ok(None)
    } else {
        // Assemble the joined batch from the surviving index pairs,
        // suppressing empty output:
        build_batch_from_indices(
            schema,
            &build_hash_joiner.input_buffer,
            probe_batch,
            &build_indices,
            &probe_indices,
            column_indices,
            build_hash_joiner.build_side,
            join_type,
        )
        .map(|batch| (batch.num_rows() > 0).then_some(batch))
    }
}
1052
1053/// This method performs lookups against JoinHashMap by hash values of join-key columns, and handles potential
1054/// hash collisions.
1055///
1056/// # Arguments
1057///
1058/// * `build_hashmap` - hashmap collected from build side data.
1059/// * `build_batch` - Build side record batch.
1060/// * `probe_batch` - Probe side record batch.
1061/// * `build_on` - An array of columns on which the join will be performed. The columns are from the build side of the join.
1062/// * `probe_on` - An array of columns on which the join will be performed. The columns are from the probe side of the join.
1063/// * `random_state` - The random state for the join.
1064/// * `null_equality` - Indicates whether NULL values should be treated as equal when joining.
1065/// * `hashes_buffer` - Buffer used for probe side keys hash calculation.
1066/// * `deleted_offset` - deleted offset for build side data.
1067///
1068/// # Returns
1069///
1070/// A [Result] containing a tuple with two equal length arrays, representing indices of rows from build and probe side,
1071/// matched by join key columns.
1072#[expect(clippy::too_many_arguments)]
fn lookup_join_hashmap(
    build_hashmap: &PruningJoinHashMap,
    build_batch: &RecordBatch,
    probe_batch: &RecordBatch,
    build_on: &[PhysicalExprRef],
    probe_on: &[PhysicalExprRef],
    random_state: &RandomState,
    null_equality: NullEquality,
    hashes_buffer: &mut Vec<u64>,
    deleted_offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
    // Evaluate the join-key expressions on both sides:
    let keys_values = evaluate_expressions_to_arrays(probe_on, probe_batch)?;
    let build_join_values = evaluate_expressions_to_arrays(build_on, build_batch)?;

    // Reuse the caller-provided buffer for the probe-side key hashes:
    hashes_buffer.clear();
    hashes_buffer.resize(probe_batch.num_rows(), 0);
    let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

    // As SymmetricHashJoin uses LIFO JoinHashMap, the chained list algorithm
    // will return build indices for each probe row in a reverse order as such:
    // Build Indices: [5, 4, 3]
    // Probe Indices: [1, 1, 1]
    //
    // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side.
    // Let's consider probe rows [0,1] as an example:
    //
    // When the probe iteration sequence is reversed, the following pairings can be derived:
    //
    // For probe row 1:
    //     (5, 1)
    //     (4, 1)
    //     (3, 1)
    //
    // For probe row 0:
    //     (5, 0)
    //     (4, 0)
    //     (3, 0)
    //
    // After reversing both sets of indices, we obtain reversed indices:
    //
    //     (3,0)
    //     (4,0)
    //     (5,0)
    //     (3,1)
    //     (4,1)
    //     (5,1)
    //
    // With this approach, the lexicographic order on both the probe side and the build side is preserved.
    let (mut matched_probe, mut matched_build) = build_hashmap.get_matched_indices(
        Box::new(hash_values.iter().enumerate().rev()),
        deleted_offset,
    );

    // Restore ascending order on both sides (see the explanation above):
    matched_probe.reverse();
    matched_build.reverse();

    let build_indices: UInt64Array = matched_build.into();
    let probe_indices: UInt32Array = matched_probe.into();

    // Equal hashes are necessary but not sufficient; compare the actual key
    // values to filter out hash collisions:
    let (build_indices, probe_indices) = equal_rows_arr(
        &build_indices,
        &probe_indices,
        &build_join_values,
        &keys_values,
        null_equality,
    )?;

    Ok((build_indices, probe_indices))
}
1142
/// Holds the per-side state of a symmetric hash join: the buffered input,
/// the join hashmap over it, and the bookkeeping needed for pruning and
/// final-result production.
pub struct OneSideHashJoiner {
    /// Which side of the join this joiner buffers and builds on
    build_side: JoinSide,
    /// All rows received from this side so far, minus any pruned prefix
    pub input_buffer: RecordBatch,
    /// Join-key expressions evaluated against this side's batches
    pub(crate) on: Vec<PhysicalExprRef>,
    /// Hash map from join-key hashes to row indices, prunable from the front
    pub(crate) hashmap: PruningJoinHashMap,
    /// Scratch buffer reused across batches when computing key hashes
    pub(crate) hashes_buffer: Vec<u64>,
    /// Absolute indices (i.e. including pruned rows) of rows that have
    /// matched; consulted when producing final results
    pub(crate) visited_rows: HashSet<usize>,
    /// Row offset used when inserting newly arrived rows into the hashmap
    pub(crate) offset: usize,
    /// Number of rows pruned (removed) from the front of `input_buffer`
    pub(crate) deleted_offset: usize,
}
1161
impl OneSideHashJoiner {
    /// Returns an approximate memory footprint of this joiner in bytes.
    ///
    /// NOTE(review): `size_of_val(self)` already includes the inline fields
    /// (`build_side`, the `Vec`/`HashSet` headers, `offset`,
    /// `deleted_offset`), so adding them again below double-counts a few
    /// bytes. This makes the estimate slightly conservative (an
    /// overestimate) — confirm whether that is intentional for memory
    /// accounting.
    pub fn size(&self) -> usize {
        let mut size = 0;
        size += size_of_val(self);
        size += size_of_val(&self.build_side);
        size += self.input_buffer.get_array_memory_size();
        size += size_of_val(&self.on);
        size += self.hashmap.size();
        size += self.hashes_buffer.capacity() * size_of::<u64>();
        size += self.visited_rows.capacity() * size_of::<usize>();
        size += size_of_val(&self.offset);
        size += size_of_val(&self.deleted_offset);
        size
    }
    /// Creates an empty joiner for the given side with the given join-key
    /// expressions and input schema.
    pub fn new(
        build_side: JoinSide,
        on: Vec<PhysicalExprRef>,
        schema: SchemaRef,
    ) -> Self {
        Self {
            build_side,
            input_buffer: RecordBatch::new_empty(schema),
            on,
            hashmap: PruningJoinHashMap::with_capacity(0),
            hashes_buffer: vec![],
            visited_rows: HashSet::new(),
            offset: 0,
            deleted_offset: 0,
        }
    }

    /// Updates the internal state of the [OneSideHashJoiner] with the incoming batch.
    ///
    /// # Arguments
    ///
    /// * `batch` - The incoming [RecordBatch] to be merged with the internal input buffer
    /// * `random_state` - The random state used to hash values
    ///
    /// # Returns
    ///
    /// Returns a [Result] encapsulating any intermediate errors.
    pub(crate) fn update_internal_state(
        &mut self,
        batch: &RecordBatch,
        random_state: &RandomState,
    ) -> Result<()> {
        // Merge the incoming batch with the existing input buffer:
        self.input_buffer = concat_batches(&batch.schema(), [&self.input_buffer, batch])?;
        // Resize the hashes buffer to the number of rows in the incoming batch:
        self.hashes_buffer.resize(batch.num_rows(), 0);
        // Update the hashmap with the join key values and hashes of the incoming batch:
        update_hash(
            &self.on,
            batch,
            &mut self.hashmap,
            self.offset,
            random_state,
            &mut self.hashes_buffer,
            self.deleted_offset,
            false,
        )?;
        Ok(())
    }

    /// Calculate prune length.
    ///
    /// # Arguments
    ///
    /// * `build_side_sorted_filter_expr` - Build side mutable sorted filter expression.
    /// * `probe_side_sorted_filter_expr` - Probe side mutable sorted filter expression.
    /// * `graph` - A mutable reference to the physical expression graph.
    ///
    /// # Returns
    ///
    /// A Result object that contains the pruning length.
    pub(crate) fn calculate_prune_length_with_probe_batch(
        &mut self,
        build_side_sorted_filter_expr: &mut SortedFilterExpr,
        probe_side_sorted_filter_expr: &mut SortedFilterExpr,
        graph: &mut ExprIntervalGraph,
    ) -> Result<usize> {
        // Return early if the input buffer is empty:
        if self.input_buffer.num_rows() == 0 {
            return Ok(0);
        }
        // Process the build and probe side sorted filter expressions if both are present:
        // Collect the sorted filter expressions into a vector of (node_index, interval) tuples:
        let mut filter_intervals = vec![];
        for expr in [
            &build_side_sorted_filter_expr,
            &probe_side_sorted_filter_expr,
        ] {
            filter_intervals.push((expr.node_index(), expr.interval().clone()))
        }
        // Update the physical expression graph using the join filter intervals:
        graph.update_ranges(&mut filter_intervals, Interval::TRUE)?;
        // Extract the new join filter interval for the build side
        // (first element, since the build side was pushed first):
        let calculated_build_side_interval = filter_intervals.remove(0).1;
        // If the intervals have not changed, return early without pruning:
        if calculated_build_side_interval.eq(build_side_sorted_filter_expr.interval()) {
            return Ok(0);
        }
        // Update the build side interval and determine the pruning length:
        build_side_sorted_filter_expr.set_interval(calculated_build_side_interval);

        determine_prune_length(&self.input_buffer, build_side_sorted_filter_expr)
    }

    /// Removes the first `prune_length` rows from this side's state: the
    /// hashmap entries, the visited-row bookkeeping, and the input buffer.
    pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> {
        // Prune the hash values:
        self.hashmap.prune_hash_values(
            prune_length,
            self.deleted_offset as u64,
            HASHMAP_SHRINK_SCALE_FACTOR,
        );
        // Remove pruned rows from the visited rows set:
        for row in self.deleted_offset..(self.deleted_offset + prune_length) {
            self.visited_rows.remove(&row);
        }
        // Update the input buffer after pruning:
        self.input_buffer = self
            .input_buffer
            .slice(prune_length, self.input_buffer.num_rows() - prune_length);
        // Increment the deleted offset:
        self.deleted_offset += prune_length;
        Ok(())
    }
}
1291
1292/// `SymmetricHashJoinStream` manages incremental join operations between two
1293/// streams. Unlike traditional join approaches that need to scan one side of
1294/// the join fully before proceeding, `SymmetricHashJoinStream` facilitates
1295/// more dynamic join operations by working with streams as they emit data. This
1296/// approach allows for more efficient processing, particularly in scenarios
1297/// where waiting for complete data materialization is not feasible or optimal.
1298/// The trait provides a framework for handling various states of such a join
1299/// process, ensuring that join logic is efficiently executed as data becomes
1300/// available from either stream.
1301///
1302/// This implementation performs eager joins of data from two different asynchronous
1303/// streams, typically referred to as left and right streams. The implementation
1304/// provides a comprehensive set of methods to control and execute the join
1305/// process, leveraging the states defined in `SHJStreamState`. Methods are
1306/// primarily focused on asynchronously fetching data batches from each stream,
1307/// processing them, and managing transitions between various states of the join.
1308///
1309/// This implementations use a state machine approach to navigate different
1310/// stages of the join operation, handling data from both streams and determining
1311/// when the join completes.
1312///
1313/// State Transitions:
1314/// - From `PullLeft` to `PullRight` or `LeftExhausted`:
1315///   - In `fetch_next_from_left_stream`, when fetching a batch from the left stream:
1316///     - On success (`Some(Ok(batch))`), state transitions to `PullRight` for
1317///       processing the batch.
1318///     - On error (`Some(Err(e))`), the error is returned, and the state remains
1319///       unchanged.
1320///     - On no data (`None`), state changes to `LeftExhausted`, returning `Continue`
1321///       to proceed with the join process.
1322/// - From `PullRight` to `PullLeft` or `RightExhausted`:
1323///   - In `fetch_next_from_right_stream`, when fetching from the right stream:
1324///     - If a batch is available, state changes to `PullLeft` for processing.
1325///     - On error, the error is returned without changing the state.
1326///     - If right stream is exhausted (`None`), state transitions to `RightExhausted`,
1327///       with a `Continue` result.
1328/// - Handling `RightExhausted` and `LeftExhausted`:
1329///   - Methods `handle_right_stream_end` and `handle_left_stream_end` manage scenarios
1330///     when streams are exhausted:
1331///     - They attempt to continue processing with the other stream.
1332///     - If both streams are exhausted, state changes to `BothExhausted { final_result: false }`.
1333/// - Transition to `BothExhausted { final_result: true }`:
1334///   - Occurs in `prepare_for_final_results_after_exhaustion` when both streams are
1335///     exhausted, indicating completion of processing and availability of final results.
1336impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
1337    /// Implements the main polling logic for the join stream.
1338    ///
1339    /// This method continuously checks the state of the join stream and
1340    /// acts accordingly by delegating the handling to appropriate sub-methods
1341    /// depending on the current state.
1342    ///
1343    /// # Arguments
1344    ///
1345    /// * `cx` - A context that facilitates cooperative non-blocking execution within a task.
1346    ///
1347    /// # Returns
1348    ///
1349    /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a `RecordBatch` or None.
    fn poll_next_impl(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match self.batch_transformer.next() {
                None => {
                    // No staged output batch; advance the join state machine:
                    let result = match self.state() {
                        SHJStreamState::PullRight => {
                            ready!(self.fetch_next_from_right_stream(cx))
                        }
                        SHJStreamState::PullLeft => {
                            ready!(self.fetch_next_from_left_stream(cx))
                        }
                        SHJStreamState::RightExhausted => {
                            ready!(self.handle_right_stream_end(cx))
                        }
                        SHJStreamState::LeftExhausted => {
                            ready!(self.handle_left_stream_end(cx))
                        }
                        SHJStreamState::BothExhausted {
                            final_result: false,
                        } => self.prepare_for_final_results_after_exhaustion(),
                        // Final results have already been produced; the
                        // stream is finished:
                        SHJStreamState::BothExhausted { final_result: true } => {
                            return Poll::Ready(None);
                        }
                    };

                    match result? {
                        StatefulStreamResult::Ready(None) => {
                            return Poll::Ready(None);
                        }
                        StatefulStreamResult::Ready(Some(batch)) => {
                            // Stage the batch; it is emitted through
                            // `batch_transformer` on the next loop iteration:
                            self.batch_transformer.set_batch(batch);
                        }
                        // `Continue`: loop again to keep driving the states.
                        _ => {}
                    }
                }
                Some((batch, _)) => {
                    // Emit the next staged output batch, updating the
                    // baseline metrics as we go:
                    return self
                        .metrics
                        .baseline_metrics
                        .record_poll(Poll::Ready(Some(Ok(batch))));
                }
            }
        }
    }
1397    /// Asynchronously pulls the next batch from the right stream.
1398    ///
1399    /// This default implementation checks for the next value in the right stream.
1400    /// If a batch is found, the state is switched to `PullLeft`, and the batch handling
1401    /// is delegated to `process_batch_from_right`. If the stream ends, the state is set to `RightExhausted`.
1402    ///
1403    /// # Returns
1404    ///
1405    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state result after pulling the batch.
1406    fn fetch_next_from_right_stream(
1407        &mut self,
1408        cx: &mut Context<'_>,
1409    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1410        match ready!(self.right_stream().poll_next_unpin(cx)) {
1411            Some(Ok(batch)) => {
1412                if batch.num_rows() == 0 {
1413                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
1414                }
1415                self.set_state(SHJStreamState::PullLeft);
1416                Poll::Ready(self.process_batch_from_right(&batch))
1417            }
1418            Some(Err(e)) => Poll::Ready(Err(e)),
1419            None => {
1420                self.set_state(SHJStreamState::RightExhausted);
1421                Poll::Ready(Ok(StatefulStreamResult::Continue))
1422            }
1423        }
1424    }
1425
1426    /// Asynchronously pulls the next batch from the left stream.
1427    ///
1428    /// This default implementation checks for the next value in the left stream.
1429    /// If a batch is found, the state is switched to `PullRight`, and the batch handling
1430    /// is delegated to `process_batch_from_left`. If the stream ends, the state is set to `LeftExhausted`.
1431    ///
1432    /// # Returns
1433    ///
1434    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state result after pulling the batch.
1435    fn fetch_next_from_left_stream(
1436        &mut self,
1437        cx: &mut Context<'_>,
1438    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1439        match ready!(self.left_stream().poll_next_unpin(cx)) {
1440            Some(Ok(batch)) => {
1441                if batch.num_rows() == 0 {
1442                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
1443                }
1444                self.set_state(SHJStreamState::PullRight);
1445                Poll::Ready(self.process_batch_from_left(&batch))
1446            }
1447            Some(Err(e)) => Poll::Ready(Err(e)),
1448            None => {
1449                self.set_state(SHJStreamState::LeftExhausted);
1450                Poll::Ready(Ok(StatefulStreamResult::Continue))
1451            }
1452        }
1453    }
1454
1455    /// Asynchronously handles the scenario when the right stream is exhausted.
1456    ///
1457    /// In this default implementation, when the right stream is exhausted, it attempts
1458    /// to pull from the left stream. If a batch is found in the left stream, it delegates
1459    /// the handling to `process_batch_from_left`. If both streams are exhausted, the state is set
1460    /// to indicate both streams are exhausted without final results yet.
1461    ///
1462    /// # Returns
1463    ///
1464    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state result after checking the exhaustion state.
1465    fn handle_right_stream_end(
1466        &mut self,
1467        cx: &mut Context<'_>,
1468    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1469        match ready!(self.left_stream().poll_next_unpin(cx)) {
1470            Some(Ok(batch)) => {
1471                if batch.num_rows() == 0 {
1472                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
1473                }
1474                Poll::Ready(self.process_batch_after_right_end(&batch))
1475            }
1476            Some(Err(e)) => Poll::Ready(Err(e)),
1477            None => {
1478                self.set_state(SHJStreamState::BothExhausted {
1479                    final_result: false,
1480                });
1481                Poll::Ready(Ok(StatefulStreamResult::Continue))
1482            }
1483        }
1484    }
1485
1486    /// Asynchronously handles the scenario when the left stream is exhausted.
1487    ///
1488    /// When the left stream is exhausted, this default
1489    /// implementation tries to pull from the right stream and delegates the batch
1490    /// handling to `process_batch_after_left_end`. If both streams are exhausted, the state
1491    /// is updated to indicate so.
1492    ///
1493    /// # Returns
1494    ///
1495    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state result after checking the exhaustion state.
1496    fn handle_left_stream_end(
1497        &mut self,
1498        cx: &mut Context<'_>,
1499    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1500        match ready!(self.right_stream().poll_next_unpin(cx)) {
1501            Some(Ok(batch)) => {
1502                if batch.num_rows() == 0 {
1503                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
1504                }
1505                Poll::Ready(self.process_batch_after_left_end(&batch))
1506            }
1507            Some(Err(e)) => Poll::Ready(Err(e)),
1508            None => {
1509                self.set_state(SHJStreamState::BothExhausted {
1510                    final_result: false,
1511                });
1512                Poll::Ready(Ok(StatefulStreamResult::Continue))
1513            }
1514        }
1515    }
1516
    /// Handles the state when both streams are exhausted and final results are yet to be produced.
    ///
    /// This default implementation switches the state to indicate both streams are
    /// exhausted with final results and then invokes the handling for this specific
    /// scenario via `process_batches_before_finalization`.
    ///
    /// # Returns
    ///
    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state result after both streams are exhausted.
    fn prepare_for_final_results_after_exhaustion(
        &mut self,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        // Mark finalization as done first, so this branch is not re-entered.
        self.set_state(SHJStreamState::BothExhausted { final_result: true });
        self.process_batches_before_finalization()
    }
1532
1533    fn process_batch_from_right(
1534        &mut self,
1535        batch: &RecordBatch,
1536    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1537        self.perform_join_for_given_side(batch, JoinSide::Right)
1538            .map(|maybe_batch| {
1539                if maybe_batch.is_some() {
1540                    StatefulStreamResult::Ready(maybe_batch)
1541                } else {
1542                    StatefulStreamResult::Continue
1543                }
1544            })
1545    }
1546
1547    fn process_batch_from_left(
1548        &mut self,
1549        batch: &RecordBatch,
1550    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1551        self.perform_join_for_given_side(batch, JoinSide::Left)
1552            .map(|maybe_batch| {
1553                if maybe_batch.is_some() {
1554                    StatefulStreamResult::Ready(maybe_batch)
1555                } else {
1556                    StatefulStreamResult::Continue
1557                }
1558            })
1559    }
1560
    /// Processes a right-side batch received after the left stream has ended.
    /// Delegates to the regular right-batch path, since the join logic is the same.
    fn process_batch_after_left_end(
        &mut self,
        right_batch: &RecordBatch,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.process_batch_from_right(right_batch)
    }
1567
    /// Processes a left-side batch received after the right stream has ended.
    /// Delegates to the regular left-batch path, since the join logic is the same.
    fn process_batch_after_right_end(
        &mut self,
        left_batch: &RecordBatch,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.process_batch_from_left(left_batch)
    }
1574
1575    fn process_batches_before_finalization(
1576        &mut self,
1577    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1578        // Get the left side results:
1579        let left_result = build_side_determined_results(
1580            &self.left,
1581            &self.schema,
1582            self.left.input_buffer.num_rows(),
1583            self.right.input_buffer.schema(),
1584            self.join_type,
1585            &self.column_indices,
1586        )?;
1587        // Get the right side results:
1588        let right_result = build_side_determined_results(
1589            &self.right,
1590            &self.schema,
1591            self.right.input_buffer.num_rows(),
1592            self.left.input_buffer.schema(),
1593            self.join_type,
1594            &self.column_indices,
1595        )?;
1596
1597        // Combine the left and right results:
1598        let result = combine_two_batches(&self.schema, left_result, right_result)?;
1599
1600        // Return the result:
1601        if result.is_some() {
1602            return Ok(StatefulStreamResult::Ready(result));
1603        }
1604        Ok(StatefulStreamResult::Continue)
1605    }
1606
    /// Mutable access to the right input stream.
    fn right_stream(&mut self) -> &mut SendableRecordBatchStream {
        &mut self.right_stream
    }

    /// Mutable access to the left input stream.
    fn left_stream(&mut self) -> &mut SendableRecordBatchStream {
        &mut self.left_stream
    }

    /// Replaces the current state-machine state.
    fn set_state(&mut self, state: SHJStreamState) {
        self.state = state;
    }

    /// Returns a clone of the current state-machine state.
    fn state(&mut self) -> SHJStreamState {
        self.state.clone()
    }
1622
1623    fn size(&self) -> usize {
1624        let mut size = 0;
1625        size += size_of_val(&self.schema);
1626        size += size_of_val(&self.filter);
1627        size += size_of_val(&self.join_type);
1628        size += self.left.size();
1629        size += self.right.size();
1630        size += size_of_val(&self.column_indices);
1631        size += self.graph.as_ref().map(|g| g.size()).unwrap_or(0);
1632        size += size_of_val(&self.left_sorted_filter_expr);
1633        size += size_of_val(&self.right_sorted_filter_expr);
1634        size += size_of_val(&self.random_state);
1635        size += size_of_val(&self.null_equality);
1636        size += size_of_val(&self.metrics);
1637        size
1638    }
1639
    /// Performs a join operation for the specified `probe_side` (either left or right).
    /// This function:
    /// 1. Determines which side is the probe and which is the build side.
    /// 2. Updates metrics based on the batch that was polled.
    /// 3. Executes the join with the given `probe_batch`.
    /// 4. Optionally computes anti-join results if all conditions are met.
    /// 5. Combines the results and returns a combined batch or `None` if no batch was produced.
    ///
    /// # Arguments
    ///
    /// * `probe_batch` - The batch just polled from the `probe_side` stream.
    /// * `probe_side` - Which input the batch came from; the opposite side acts
    ///   as the build side for this call.
    ///
    /// # Returns
    ///
    /// * `Result<Option<RecordBatch>>` - The combined equal/anti join output,
    ///   or `None` when no rows were produced.
    fn perform_join_for_given_side(
        &mut self,
        probe_batch: &RecordBatch,
        probe_side: JoinSide,
    ) -> Result<Option<RecordBatch>> {
        // Split mutable borrows of `self` into probe/build halves up front so
        // both sides can be used simultaneously below.
        let (
            probe_hash_joiner,
            build_hash_joiner,
            probe_side_sorted_filter_expr,
            build_side_sorted_filter_expr,
            probe_side_metrics,
        ) = if probe_side.eq(&JoinSide::Left) {
            (
                &mut self.left,
                &mut self.right,
                &mut self.left_sorted_filter_expr,
                &mut self.right_sorted_filter_expr,
                &mut self.metrics.left,
            )
        } else {
            (
                &mut self.right,
                &mut self.left,
                &mut self.right_sorted_filter_expr,
                &mut self.left_sorted_filter_expr,
                &mut self.metrics.right,
            )
        };
        // Update the metrics for the stream that was polled:
        probe_side_metrics.input_batches.add(1);
        probe_side_metrics.input_rows.add(probe_batch.num_rows());
        // Update the internal state (hash map and input buffer) of the joiner
        // for the side that was just polled:
        probe_hash_joiner.update_internal_state(probe_batch, &self.random_state)?;
        // Join the two sides:
        let equal_result = join_with_probe_batch(
            build_hash_joiner,
            probe_hash_joiner,
            &self.schema,
            self.join_type,
            self.filter.as_ref(),
            probe_batch,
            &self.column_indices,
            &self.random_state,
            self.null_equality,
        )?;
        // Increment the offset for the probe hash joiner:
        probe_hash_joiner.offset += probe_batch.num_rows();

        // Pruning (and any resulting anti-join output) only happens when both
        // sorted filter expressions and the interval graph are available:
        let anti_result = if let (
            Some(build_side_sorted_filter_expr),
            Some(probe_side_sorted_filter_expr),
            Some(graph),
        ) = (
            build_side_sorted_filter_expr.as_mut(),
            probe_side_sorted_filter_expr.as_mut(),
            self.graph.as_mut(),
        ) {
            // Calculate filter intervals:
            calculate_filter_expr_intervals(
                &build_hash_joiner.input_buffer,
                build_side_sorted_filter_expr,
                probe_batch,
                probe_side_sorted_filter_expr,
            )?;
            // Determine how many buffered build-side rows can no longer match:
            let prune_length = build_hash_joiner
                .calculate_prune_length_with_probe_batch(
                    build_side_sorted_filter_expr,
                    probe_side_sorted_filter_expr,
                    graph,
                )?;
            // Emit results for the rows about to be pruned before discarding them:
            let result = build_side_determined_results(
                build_hash_joiner,
                &self.schema,
                prune_length,
                probe_batch.schema(),
                self.join_type,
                &self.column_indices,
            )?;
            build_hash_joiner.prune_internal_state(prune_length)?;
            result
        } else {
            None
        };

        // Combine results:
        let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
        // Track and reserve memory for the (possibly grown) internal state:
        let capacity = self.size();
        self.metrics.stream_memory_usage.set(capacity);
        self.reservation.lock().try_resize(capacity)?;
        Ok(result)
    }
1738}
1739
/// Represents the various states of a symmetric hash join stream operation.
///
/// This enum is used to track the current state of streaming during a join
/// operation. It provides indicators as to which side of the join needs to be
/// pulled next or if one (or both) sides have been exhausted. This allows
/// for efficient management of resources and optimal performance during the
/// join process.
#[derive(Clone, Debug)]
pub enum SHJStreamState {
    /// Indicates that the next step should pull from the right side of the join.
    PullRight,

    /// Indicates that the next step should pull from the left side of the join.
    PullLeft,

    /// State representing that the right side of the join has been fully processed;
    /// only the left side may still yield batches.
    RightExhausted,

    /// State representing that the left side of the join has been fully processed;
    /// only the right side may still yield batches.
    LeftExhausted,

    /// Represents a state where both sides of the join are exhausted.
    ///
    /// The `final_result` field indicates whether the join operation has
    /// produced a final result or not.
    BothExhausted { final_result: bool },
}
1767
1768#[cfg(test)]
1769mod tests {
1770    use std::collections::HashMap;
1771    use std::sync::{LazyLock, Mutex};
1772
1773    use super::*;
1774    use crate::joins::test_utils::{
1775        build_sides_record_batches, compare_batches, complicated_filter,
1776        create_memory_table, join_expr_tests_fixture_f64, join_expr_tests_fixture_i32,
1777        join_expr_tests_fixture_temporal, partitioned_hash_join_with_filter,
1778        partitioned_sym_join_with_filter, split_record_batches,
1779    };
1780
1781    use arrow::compute::SortOptions;
1782    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
1783    use datafusion_common::ScalarValue;
1784    use datafusion_execution::config::SessionConfig;
1785    use datafusion_expr::Operator;
1786    use datafusion_physical_expr::expressions::{Column, binary, col, lit};
1787    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1788
1789    use rstest::*;
1790
    // Number of rows generated per side for every test table.
    const TABLE_SIZE: i32 = 30;

    type TableKey = (i32, i32, usize); // (cardinality.0, cardinality.1, batch_size)
    type TableValue = (Vec<RecordBatch>, Vec<RecordBatch>); // (left, right)

    // Cache for storing tables, so repeated test cases with the same
    // (cardinality, batch_size) key reuse the generated batches.
    static TABLE_CACHE: LazyLock<Mutex<HashMap<TableKey, TableValue>>> =
        LazyLock::new(|| Mutex::new(HashMap::new()));
1799
1800    fn get_or_create_table(
1801        cardinality: (i32, i32),
1802        batch_size: usize,
1803    ) -> Result<TableValue> {
1804        {
1805            let cache = TABLE_CACHE.lock().unwrap();
1806            if let Some(table) = cache.get(&(cardinality.0, cardinality.1, batch_size)) {
1807                return Ok(table.clone());
1808            }
1809        }
1810
1811        // If not, create the table
1812        let (left_batch, right_batch) =
1813            build_sides_record_batches(TABLE_SIZE, cardinality)?;
1814
1815        let (left_partition, right_partition) = (
1816            split_record_batches(&left_batch, batch_size)?,
1817            split_record_batches(&right_batch, batch_size)?,
1818        );
1819
1820        // Lock the cache again and store the table
1821        let mut cache = TABLE_CACHE.lock().unwrap();
1822
1823        // Store the table in the cache
1824        cache.insert(
1825            (cardinality.0, cardinality.1, batch_size),
1826            (left_partition.clone(), right_partition.clone()),
1827        );
1828
1829        Ok((left_partition, right_partition))
1830    }
1831
    /// Runs the same partitioned join twice — once via the symmetric hash join
    /// and once via the regular hash join — and asserts that both produce the
    /// same batches (`compare_batches` panics on mismatch).
    pub async fn experiment(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        filter: Option<JoinFilter>,
        join_type: JoinType,
        on: JoinOn,
        task_ctx: Arc<TaskContext>,
    ) -> Result<()> {
        // Symmetric hash join output (the implementation under test):
        let first_batches = partitioned_sym_join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter.clone(),
            &join_type,
            NullEquality::NullEqualsNothing,
            Arc::clone(&task_ctx),
        )
        .await?;
        // Reference output from the classic hash join:
        let second_batches = partitioned_hash_join_with_filter(
            left,
            right,
            on,
            filter,
            &join_type,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        compare_batches(&first_batches, &second_batches);
        Ok(())
    }
1863
    // Exercises all join types with a compound left sort key (`la1 + la2`) and a
    // three-column filter built by `complicated_filter`, comparing SHJ output
    // against the regular hash join.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn complex_join_all_one_ascending_numeric(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
        (4, 5),
        (12, 17),
        )]
        cardinality: (i32, i32),
    ) -> Result<()> {
        // a + b > c + 10 AND a + b < c + 100
        let task_ctx = Arc::new(TaskContext::default());

        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();

        // Left input is ordered by the expression `la1 + la2`:
        let left_sorted = [PhysicalSortExpr {
            expr: binary(
                col("la1", left_schema)?,
                Operator::Plus,
                col("la2", left_schema)?,
                left_schema,
            )?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        // Equi-join key uses an expression (`lc1 + 1`) on the left side:
        let on = vec![(
            binary(
                col("lc1", left_schema)?,
                Operator::Plus,
                lit(ScalarValue::Int32(Some(1))),
                left_schema,
            )?,
            Arc::new(Column::new_with_schema("rc1", right_schema)?) as _,
        )];

        let intermediate_schema = Schema::new(vec![
            Field::new("0", DataType::Int32, true),
            Field::new("1", DataType::Int32, true),
            Field::new("2", DataType::Int32, true),
        ]);
        let filter_expr = complicated_filter(&intermediate_schema)?;
        let column_indices = vec![
            ColumnIndex {
                index: left_schema.index_of("la1")?,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: left_schema.index_of("la2")?,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: right_schema.index_of("ra1")?,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
1952
    // Exercises all join types with single ascending numeric sort keys and the
    // six i32 filter fixtures, comparing SHJ output against the hash join.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_all_one_ascending_numeric(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();

        let left_sorted = [PhysicalSortExpr {
            expr: col("la1", left_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // The filter reads the first column (la1 / ra1) of each side:
        let column_indices = vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2021
    // Exercises all join types when the memory tables declare no sort order, so
    // no pruning-related machinery can be used; results must still match the
    // regular hash join.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_without_sort_information(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let (left, right) =
            create_memory_table(left_partition, right_partition, vec![], vec![])?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Index 5 refers to a table column position; presumably an unsorted
        // numeric column in the fixture schema — confirm against test_utils.
        let column_indices = vec![
            ColumnIndex {
                index: 5,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 5,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2075
    // Exercises all join types with no join filter at all (pure equi-join),
    // comparing SHJ output against the regular hash join.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_without_filter(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let (left, right) =
            create_memory_table(left_partition, right_partition, vec![], vec![])?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
        experiment(left, right, None, join_type, on, task_ctx).await?;
        Ok(())
    }
2104
    // Exercises all join types with descending, nulls-first sort keys on both
    // sides, comparing SHJ output against the regular hash join.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_all_one_descending_numeric_particular(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("la1_des", left_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1_des", right_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Index 5 is the position of the descending column in the table schema.
        let column_indices = vec![
            ColumnIndex {
                index: 5,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 5,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2177
    // Full join over ascending, nulls-first sort columns, single partition
    // (repartitioning disabled), compared against the regular hash join.
    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_first() -> Result<()> {
        let join_type = JoinType::Full;
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_asc_null_first", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_asc_null_first", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Index 6 is the position of the nulls-first ascending column.
        let column_indices = vec![
            ColumnIndex {
                index: 6,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 6,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2237
    // Full join over ascending, nulls-last sort columns, single partition
    // (repartitioning disabled), compared against the regular hash join.
    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_last() -> Result<()> {
        let join_type = JoinType::Full;
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_asc_null_last", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_asc_null_last", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Index 7 is the position of the nulls-last ascending column.
        let column_indices = vec![
            ColumnIndex {
                index: 7,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 7,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2299
    // Full join over descending, nulls-first sort columns, single partition
    // (repartitioning disabled), compared against the regular hash join.
    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_first_descending() -> Result<()> {
        let join_type = JoinType::Full;
        let cardinality = (10, 11);
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_desc_null_first", left_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_desc_null_first", right_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Index 8 is the position of the nulls-first descending column.
        let column_indices = vec![
            ColumnIndex {
                index: 8,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 8,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2362
2363    #[tokio::test(flavor = "multi_thread")]
2364    async fn complex_join_all_one_ascending_numeric_missing_stat() -> Result<()> {
2365        let cardinality = (3, 4);
2366        let join_type = JoinType::Full;
2367
2368        // a + b > c + 10 AND a + b < c + 100
2369        let session_config = SessionConfig::new().with_repartition_joins(false);
2370        let task_ctx = TaskContext::default().with_session_config(session_config);
2371        let task_ctx = Arc::new(task_ctx);
2372        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2373
2374        let left_schema = &left_partition[0].schema();
2375        let right_schema = &right_partition[0].schema();
2376        let left_sorted = [PhysicalSortExpr {
2377            expr: col("la1", left_schema)?,
2378            options: SortOptions::default(),
2379        }]
2380        .into();
2381        let right_sorted = [PhysicalSortExpr {
2382            expr: col("ra1", right_schema)?,
2383            options: SortOptions::default(),
2384        }]
2385        .into();
2386        let (left, right) = create_memory_table(
2387            left_partition,
2388            right_partition,
2389            vec![left_sorted],
2390            vec![right_sorted],
2391        )?;
2392
2393        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2394
2395        let intermediate_schema = Schema::new(vec![
2396            Field::new("0", DataType::Int32, true),
2397            Field::new("1", DataType::Int32, true),
2398            Field::new("2", DataType::Int32, true),
2399        ]);
2400        let filter_expr = complicated_filter(&intermediate_schema)?;
2401        let column_indices = vec![
2402            ColumnIndex {
2403                index: 0,
2404                side: JoinSide::Left,
2405            },
2406            ColumnIndex {
2407                index: 4,
2408                side: JoinSide::Left,
2409            },
2410            ColumnIndex {
2411                index: 0,
2412                side: JoinSide::Right,
2413            },
2414        ];
2415        let filter =
2416            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2417
2418        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2419        Ok(())
2420    }
2421
2422    #[tokio::test(flavor = "multi_thread")]
2423    async fn complex_join_all_one_ascending_equivalence() -> Result<()> {
2424        let cardinality = (3, 4);
2425        let join_type = JoinType::Full;
2426
2427        // a + b > c + 10 AND a + b < c + 100
2428        let config = SessionConfig::new().with_repartition_joins(false);
2429        // let session_ctx = SessionContext::with_config(config);
2430        // let task_ctx = session_ctx.task_ctx();
2431        let task_ctx = Arc::new(TaskContext::default().with_session_config(config));
2432        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2433        let left_schema = &left_partition[0].schema();
2434        let right_schema = &right_partition[0].schema();
2435        let left_sorted = vec![
2436            [PhysicalSortExpr {
2437                expr: col("la1", left_schema)?,
2438                options: SortOptions::default(),
2439            }]
2440            .into(),
2441            [PhysicalSortExpr {
2442                expr: col("la2", left_schema)?,
2443                options: SortOptions::default(),
2444            }]
2445            .into(),
2446        ];
2447
2448        let right_sorted = [PhysicalSortExpr {
2449            expr: col("ra1", right_schema)?,
2450            options: SortOptions::default(),
2451        }]
2452        .into();
2453
2454        let (left, right) = create_memory_table(
2455            left_partition,
2456            right_partition,
2457            left_sorted,
2458            vec![right_sorted],
2459        )?;
2460
2461        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2462
2463        let intermediate_schema = Schema::new(vec![
2464            Field::new("0", DataType::Int32, true),
2465            Field::new("1", DataType::Int32, true),
2466            Field::new("2", DataType::Int32, true),
2467        ]);
2468        let filter_expr = complicated_filter(&intermediate_schema)?;
2469        let column_indices = vec![
2470            ColumnIndex {
2471                index: 0,
2472                side: JoinSide::Left,
2473            },
2474            ColumnIndex {
2475                index: 4,
2476                side: JoinSide::Left,
2477            },
2478            ColumnIndex {
2479                index: 0,
2480                side: JoinSide::Right,
2481            },
2482        ];
2483        let filter =
2484            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2485
2486        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2487        Ok(())
2488    }
2489
2490    #[rstest]
2491    #[tokio::test(flavor = "multi_thread")]
2492    async fn testing_with_temporal_columns(
2493        #[values(
2494            JoinType::Inner,
2495            JoinType::Left,
2496            JoinType::Right,
2497            JoinType::RightSemi,
2498            JoinType::LeftSemi,
2499            JoinType::LeftAnti,
2500            JoinType::LeftMark,
2501            JoinType::RightAnti,
2502            JoinType::RightMark,
2503            JoinType::Full
2504        )]
2505        join_type: JoinType,
2506        #[values(
2507            (4, 5),
2508            (12, 17),
2509        )]
2510        cardinality: (i32, i32),
2511        #[values(0, 1, 2)] case_expr: usize,
2512    ) -> Result<()> {
2513        let session_config = SessionConfig::new().with_repartition_joins(false);
2514        let task_ctx = TaskContext::default().with_session_config(session_config);
2515        let task_ctx = Arc::new(task_ctx);
2516        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2517
2518        let left_schema = &left_partition[0].schema();
2519        let right_schema = &right_partition[0].schema();
2520        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2521        let left_sorted = [PhysicalSortExpr {
2522            expr: col("lt1", left_schema)?,
2523            options: SortOptions {
2524                descending: false,
2525                nulls_first: true,
2526            },
2527        }]
2528        .into();
2529        let right_sorted = [PhysicalSortExpr {
2530            expr: col("rt1", right_schema)?,
2531            options: SortOptions {
2532                descending: false,
2533                nulls_first: true,
2534            },
2535        }]
2536        .into();
2537        let (left, right) = create_memory_table(
2538            left_partition,
2539            right_partition,
2540            vec![left_sorted],
2541            vec![right_sorted],
2542        )?;
2543        let intermediate_schema = Schema::new(vec![
2544            Field::new(
2545                "left",
2546                DataType::Timestamp(TimeUnit::Millisecond, None),
2547                false,
2548            ),
2549            Field::new(
2550                "right",
2551                DataType::Timestamp(TimeUnit::Millisecond, None),
2552                false,
2553            ),
2554        ]);
2555        let filter_expr = join_expr_tests_fixture_temporal(
2556            case_expr,
2557            col("left", &intermediate_schema)?,
2558            col("right", &intermediate_schema)?,
2559            &intermediate_schema,
2560        )?;
2561        let column_indices = vec![
2562            ColumnIndex {
2563                index: 3,
2564                side: JoinSide::Left,
2565            },
2566            ColumnIndex {
2567                index: 3,
2568                side: JoinSide::Right,
2569            },
2570        ];
2571        let filter =
2572            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2573        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2574        Ok(())
2575    }
2576
2577    #[rstest]
2578    #[tokio::test(flavor = "multi_thread")]
2579    async fn test_with_interval_columns(
2580        #[values(
2581            JoinType::Inner,
2582            JoinType::Left,
2583            JoinType::Right,
2584            JoinType::RightSemi,
2585            JoinType::LeftSemi,
2586            JoinType::LeftAnti,
2587            JoinType::LeftMark,
2588            JoinType::RightAnti,
2589            JoinType::RightMark,
2590            JoinType::Full
2591        )]
2592        join_type: JoinType,
2593        #[values(
2594            (4, 5),
2595            (12, 17),
2596        )]
2597        cardinality: (i32, i32),
2598    ) -> Result<()> {
2599        let session_config = SessionConfig::new().with_repartition_joins(false);
2600        let task_ctx = TaskContext::default().with_session_config(session_config);
2601        let task_ctx = Arc::new(task_ctx);
2602        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2603
2604        let left_schema = &left_partition[0].schema();
2605        let right_schema = &right_partition[0].schema();
2606        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2607        let left_sorted = [PhysicalSortExpr {
2608            expr: col("li1", left_schema)?,
2609            options: SortOptions {
2610                descending: false,
2611                nulls_first: true,
2612            },
2613        }]
2614        .into();
2615        let right_sorted = [PhysicalSortExpr {
2616            expr: col("ri1", right_schema)?,
2617            options: SortOptions {
2618                descending: false,
2619                nulls_first: true,
2620            },
2621        }]
2622        .into();
2623        let (left, right) = create_memory_table(
2624            left_partition,
2625            right_partition,
2626            vec![left_sorted],
2627            vec![right_sorted],
2628        )?;
2629        let intermediate_schema = Schema::new(vec![
2630            Field::new("left", DataType::Interval(IntervalUnit::DayTime), false),
2631            Field::new("right", DataType::Interval(IntervalUnit::DayTime), false),
2632        ]);
2633        let filter_expr = join_expr_tests_fixture_temporal(
2634            0,
2635            col("left", &intermediate_schema)?,
2636            col("right", &intermediate_schema)?,
2637            &intermediate_schema,
2638        )?;
2639        let column_indices = vec![
2640            ColumnIndex {
2641                index: 9,
2642                side: JoinSide::Left,
2643            },
2644            ColumnIndex {
2645                index: 9,
2646                side: JoinSide::Right,
2647            },
2648        ];
2649        let filter =
2650            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2651        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2652
2653        Ok(())
2654    }
2655
2656    #[rstest]
2657    #[tokio::test(flavor = "multi_thread")]
2658    async fn testing_ascending_float_pruning(
2659        #[values(
2660            JoinType::Inner,
2661            JoinType::Left,
2662            JoinType::Right,
2663            JoinType::RightSemi,
2664            JoinType::LeftSemi,
2665            JoinType::LeftAnti,
2666            JoinType::LeftMark,
2667            JoinType::RightAnti,
2668            JoinType::RightMark,
2669            JoinType::Full
2670        )]
2671        join_type: JoinType,
2672        #[values(
2673            (4, 5),
2674            (12, 17),
2675        )]
2676        cardinality: (i32, i32),
2677        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
2678    ) -> Result<()> {
2679        let session_config = SessionConfig::new().with_repartition_joins(false);
2680        let task_ctx = TaskContext::default().with_session_config(session_config);
2681        let task_ctx = Arc::new(task_ctx);
2682        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2683
2684        let left_schema = &left_partition[0].schema();
2685        let right_schema = &right_partition[0].schema();
2686        let left_sorted = [PhysicalSortExpr {
2687            expr: col("l_float", left_schema)?,
2688            options: SortOptions::default(),
2689        }]
2690        .into();
2691        let right_sorted = [PhysicalSortExpr {
2692            expr: col("r_float", right_schema)?,
2693            options: SortOptions::default(),
2694        }]
2695        .into();
2696        let (left, right) = create_memory_table(
2697            left_partition,
2698            right_partition,
2699            vec![left_sorted],
2700            vec![right_sorted],
2701        )?;
2702
2703        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2704
2705        let intermediate_schema = Schema::new(vec![
2706            Field::new("left", DataType::Float64, true),
2707            Field::new("right", DataType::Float64, true),
2708        ]);
2709        let filter_expr = join_expr_tests_fixture_f64(
2710            case_expr,
2711            col("left", &intermediate_schema)?,
2712            col("right", &intermediate_schema)?,
2713        );
2714        let column_indices = vec![
2715            ColumnIndex {
2716                index: 10, // l_float
2717                side: JoinSide::Left,
2718            },
2719            ColumnIndex {
2720                index: 10, // r_float
2721                side: JoinSide::Right,
2722            },
2723        ];
2724        let filter =
2725            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2726
2727        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2728        Ok(())
2729    }
2730}