1use std::any::Any;
29use std::fmt::{self, Debug};
30use std::mem::{size_of, size_of_val};
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use std::vec;
34
35use crate::check_if_same_properties;
36use crate::common::SharedMemoryReservation;
37use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
38use crate::joins::stream_join_utils::{
39 PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
40 calculate_filter_expr_intervals, combine_two_batches,
41 convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
42 get_pruning_semi_indices, prepare_sorted_exprs, record_visited_indices,
43};
44use crate::joins::utils::{
45 BatchSplitter, BatchTransformer, ColumnIndex, JoinFilter, JoinHashMapType, JoinOn,
46 JoinOnRef, NoopBatchTransformer, StatefulStreamResult, apply_join_filter_to_indices,
47 build_batch_from_indices, build_join_schema, check_join_is_valid, equal_rows_arr,
48 symmetric_join_output_partitioning, update_hash,
49};
50use crate::projection::{
51 ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children,
52 physical_to_column_exprs, update_join_filter, update_join_on,
53};
54use crate::{
55 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
56 PlanProperties, RecordBatchStream, SendableRecordBatchStream,
57 joins::StreamJoinPartitionMode,
58 metrics::{ExecutionPlanMetricsSet, MetricsSet},
59};
60
61use arrow::array::{
62 ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder, UInt32Array,
63 UInt64Array,
64};
65use arrow::compute::concat_batches;
66use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
67use arrow::record_batch::RecordBatch;
68use datafusion_common::hash_utils::create_hashes;
69use datafusion_common::utils::bisect;
70use datafusion_common::{
71 HashSet, JoinSide, JoinType, NullEquality, Result, assert_eq_or_internal_err,
72 plan_err,
73};
74use datafusion_execution::TaskContext;
75use datafusion_execution::memory_pool::MemoryConsumer;
76use datafusion_expr::interval_arithmetic::Interval;
77use datafusion_physical_expr::equivalence::join_equivalence_properties;
78use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
79use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
80use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};
81
82use ahash::RandomState;
83use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
84use futures::{Stream, StreamExt, ready};
85use parking_lot::Mutex;
86
/// Scale factor handed to `PruningJoinHashMap::prune_hash_values` (see
/// `OneSideHashJoiner::prune_internal_state`) controlling when the hash map
/// is shrunk after pruning.
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
88
/// A symmetric hash join operator: buffers and hashes *both* inputs
/// incrementally, so it can join two (potentially unbounded) streams.
#[derive(Debug, Clone)]
pub struct SymmetricHashJoinExec {
    /// Left side plan
    pub(crate) left: Arc<dyn ExecutionPlan>,
    /// Right side plan
    pub(crate) right: Arc<dyn ExecutionPlan>,
    /// Equi-join key pairs: (left expression, right expression)
    pub(crate) on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional non-equi filter applied to matched row pairs
    pub(crate) filter: Option<JoinFilter>,
    /// How the join is performed (Inner, Left, Full, ...)
    pub(crate) join_type: JoinType,
    /// Hashing state; seeded deterministically in `try_new`
    random_state: RandomState,
    /// Execution metrics for this operator
    metrics: ExecutionPlanMetricsSet,
    /// Index/side placement information for the output columns
    column_indices: Vec<ColumnIndex>,
    /// Whether NULL join keys compare as equal
    pub(crate) null_equality: NullEquality,
    /// Sort expressions required of the left input, if any
    pub(crate) left_sort_exprs: Option<LexOrdering>,
    /// Sort expressions required of the right input, if any
    pub(crate) right_sort_exprs: Option<LexOrdering>,
    /// Partitioning mode required of both inputs
    mode: StreamJoinPartitionMode,
    /// Cached plan properties (equivalences, partitioning, boundedness)
    cache: Arc<PlanProperties>,
}
203
impl SymmetricHashJoinExec {
    /// Attempts to create a new `SymmetricHashJoinExec` instance.
    ///
    /// # Errors
    /// Returns an error if `on` is empty, or if the equi-join constraints are
    /// invalid for the given input schemas.
    #[expect(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        null_equality: NullEquality,
        left_sort_exprs: Option<LexOrdering>,
        right_sort_exprs: Option<LexOrdering>,
        mode: StreamJoinPartitionMode,
    ) -> Result<Self> {
        let left_schema = left.schema();
        let right_schema = right.schema();

        // Error out early if no "on" constraints are given:
        if on.is_empty() {
            return plan_err!(
                "On constraints in SymmetricHashJoinExec should be non-empty"
            );
        }

        // Validate the equi-join constraints against the input schemas:
        check_join_is_valid(&left_schema, &right_schema, &on)?;

        // Build the output schema and the per-column side/index mapping:
        let (schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, join_type);

        // Fixed seeds keep hashing deterministic across partitions:
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let schema = Arc::new(schema);
        let cache = Self::compute_properties(&left, &right, schema, *join_type, &on)?;
        Ok(SymmetricHashJoinExec {
            left,
            right,
            on,
            filter,
            join_type: *join_type,
            random_state,
            metrics: ExecutionPlanMetricsSet::new(),
            column_indices,
            null_equality,
            left_sort_exprs,
            right_sort_exprs,
            mode,
            cache: Arc::new(cache),
        })
    }

    /// Computes the [`PlanProperties`] cache (equivalences, output
    /// partitioning, emission type and boundedness) for this join.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
        join_type: JoinType,
        join_on: JoinOnRef,
    ) -> Result<PlanProperties> {
        // Calculate equivalence properties. `&[false, false]`: neither
        // input's ordering is maintained; `None`: no probe side
        // (NOTE(review): confirm against `join_equivalence_properties` docs).
        let eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            schema,
            &[false, false],
            None,
            join_on,
        )?;

        let output_partitioning =
            symmetric_join_output_partitioning(left, right, &join_type)?;

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type_from_children([left, right]),
            boundedness_from_children([left, right]),
        ))
    }

    /// Left side plan of the join
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// Right side plan of the join
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// Equi-join key pairs
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    /// Optional non-equi join filter
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// How the join is performed
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// Whether NULL join keys are considered equal
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    /// The partitioning mode required of the inputs
    pub fn partition_mode(&self) -> StreamJoinPartitionMode {
        self.mode
    }

    /// Sort expressions required of the left input, if any
    pub fn left_sort_exprs(&self) -> Option<&LexOrdering> {
        self.left_sort_exprs.as_ref()
    }

    /// Sort expressions required of the right input, if any
    pub fn right_sort_exprs(&self) -> Option<&LexOrdering> {
        self.right_sort_exprs.as_ref()
    }

    /// Returns `true` only when there is a filter and the *leading* sort
    /// expression of both children can be mapped into the filter schema —
    /// the precondition for interval-based state pruning.
    pub fn check_if_order_information_available(&self) -> Result<bool> {
        if let Some(filter) = self.filter() {
            let left = self.left();
            if let Some(left_ordering) = left.output_ordering() {
                let right = self.right();
                if let Some(right_ordering) = right.output_ordering() {
                    let left_convertible = convert_sort_expr_with_filter_schema(
                        &JoinSide::Left,
                        filter,
                        &left.schema(),
                        &left_ordering[0],
                    )?
                    .is_some();
                    let right_convertible = convert_sort_expr_with_filter_schema(
                        &JoinSide::Right,
                        filter,
                        &right.schema(),
                        &right_ordering[0],
                    )?
                    .is_some();
                    return Ok(left_convertible && right_convertible);
                }
            }
        }
        Ok(false)
    }

    /// Rebuilds `Self` with new children while reusing all other state
    /// (including the cached `PlanProperties`) from `self`; metrics are reset.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        // `swap_remove(0)` twice pops the left child, then the right one.
        let left = children.swap_remove(0);
        let right = children.swap_remove(0);
        Self {
            left,
            right,
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
379
380impl DisplayAs for SymmetricHashJoinExec {
381 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
382 match t {
383 DisplayFormatType::Default | DisplayFormatType::Verbose => {
384 let display_filter = self.filter.as_ref().map_or_else(
385 || "".to_string(),
386 |f| format!(", filter={}", f.expression()),
387 );
388 let on = self
389 .on
390 .iter()
391 .map(|(c1, c2)| format!("({c1}, {c2})"))
392 .collect::<Vec<String>>()
393 .join(", ");
394 write!(
395 f,
396 "SymmetricHashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
397 self.mode, self.join_type, on, display_filter
398 )
399 }
400 DisplayFormatType::TreeRender => {
401 let on = self
402 .on
403 .iter()
404 .map(|(c1, c2)| {
405 format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
406 })
407 .collect::<Vec<String>>()
408 .join(", ");
409
410 writeln!(f, "mode={:?}", self.mode)?;
411 if *self.join_type() != JoinType::Inner {
412 writeln!(f, "join_type={:?}", self.join_type)?;
413 }
414 writeln!(f, "on={on}")
415 }
416 }
417 }
418}
419
420impl ExecutionPlan for SymmetricHashJoinExec {
421 fn name(&self) -> &'static str {
422 "SymmetricHashJoinExec"
423 }
424
425 fn as_any(&self) -> &dyn Any {
426 self
427 }
428
429 fn properties(&self) -> &Arc<PlanProperties> {
430 &self.cache
431 }
432
433 fn required_input_distribution(&self) -> Vec<Distribution> {
434 match self.mode {
435 StreamJoinPartitionMode::Partitioned => {
436 let (left_expr, right_expr) = self
437 .on
438 .iter()
439 .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
440 .unzip();
441 vec![
442 Distribution::HashPartitioned(left_expr),
443 Distribution::HashPartitioned(right_expr),
444 ]
445 }
446 StreamJoinPartitionMode::SinglePartition => {
447 vec![Distribution::SinglePartition, Distribution::SinglePartition]
448 }
449 }
450 }
451
452 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
453 vec![
454 self.left_sort_exprs
455 .as_ref()
456 .map(|e| OrderingRequirements::from(e.clone())),
457 self.right_sort_exprs
458 .as_ref()
459 .map(|e| OrderingRequirements::from(e.clone())),
460 ]
461 }
462
463 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
464 vec![&self.left, &self.right]
465 }
466
467 fn with_new_children(
468 self: Arc<Self>,
469 children: Vec<Arc<dyn ExecutionPlan>>,
470 ) -> Result<Arc<dyn ExecutionPlan>> {
471 check_if_same_properties!(self, children);
472 Ok(Arc::new(SymmetricHashJoinExec::try_new(
473 Arc::clone(&children[0]),
474 Arc::clone(&children[1]),
475 self.on.clone(),
476 self.filter.clone(),
477 &self.join_type,
478 self.null_equality,
479 self.left_sort_exprs.clone(),
480 self.right_sort_exprs.clone(),
481 self.mode,
482 )?))
483 }
484
485 fn metrics(&self) -> Option<MetricsSet> {
486 Some(self.metrics.clone_inner())
487 }
488
489 fn execute(
490 &self,
491 partition: usize,
492 context: Arc<TaskContext>,
493 ) -> Result<SendableRecordBatchStream> {
494 let left_partitions = self.left.output_partitioning().partition_count();
495 let right_partitions = self.right.output_partitioning().partition_count();
496 assert_eq_or_internal_err!(
497 left_partitions,
498 right_partitions,
499 "Invalid SymmetricHashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
500 consider using RepartitionExec"
501 );
502 let (left_sorted_filter_expr, right_sorted_filter_expr, graph) = match (
505 self.left_sort_exprs(),
506 self.right_sort_exprs(),
507 &self.filter,
508 ) {
509 (Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
510 let (left, right, graph) = prepare_sorted_exprs(
511 filter,
512 &self.left,
513 &self.right,
514 left_sort_exprs,
515 right_sort_exprs,
516 )?;
517 (Some(left), Some(right), Some(graph))
518 }
519 _ => (None, None, None),
522 };
523
524 let (on_left, on_right) = self.on.iter().cloned().unzip();
525
526 let left_side_joiner =
527 OneSideHashJoiner::new(JoinSide::Left, on_left, self.left.schema());
528 let right_side_joiner =
529 OneSideHashJoiner::new(JoinSide::Right, on_right, self.right.schema());
530
531 let left_stream = self.left.execute(partition, Arc::clone(&context))?;
532
533 let right_stream = self.right.execute(partition, Arc::clone(&context))?;
534
535 let batch_size = context.session_config().batch_size();
536 let enforce_batch_size_in_joins =
537 context.session_config().enforce_batch_size_in_joins();
538
539 let reservation = Arc::new(Mutex::new(
540 MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
541 .register(context.memory_pool()),
542 ));
543 if let Some(g) = graph.as_ref() {
544 reservation.lock().try_grow(g.size())?;
545 }
546
547 if enforce_batch_size_in_joins {
548 Ok(Box::pin(SymmetricHashJoinStream {
549 left_stream,
550 right_stream,
551 schema: self.schema(),
552 filter: self.filter.clone(),
553 join_type: self.join_type,
554 random_state: self.random_state.clone(),
555 left: left_side_joiner,
556 right: right_side_joiner,
557 column_indices: self.column_indices.clone(),
558 metrics: StreamJoinMetrics::new(partition, &self.metrics),
559 graph,
560 left_sorted_filter_expr,
561 right_sorted_filter_expr,
562 null_equality: self.null_equality,
563 state: SHJStreamState::PullRight,
564 reservation,
565 batch_transformer: BatchSplitter::new(batch_size),
566 }))
567 } else {
568 Ok(Box::pin(SymmetricHashJoinStream {
569 left_stream,
570 right_stream,
571 schema: self.schema(),
572 filter: self.filter.clone(),
573 join_type: self.join_type,
574 random_state: self.random_state.clone(),
575 left: left_side_joiner,
576 right: right_side_joiner,
577 column_indices: self.column_indices.clone(),
578 metrics: StreamJoinMetrics::new(partition, &self.metrics),
579 graph,
580 left_sorted_filter_expr,
581 right_sorted_filter_expr,
582 null_equality: self.null_equality,
583 state: SHJStreamState::PullRight,
584 reservation,
585 batch_transformer: NoopBatchTransformer::new(),
586 }))
587 }
588 }
589
590 fn try_swapping_with_projection(
594 &self,
595 projection: &ProjectionExec,
596 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
597 let Some(projection_as_columns) = physical_to_column_exprs(projection.expr())
599 else {
600 return Ok(None);
601 };
602
603 let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
604 self.left().schema().fields().len(),
605 &projection_as_columns,
606 );
607
608 if !join_allows_pushdown(
609 &projection_as_columns,
610 &self.schema(),
611 far_right_left_col_ind,
612 far_left_right_col_ind,
613 ) {
614 return Ok(None);
615 }
616
617 let Some(new_on) = update_join_on(
618 &projection_as_columns[0..=far_right_left_col_ind as _],
619 &projection_as_columns[far_left_right_col_ind as _..],
620 self.on(),
621 self.left().schema().fields().len(),
622 ) else {
623 return Ok(None);
624 };
625
626 let new_filter = if let Some(filter) = self.filter() {
627 match update_join_filter(
628 &projection_as_columns[0..=far_right_left_col_ind as _],
629 &projection_as_columns[far_left_right_col_ind as _..],
630 filter,
631 self.left().schema().fields().len(),
632 ) {
633 Some(updated_filter) => Some(updated_filter),
634 None => return Ok(None),
635 }
636 } else {
637 None
638 };
639
640 let (new_left, new_right) = new_join_children(
641 &projection_as_columns,
642 far_right_left_col_ind,
643 far_left_right_col_ind,
644 self.left(),
645 self.right(),
646 )?;
647
648 SymmetricHashJoinExec::try_new(
649 Arc::new(new_left),
650 Arc::new(new_right),
651 new_on,
652 new_filter,
653 self.join_type(),
654 self.null_equality(),
655 self.right().output_ordering().cloned(),
656 self.left().output_ordering().cloned(),
657 self.partition_mode(),
658 )
659 .map(|e| Some(Arc::new(e) as _))
660 }
661}
662
/// Per-partition streaming state of the symmetric hash join; drives a small
/// state machine that alternates between the two inputs.
struct SymmetricHashJoinStream<T> {
    /// Left input stream
    left_stream: SendableRecordBatchStream,
    /// Right input stream
    right_stream: SendableRecordBatchStream,
    /// Output schema of the join
    schema: Arc<Schema>,
    /// Optional non-equi join filter
    filter: Option<JoinFilter>,
    /// How the join is performed
    join_type: JoinType,
    /// Buffered build-side state for the left input
    left: OneSideHashJoiner,
    /// Buffered build-side state for the right input
    right: OneSideHashJoiner,
    /// Index/side placement information for the output columns
    column_indices: Vec<ColumnIndex>,
    /// Interval graph over the filter expression; present only when both
    /// sort expressions and a filter exist (enables pruning)
    graph: Option<ExprIntervalGraph>,
    /// Sorted filter expression for the left side, if available
    left_sorted_filter_expr: Option<SortedFilterExpr>,
    /// Sorted filter expression for the right side, if available
    right_sorted_filter_expr: Option<SortedFilterExpr>,
    /// Hashing state shared with the operator
    random_state: RandomState,
    /// Whether NULL join keys compare as equal
    null_equality: NullEquality,
    /// Join execution metrics
    metrics: StreamJoinMetrics,
    /// Memory reservation used for accounting buffered state
    reservation: SharedMemoryReservation,
    /// Current state of the polling state machine
    state: SHJStreamState,
    /// Transformer that may split output batches to the configured size
    batch_transformer: T,
}
699
700impl<T: BatchTransformer + Unpin + Send> RecordBatchStream
701 for SymmetricHashJoinStream<T>
702{
703 fn schema(&self) -> SchemaRef {
704 Arc::clone(&self.schema)
705 }
706}
707
708impl<T: BatchTransformer + Unpin + Send> Stream for SymmetricHashJoinStream<T> {
709 type Item = Result<RecordBatch>;
710
711 fn poll_next(
712 mut self: std::pin::Pin<&mut Self>,
713 cx: &mut Context<'_>,
714 ) -> Poll<Option<Self::Item>> {
715 self.poll_next_impl(cx)
716 }
717}
718
719fn determine_prune_length(
738 buffer: &RecordBatch,
739 build_side_filter_expr: &SortedFilterExpr,
740) -> Result<usize> {
741 let origin_sorted_expr = build_side_filter_expr.origin_sorted_expr();
742 let interval = build_side_filter_expr.interval();
743 let batch_arr = origin_sorted_expr
745 .expr
746 .evaluate(buffer)?
747 .into_array(buffer.num_rows())?;
748
749 let target = if origin_sorted_expr.options.descending {
751 interval.upper().clone()
752 } else {
753 interval.lower().clone()
754 };
755
756 bisect::<true>(&[batch_arr], &[target], &[origin_sorted_expr.options])
758}
759
760fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) -> bool {
775 if build_side == JoinSide::Left {
776 matches!(
777 join_type,
778 JoinType::Left
779 | JoinType::LeftAnti
780 | JoinType::Full
781 | JoinType::LeftSemi
782 | JoinType::LeftMark
783 )
784 } else {
785 matches!(
786 join_type,
787 JoinType::Right
788 | JoinType::RightAnti
789 | JoinType::Full
790 | JoinType::RightSemi
791 | JoinType::RightMark
792 )
793 }
794}
795
796fn calculate_indices_by_join_type<L: ArrowPrimitiveType, R: ArrowPrimitiveType>(
813 build_side: JoinSide,
814 prune_length: usize,
815 visited_rows: &HashSet<usize>,
816 deleted_offset: usize,
817 join_type: JoinType,
818) -> Result<(PrimitiveArray<L>, PrimitiveArray<R>)>
819where
820 NativeAdapter<L>: From<<L as ArrowPrimitiveType>::Native>,
821{
822 let result = match (build_side, join_type) {
824 (JoinSide::Left, JoinType::LeftMark) => {
840 let build_indices = (0..prune_length)
841 .map(L::Native::from_usize)
842 .collect::<PrimitiveArray<L>>();
843 let probe_indices = (0..prune_length)
844 .map(|idx| {
845 visited_rows
847 .contains(&(idx + deleted_offset))
848 .then_some(R::Native::from_usize(0).unwrap())
849 })
850 .collect();
851 (build_indices, probe_indices)
852 }
853 (JoinSide::Right, JoinType::RightMark) => {
854 let build_indices = (0..prune_length)
855 .map(L::Native::from_usize)
856 .collect::<PrimitiveArray<L>>();
857 let probe_indices = (0..prune_length)
858 .map(|idx| {
859 visited_rows
861 .contains(&(idx + deleted_offset))
862 .then_some(R::Native::from_usize(0).unwrap())
863 })
864 .collect();
865 (build_indices, probe_indices)
866 }
867 (JoinSide::Left, JoinType::Left | JoinType::LeftAnti)
869 | (JoinSide::Right, JoinType::Right | JoinType::RightAnti)
870 | (_, JoinType::Full) => {
871 let build_unmatched_indices =
872 get_pruning_anti_indices(prune_length, deleted_offset, visited_rows);
873 let mut builder =
874 PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
875 builder.append_nulls(build_unmatched_indices.len());
876 let probe_indices = builder.finish();
877 (build_unmatched_indices, probe_indices)
878 }
879 (JoinSide::Left, JoinType::LeftSemi) | (JoinSide::Right, JoinType::RightSemi) => {
881 let build_unmatched_indices =
882 get_pruning_semi_indices(prune_length, deleted_offset, visited_rows);
883 let mut builder =
884 PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
885 builder.append_nulls(build_unmatched_indices.len());
886 let probe_indices = builder.finish();
887 (build_unmatched_indices, probe_indices)
888 }
889 _ => unreachable!(),
891 };
892 Ok(result)
893}
894
895pub(crate) fn build_side_determined_results(
913 build_hash_joiner: &OneSideHashJoiner,
914 output_schema: &SchemaRef,
915 prune_length: usize,
916 probe_schema: SchemaRef,
917 join_type: JoinType,
918 column_indices: &[ColumnIndex],
919) -> Result<Option<RecordBatch>> {
920 if prune_length > 0
922 && need_to_produce_result_in_final(build_hash_joiner.build_side, join_type)
923 {
924 let (build_indices, probe_indices) = calculate_indices_by_join_type(
926 build_hash_joiner.build_side,
927 prune_length,
928 &build_hash_joiner.visited_rows,
929 build_hash_joiner.deleted_offset,
930 join_type,
931 )?;
932
933 let empty_probe_batch = RecordBatch::new_empty(probe_schema);
935 build_batch_from_indices(
937 output_schema.as_ref(),
938 &build_hash_joiner.input_buffer,
939 &empty_probe_batch,
940 &build_indices,
941 &probe_indices,
942 column_indices,
943 build_hash_joiner.build_side,
944 join_type,
945 )
946 .map(|batch| (batch.num_rows() > 0).then_some(batch))
947 } else {
948 Ok(None)
950 }
951}
952
/// Joins the buffered build side with a single probe batch.
///
/// Produces an output batch unless the join type defers its results to the
/// finalization stage (semi/anti/mark joins), in which case only the
/// visited-row bookkeeping is updated and `Ok(None)` is returned.
#[expect(clippy::too_many_arguments)]
pub(crate) fn join_with_probe_batch(
    build_hash_joiner: &mut OneSideHashJoiner,
    probe_hash_joiner: &mut OneSideHashJoiner,
    schema: &SchemaRef,
    join_type: JoinType,
    filter: Option<&JoinFilter>,
    probe_batch: &RecordBatch,
    column_indices: &[ColumnIndex],
    random_state: &RandomState,
    null_equality: NullEquality,
) -> Result<Option<RecordBatch>> {
    // Nothing to join if either side is empty:
    if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
        return Ok(None);
    }
    // Find candidate (build, probe) index pairs via the build-side hash map:
    let (build_indices, probe_indices) = lookup_join_hashmap(
        &build_hash_joiner.hashmap,
        &build_hash_joiner.input_buffer,
        probe_batch,
        &build_hash_joiner.on,
        &probe_hash_joiner.on,
        random_state,
        null_equality,
        &mut build_hash_joiner.hashes_buffer,
        Some(build_hash_joiner.deleted_offset),
    )?;

    // Narrow the candidates with the non-equi filter, if one exists:
    let (build_indices, probe_indices) = if let Some(filter) = filter {
        apply_join_filter_to_indices(
            &build_hash_joiner.input_buffer,
            probe_batch,
            build_indices,
            probe_indices,
            filter,
            build_hash_joiner.build_side,
            None,
            join_type,
        )?
    } else {
        (build_indices, probe_indices)
    };

    // Record matched rows on whichever side(s) must emit deferred results
    // during finalization:
    if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
        record_visited_indices(
            &mut build_hash_joiner.visited_rows,
            build_hash_joiner.deleted_offset,
            &build_indices,
        );
    }
    if need_to_produce_result_in_final(build_hash_joiner.build_side.negate(), join_type) {
        record_visited_indices(
            &mut probe_hash_joiner.visited_rows,
            probe_hash_joiner.offset,
            &probe_indices,
        );
    }
    // Semi/anti/mark joins only emit rows during finalization:
    if matches!(
        join_type,
        JoinType::LeftAnti
            | JoinType::RightAnti
            | JoinType::LeftSemi
            | JoinType::LeftMark
            | JoinType::RightSemi
            | JoinType::RightMark
    ) {
        Ok(None)
    } else {
        build_batch_from_indices(
            schema,
            &build_hash_joiner.input_buffer,
            probe_batch,
            &build_indices,
            &probe_indices,
            column_indices,
            build_hash_joiner.build_side,
            join_type,
        )
        .map(|batch| (batch.num_rows() > 0).then_some(batch))
    }
}
1052
/// Looks up probe-batch rows in the build-side hash map, returning the
/// candidate (build, probe) index pairs whose join keys actually compare
/// equal (hash collisions are filtered out via `equal_rows_arr`).
#[expect(clippy::too_many_arguments)]
fn lookup_join_hashmap(
    build_hashmap: &PruningJoinHashMap,
    build_batch: &RecordBatch,
    probe_batch: &RecordBatch,
    build_on: &[PhysicalExprRef],
    probe_on: &[PhysicalExprRef],
    random_state: &RandomState,
    null_equality: NullEquality,
    hashes_buffer: &mut Vec<u64>,
    deleted_offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
    // Evaluate the join-key expressions on both sides:
    let keys_values = evaluate_expressions_to_arrays(probe_on, probe_batch)?;
    let build_join_values = evaluate_expressions_to_arrays(build_on, build_batch)?;

    // Hash the probe keys, reusing the scratch buffer:
    hashes_buffer.clear();
    hashes_buffer.resize(probe_batch.num_rows(), 0);
    let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

    // Probe rows are fed in *reverse* enumeration order; the collected index
    // vectors are reversed back below so the final output is in ascending
    // probe order. (NOTE(review): presumably this matches the hash map's
    // chain storage order — confirm against `get_matched_indices`.)
    let (mut matched_probe, mut matched_build) = build_hashmap.get_matched_indices(
        Box::new(hash_values.iter().enumerate().rev()),
        deleted_offset,
    );

    matched_probe.reverse();
    matched_build.reverse();

    let build_indices: UInt64Array = matched_build.into();
    let probe_indices: UInt32Array = matched_probe.into();

    // Discard hash collisions by comparing the actual key values:
    let (build_indices, probe_indices) = equal_rows_arr(
        &build_indices,
        &probe_indices,
        &build_join_values,
        &keys_values,
        null_equality,
    )?;

    Ok((build_indices, probe_indices))
}
1142
/// Buffered build-side state for one input of the symmetric hash join.
pub struct OneSideHashJoiner {
    /// Which side (left or right) this joiner represents
    build_side: JoinSide,
    /// All input rows buffered so far (after pruning)
    pub input_buffer: RecordBatch,
    /// Join-key expressions for this side
    pub(crate) on: Vec<PhysicalExprRef>,
    /// Hash map from join-key hashes to row indices, with pruning support
    pub(crate) hashmap: PruningJoinHashMap,
    /// Scratch buffer reused across batches when computing hashes
    pub(crate) hashes_buffer: Vec<u64>,
    /// Rows that have matched so far, keyed on pre-pruning row positions
    /// (i.e. offset by `deleted_offset` relative to `input_buffer`)
    pub(crate) visited_rows: HashSet<usize>,
    /// Global row offset of the next batch to be inserted
    pub(crate) offset: usize,
    /// Number of rows pruned from the front of the buffer so far
    pub(crate) deleted_offset: usize,
}
1161
impl OneSideHashJoiner {
    /// Approximate in-memory size of this joiner in bytes, used for memory
    /// accounting.
    pub fn size(&self) -> usize {
        let mut size = 0;
        size += size_of_val(self);
        size += size_of_val(&self.build_side);
        size += self.input_buffer.get_array_memory_size();
        size += size_of_val(&self.on);
        size += self.hashmap.size();
        size += self.hashes_buffer.capacity() * size_of::<u64>();
        size += self.visited_rows.capacity() * size_of::<usize>();
        size += size_of_val(&self.offset);
        size += size_of_val(&self.deleted_offset);
        size
    }
    /// Creates an empty joiner for `build_side` with the given join-key
    /// expressions and input schema.
    pub fn new(
        build_side: JoinSide,
        on: Vec<PhysicalExprRef>,
        schema: SchemaRef,
    ) -> Self {
        Self {
            build_side,
            input_buffer: RecordBatch::new_empty(schema),
            on,
            hashmap: PruningJoinHashMap::with_capacity(0),
            hashes_buffer: vec![],
            visited_rows: HashSet::new(),
            offset: 0,
            deleted_offset: 0,
        }
    }

    /// Appends `batch` to the input buffer and inserts its join-key hashes
    /// into the hash map.
    pub(crate) fn update_internal_state(
        &mut self,
        batch: &RecordBatch,
        random_state: &RandomState,
    ) -> Result<()> {
        // Merge the incoming batch into the buffered input:
        self.input_buffer = concat_batches(&batch.schema(), [&self.input_buffer, batch])?;
        // Resize the scratch hash buffer to fit this batch:
        self.hashes_buffer.resize(batch.num_rows(), 0);
        // Insert the new rows into the hash map:
        update_hash(
            &self.on,
            batch,
            &mut self.hashmap,
            self.offset,
            random_state,
            &mut self.hashes_buffer,
            self.deleted_offset,
            false,
        )?;
        Ok(())
    }

    /// Propagates the current filter intervals through the expression graph
    /// and, if the build-side interval shrank, computes how many leading
    /// buffered rows can be pruned.
    ///
    /// Returns 0 when the buffer is empty or the interval did not change.
    pub(crate) fn calculate_prune_length_with_probe_batch(
        &mut self,
        build_side_sorted_filter_expr: &mut SortedFilterExpr,
        probe_side_sorted_filter_expr: &mut SortedFilterExpr,
        graph: &mut ExprIntervalGraph,
    ) -> Result<usize> {
        // Return early if the input buffer is empty:
        if self.input_buffer.num_rows() == 0 {
            return Ok(0);
        }
        // Collect (node index, interval) pairs; the build side is pushed
        // first, so it is the first entry after the update below:
        let mut filter_intervals = vec![];
        for expr in [
            &build_side_sorted_filter_expr,
            &probe_side_sorted_filter_expr,
        ] {
            filter_intervals.push((expr.node_index(), expr.interval().clone()))
        }
        // Update the intervals in the graph, constraining the filter to TRUE:
        graph.update_ranges(&mut filter_intervals, Interval::TRUE)?;
        // Extract the newly-calculated build-side interval:
        let calculated_build_side_interval = filter_intervals.remove(0).1;
        // If the interval did not change, no new rows can be pruned:
        if calculated_build_side_interval.eq(build_side_sorted_filter_expr.interval()) {
            return Ok(0);
        }
        build_side_sorted_filter_expr.set_interval(calculated_build_side_interval);

        determine_prune_length(&self.input_buffer, build_side_sorted_filter_expr)
    }

    /// Drops the first `prune_length` rows from the buffered state: the hash
    /// map entries, the visited-row bookkeeping and the input buffer itself.
    pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> {
        // Prune the hash values, possibly shrinking the map:
        self.hashmap.prune_hash_values(
            prune_length,
            self.deleted_offset as u64,
            HASHMAP_SHRINK_SCALE_FACTOR,
        );
        // Remove pruned rows from the visited-rows set:
        for row in self.deleted_offset..(self.deleted_offset + prune_length) {
            self.visited_rows.remove(&row);
        }
        // Slice off the pruned prefix and advance the deleted-row offset:
        self.input_buffer = self
            .input_buffer
            .slice(prune_length, self.input_buffer.num_rows() - prune_length);
        self.deleted_offset += prune_length;
        Ok(())
    }
}
1291
1292impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
    /// Core state-machine driver: alternates between pulling from the two
    /// input streams, drains the remaining side once one is exhausted, and
    /// finally emits deferred results once both are exhausted.
    fn poll_next_impl(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match self.batch_transformer.next() {
                None => {
                    // No pending output: advance the state machine.
                    let result = match self.state() {
                        SHJStreamState::PullRight => {
                            ready!(self.fetch_next_from_right_stream(cx))
                        }
                        SHJStreamState::PullLeft => {
                            ready!(self.fetch_next_from_left_stream(cx))
                        }
                        SHJStreamState::RightExhausted => {
                            ready!(self.handle_right_stream_end(cx))
                        }
                        SHJStreamState::LeftExhausted => {
                            ready!(self.handle_left_stream_end(cx))
                        }
                        SHJStreamState::BothExhausted {
                            final_result: false,
                        } => self.prepare_for_final_results_after_exhaustion(),
                        // All results were already produced.
                        SHJStreamState::BothExhausted { final_result: true } => {
                            return Poll::Ready(None);
                        }
                    };

                    match result? {
                        StatefulStreamResult::Ready(None) => {
                            return Poll::Ready(None);
                        }
                        StatefulStreamResult::Ready(Some(batch)) => {
                            // Hand the batch to the transformer, which may
                            // split it before it reaches the caller.
                            self.batch_transformer.set_batch(batch);
                        }
                        _ => {}
                    }
                }
                Some((batch, _)) => {
                    // Emit the next (possibly split) output batch.
                    return self
                        .metrics
                        .baseline_metrics
                        .record_poll(Poll::Ready(Some(Ok(batch))));
                }
            }
        }
    }
1397 fn fetch_next_from_right_stream(
1407 &mut self,
1408 cx: &mut Context<'_>,
1409 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1410 match ready!(self.right_stream().poll_next_unpin(cx)) {
1411 Some(Ok(batch)) => {
1412 if batch.num_rows() == 0 {
1413 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1414 }
1415 self.set_state(SHJStreamState::PullLeft);
1416 Poll::Ready(self.process_batch_from_right(&batch))
1417 }
1418 Some(Err(e)) => Poll::Ready(Err(e)),
1419 None => {
1420 self.set_state(SHJStreamState::RightExhausted);
1421 Poll::Ready(Ok(StatefulStreamResult::Continue))
1422 }
1423 }
1424 }
1425
1426 fn fetch_next_from_left_stream(
1436 &mut self,
1437 cx: &mut Context<'_>,
1438 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1439 match ready!(self.left_stream().poll_next_unpin(cx)) {
1440 Some(Ok(batch)) => {
1441 if batch.num_rows() == 0 {
1442 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1443 }
1444 self.set_state(SHJStreamState::PullRight);
1445 Poll::Ready(self.process_batch_from_left(&batch))
1446 }
1447 Some(Err(e)) => Poll::Ready(Err(e)),
1448 None => {
1449 self.set_state(SHJStreamState::LeftExhausted);
1450 Poll::Ready(Ok(StatefulStreamResult::Continue))
1451 }
1452 }
1453 }
1454
1455 fn handle_right_stream_end(
1466 &mut self,
1467 cx: &mut Context<'_>,
1468 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1469 match ready!(self.left_stream().poll_next_unpin(cx)) {
1470 Some(Ok(batch)) => {
1471 if batch.num_rows() == 0 {
1472 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1473 }
1474 Poll::Ready(self.process_batch_after_right_end(&batch))
1475 }
1476 Some(Err(e)) => Poll::Ready(Err(e)),
1477 None => {
1478 self.set_state(SHJStreamState::BothExhausted {
1479 final_result: false,
1480 });
1481 Poll::Ready(Ok(StatefulStreamResult::Continue))
1482 }
1483 }
1484 }
1485
1486 fn handle_left_stream_end(
1497 &mut self,
1498 cx: &mut Context<'_>,
1499 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1500 match ready!(self.right_stream().poll_next_unpin(cx)) {
1501 Some(Ok(batch)) => {
1502 if batch.num_rows() == 0 {
1503 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1504 }
1505 Poll::Ready(self.process_batch_after_left_end(&batch))
1506 }
1507 Some(Err(e)) => Poll::Ready(Err(e)),
1508 None => {
1509 self.set_state(SHJStreamState::BothExhausted {
1510 final_result: false,
1511 });
1512 Poll::Ready(Ok(StatefulStreamResult::Continue))
1513 }
1514 }
1515 }
1516
    /// Transitions to the terminal state and emits any deferred final-stage
    /// results (e.g. unmatched rows for outer joins) exactly once.
    fn prepare_for_final_results_after_exhaustion(
        &mut self,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.set_state(SHJStreamState::BothExhausted { final_result: true });
        self.process_batches_before_finalization()
    }
1532
1533 fn process_batch_from_right(
1534 &mut self,
1535 batch: &RecordBatch,
1536 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1537 self.perform_join_for_given_side(batch, JoinSide::Right)
1538 .map(|maybe_batch| {
1539 if maybe_batch.is_some() {
1540 StatefulStreamResult::Ready(maybe_batch)
1541 } else {
1542 StatefulStreamResult::Continue
1543 }
1544 })
1545 }
1546
1547 fn process_batch_from_left(
1548 &mut self,
1549 batch: &RecordBatch,
1550 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1551 self.perform_join_for_given_side(batch, JoinSide::Left)
1552 .map(|maybe_batch| {
1553 if maybe_batch.is_some() {
1554 StatefulStreamResult::Ready(maybe_batch)
1555 } else {
1556 StatefulStreamResult::Continue
1557 }
1558 })
1559 }
1560
    /// After the left stream ends, remaining right batches are still joined
    /// against the buffered left side; the handling is the same as during
    /// normal operation.
    fn process_batch_after_left_end(
        &mut self,
        right_batch: &RecordBatch,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.process_batch_from_right(right_batch)
    }
1567
    /// After the right stream ends, remaining left batches are still joined
    /// against the buffered right side; the handling is the same as during
    /// normal operation.
    fn process_batch_after_right_end(
        &mut self,
        left_batch: &RecordBatch,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.process_batch_from_left(left_batch)
    }
1574
1575 fn process_batches_before_finalization(
1576 &mut self,
1577 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1578 let left_result = build_side_determined_results(
1580 &self.left,
1581 &self.schema,
1582 self.left.input_buffer.num_rows(),
1583 self.right.input_buffer.schema(),
1584 self.join_type,
1585 &self.column_indices,
1586 )?;
1587 let right_result = build_side_determined_results(
1589 &self.right,
1590 &self.schema,
1591 self.right.input_buffer.num_rows(),
1592 self.left.input_buffer.schema(),
1593 self.join_type,
1594 &self.column_indices,
1595 )?;
1596
1597 let result = combine_two_batches(&self.schema, left_result, right_result)?;
1599
1600 if result.is_some() {
1602 return Ok(StatefulStreamResult::Ready(result));
1603 }
1604 Ok(StatefulStreamResult::Continue)
1605 }
1606
    /// Mutable accessor for the right input stream.
    fn right_stream(&mut self) -> &mut SendableRecordBatchStream {
        &mut self.right_stream
    }
1610
    /// Mutable accessor for the left input stream.
    fn left_stream(&mut self) -> &mut SendableRecordBatchStream {
        &mut self.left_stream
    }
1614
    /// Replaces the stream's state-machine state.
    fn set_state(&mut self, state: SHJStreamState) {
        self.state = state;
    }
1618
    /// Returns a clone of the current state-machine state.
    // NOTE(review): takes `&mut self` and clones even though it only reads
    // `self.state` — presumably to satisfy a trait signature; confirm before
    // relaxing to `&self`.
    fn state(&mut self) -> SHJStreamState {
        self.state.clone()
    }
1622
1623 fn size(&self) -> usize {
1624 let mut size = 0;
1625 size += size_of_val(&self.schema);
1626 size += size_of_val(&self.filter);
1627 size += size_of_val(&self.join_type);
1628 size += self.left.size();
1629 size += self.right.size();
1630 size += size_of_val(&self.column_indices);
1631 size += self.graph.as_ref().map(|g| g.size()).unwrap_or(0);
1632 size += size_of_val(&self.left_sorted_filter_expr);
1633 size += size_of_val(&self.right_sorted_filter_expr);
1634 size += size_of_val(&self.random_state);
1635 size += size_of_val(&self.null_equality);
1636 size += size_of_val(&self.metrics);
1637 size
1638 }
1639
    /// Joins a probe batch from `probe_side` against the opposite (build)
    /// side's buffered state, then — when sorted filter expressions and an
    /// interval graph are available — prunes build-side rows that can no
    /// longer match and emits any results those pruned rows determine.
    ///
    /// Returns the combined output batch, or `None` when nothing was produced.
    fn perform_join_for_given_side(
        &mut self,
        probe_batch: &RecordBatch,
        probe_side: JoinSide,
    ) -> Result<Option<RecordBatch>> {
        // Resolve which side probes and which builds for this call; the
        // destructuring splits the mutable borrows of `self`'s fields.
        let (
            probe_hash_joiner,
            build_hash_joiner,
            probe_side_sorted_filter_expr,
            build_side_sorted_filter_expr,
            probe_side_metrics,
        ) = if probe_side.eq(&JoinSide::Left) {
            (
                &mut self.left,
                &mut self.right,
                &mut self.left_sorted_filter_expr,
                &mut self.right_sorted_filter_expr,
                &mut self.metrics.left,
            )
        } else {
            (
                &mut self.right,
                &mut self.left,
                &mut self.right_sorted_filter_expr,
                &mut self.left_sorted_filter_expr,
                &mut self.metrics.right,
            )
        };
        probe_side_metrics.input_batches.add(1);
        probe_side_metrics.input_rows.add(probe_batch.num_rows());
        // Buffer/hash the probe batch so it can serve as build side for
        // batches arriving later from the other input.
        probe_hash_joiner.update_internal_state(probe_batch, &self.random_state)?;
        // Equi-join the probe batch against the build side's hash table.
        let equal_result = join_with_probe_batch(
            build_hash_joiner,
            probe_hash_joiner,
            &self.schema,
            self.join_type,
            self.filter.as_ref(),
            probe_batch,
            &self.column_indices,
            &self.random_state,
            self.null_equality,
        )?;
        // Advance the probe side's row offset after the join has consumed
        // the batch.
        probe_hash_joiner.offset += probe_batch.num_rows();

        let anti_result = if let (
            Some(build_side_sorted_filter_expr),
            Some(probe_side_sorted_filter_expr),
            Some(graph),
        ) = (
            build_side_sorted_filter_expr.as_mut(),
            probe_side_sorted_filter_expr.as_mut(),
            self.graph.as_mut(),
        ) {
            // Refresh the interval bounds of both sides' filter expressions
            // from the latest buffered/probe data.
            calculate_filter_expr_intervals(
                &build_hash_joiner.input_buffer,
                build_side_sorted_filter_expr,
                probe_batch,
                probe_side_sorted_filter_expr,
            )?;
            // How many leading build-side rows can never match again.
            let prune_length = build_hash_joiner
                .calculate_prune_length_with_probe_batch(
                    build_side_sorted_filter_expr,
                    probe_side_sorted_filter_expr,
                    graph,
                )?;
            // Emit whatever the pruned rows determine (e.g. outer/anti rows)
            // before discarding them from the internal state.
            let result = build_side_determined_results(
                build_hash_joiner,
                &self.schema,
                prune_length,
                probe_batch.schema(),
                self.join_type,
                &self.column_indices,
            )?;
            build_hash_joiner.prune_internal_state(prune_length)?;
            result
        } else {
            None
        };

        let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
        // Refresh memory accounting to reflect the new buffered state.
        let capacity = self.size();
        self.metrics.stream_memory_usage.set(capacity);
        self.reservation.lock().try_resize(capacity)?;
        Ok(result)
    }
1738}
1739
/// State machine for the symmetric hash join stream: which input is being
/// pulled next and which inputs have been exhausted.
#[derive(Clone, Debug)]
pub enum SHJStreamState {
    /// Pull the next batch from the right input.
    PullRight,

    /// Pull the next batch from the left input.
    PullLeft,

    /// The right input is exhausted; keep draining the left input.
    RightExhausted,

    /// The left input is exhausted; keep draining the right input.
    LeftExhausted,

    /// Both inputs are exhausted. `final_result` records whether the final
    /// buffered results have already been produced.
    BothExhausted { final_result: bool },
}
1767
1768#[cfg(test)]
1769mod tests {
1770 use std::collections::HashMap;
1771 use std::sync::{LazyLock, Mutex};
1772
1773 use super::*;
1774 use crate::joins::test_utils::{
1775 build_sides_record_batches, compare_batches, complicated_filter,
1776 create_memory_table, join_expr_tests_fixture_f64, join_expr_tests_fixture_i32,
1777 join_expr_tests_fixture_temporal, partitioned_hash_join_with_filter,
1778 partitioned_sym_join_with_filter, split_record_batches,
1779 };
1780
1781 use arrow::compute::SortOptions;
1782 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
1783 use datafusion_common::ScalarValue;
1784 use datafusion_execution::config::SessionConfig;
1785 use datafusion_expr::Operator;
1786 use datafusion_physical_expr::expressions::{Column, binary, col, lit};
1787 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1788
1789 use rstest::*;
1790
    // Number of rows generated per side for the shared test tables.
    const TABLE_SIZE: i32 = 30;

    // Cache key: (left cardinality, right cardinality, batch size).
    type TableKey = (i32, i32, usize);
    // Cached value: pre-split (left, right) record-batch partitions.
    type TableValue = (Vec<RecordBatch>, Vec<RecordBatch>);

    // Shared across tests so identical input tables are generated only once.
    static TABLE_CACHE: LazyLock<Mutex<HashMap<TableKey, TableValue>>> =
        LazyLock::new(|| Mutex::new(HashMap::new()));
1799
1800 fn get_or_create_table(
1801 cardinality: (i32, i32),
1802 batch_size: usize,
1803 ) -> Result<TableValue> {
1804 {
1805 let cache = TABLE_CACHE.lock().unwrap();
1806 if let Some(table) = cache.get(&(cardinality.0, cardinality.1, batch_size)) {
1807 return Ok(table.clone());
1808 }
1809 }
1810
1811 let (left_batch, right_batch) =
1813 build_sides_record_batches(TABLE_SIZE, cardinality)?;
1814
1815 let (left_partition, right_partition) = (
1816 split_record_batches(&left_batch, batch_size)?,
1817 split_record_batches(&right_batch, batch_size)?,
1818 );
1819
1820 let mut cache = TABLE_CACHE.lock().unwrap();
1822
1823 cache.insert(
1825 (cardinality.0, cardinality.1, batch_size),
1826 (left_partition.clone(), right_partition.clone()),
1827 );
1828
1829 Ok((left_partition, right_partition))
1830 }
1831
1832 pub async fn experiment(
1833 left: Arc<dyn ExecutionPlan>,
1834 right: Arc<dyn ExecutionPlan>,
1835 filter: Option<JoinFilter>,
1836 join_type: JoinType,
1837 on: JoinOn,
1838 task_ctx: Arc<TaskContext>,
1839 ) -> Result<()> {
1840 let first_batches = partitioned_sym_join_with_filter(
1841 Arc::clone(&left),
1842 Arc::clone(&right),
1843 on.clone(),
1844 filter.clone(),
1845 &join_type,
1846 NullEquality::NullEqualsNothing,
1847 Arc::clone(&task_ctx),
1848 )
1849 .await?;
1850 let second_batches = partitioned_hash_join_with_filter(
1851 left,
1852 right,
1853 on,
1854 filter,
1855 &join_type,
1856 NullEquality::NullEqualsNothing,
1857 task_ctx,
1858 )
1859 .await?;
1860 compare_batches(&first_batches, &second_batches);
1861 Ok(())
1862 }
1863
    /// SHJ vs. regular hash join equivalence with a compound ascending sort
    /// key (`la1 + la2`) on the left, `ra1` on the right, a computed
    /// equi-join key, and a three-column complex filter.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn complex_join_all_one_ascending_numeric(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
            (4, 5),
            (12, 17),
        )]
        cardinality: (i32, i32),
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();

        let left_sorted = [PhysicalSortExpr {
            expr: binary(
                col("la1", left_schema)?,
                Operator::Plus,
                col("la2", left_schema)?,
                left_schema,
            )?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        // Equi-join on a computed key: lc1 + 1 = rc1.
        let on = vec![(
            binary(
                col("lc1", left_schema)?,
                Operator::Plus,
                lit(ScalarValue::Int32(Some(1))),
                left_schema,
            )?,
            Arc::new(Column::new_with_schema("rc1", right_schema)?) as _,
        )];

        let intermediate_schema = Schema::new(vec![
            Field::new("0", DataType::Int32, true),
            Field::new("1", DataType::Int32, true),
            Field::new("2", DataType::Int32, true),
        ]);
        let filter_expr = complicated_filter(&intermediate_schema)?;
        // Map the filter's intermediate columns to la1/la2 (left) and ra1
        // (right), resolving indices by name.
        let column_indices = vec![
            ColumnIndex {
                index: left_schema.index_of("la1")?,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: left_schema.index_of("la2")?,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: right_schema.index_of("ra1")?,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
1952
    /// SHJ vs. regular hash join equivalence with single ascending sort keys
    /// (`la1`, `ra1`) and one of six parametrized integer filter expressions.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_all_one_ascending_numeric(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();

        let left_sorted = [PhysicalSortExpr {
            expr: col("la1", left_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Filter reads field 0 of each side (the sorted la1/ra1 columns).
        let column_indices = vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2021
    /// Same parametrized join as above but with no sort information supplied
    /// to `create_memory_table`, so the pruning machinery cannot engage.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_without_sort_information(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let (left, right) =
            create_memory_table(left_partition, right_partition, vec![], vec![])?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Filter reads field 5 of each side.
        let column_indices = vec![
            ColumnIndex {
                index: 5,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 5,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2075
    /// Equivalence check for a pure equi-join: no filter and no sort
    /// information, across all join types.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_without_filter(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let (left, right) =
            create_memory_table(left_partition, right_partition, vec![], vec![])?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
        experiment(left, right, None, join_type, on, task_ctx).await?;
        Ok(())
    }
2104
    /// Equivalence check with *descending* sort keys (`la1_des`/`ra1_des`,
    /// nulls first) and parametrized integer filter expressions.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_all_one_descending_numeric_particular(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("la1_des", left_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1_des", right_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Filter reads field 5 of each side (the descending-sorted columns).
        let column_indices = vec![
            ColumnIndex {
                index: 5,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 5,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2177
    /// Full join over columns sorted ascending with nulls *first*
    /// (`l_asc_null_first`/`r_asc_null_first`), repartitioning disabled.
    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_first() -> Result<()> {
        let join_type = JoinType::Full;
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_asc_null_first", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_asc_null_first", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Filter reads field 6 of each side (the nulls-first sorted columns).
        let column_indices = vec![
            ColumnIndex {
                index: 6,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 6,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2237
    /// Full join over columns sorted ascending with nulls *last*
    /// (`l_asc_null_last`/`r_asc_null_last`), repartitioning disabled.
    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_last() -> Result<()> {
        let join_type = JoinType::Full;
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_asc_null_last", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_asc_null_last", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Filter reads field 7 of each side (the nulls-last sorted columns).
        let column_indices = vec![
            ColumnIndex {
                index: 7,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 7,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2299
    /// Full join over columns sorted *descending* with nulls first
    /// (`l_desc_null_first`/`r_desc_null_first`), repartitioning disabled.
    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_first_descending() -> Result<()> {
        let join_type = JoinType::Full;
        let cardinality = (10, 11);
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_desc_null_first", left_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_desc_null_first", right_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Filter reads field 8 of each side (the desc/nulls-first columns).
        let column_indices = vec![
            ColumnIndex {
                index: 8,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 8,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2362
    /// Full join with the complex three-column filter where one referenced
    /// column (field 4) is not part of the declared sort orderings —
    /// exercising the path without complete sorted-filter statistics.
    #[tokio::test(flavor = "multi_thread")]
    async fn complex_join_all_one_ascending_numeric_missing_stat() -> Result<()> {
        let cardinality = (3, 4);
        let join_type = JoinType::Full;

        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("la1", left_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("0", DataType::Int32, true),
            Field::new("1", DataType::Int32, true),
            Field::new("2", DataType::Int32, true),
        ]);
        let filter_expr = complicated_filter(&intermediate_schema)?;
        let column_indices = vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 4,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2421
    /// Full join where the left side declares *two* equivalent sort
    /// orderings (`la1` and `la2`), checking that ordering-equivalence
    /// information is handled correctly.
    #[tokio::test(flavor = "multi_thread")]
    async fn complex_join_all_one_ascending_equivalence() -> Result<()> {
        let cardinality = (3, 4);
        let join_type = JoinType::Full;

        let config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = Arc::new(TaskContext::default().with_session_config(config));
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        // Two alternative orderings for the left side.
        let left_sorted = vec![
            [PhysicalSortExpr {
                expr: col("la1", left_schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            [PhysicalSortExpr {
                expr: col("la2", left_schema)?,
                options: SortOptions::default(),
            }]
            .into(),
        ];

        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();

        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            left_sorted,
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("0", DataType::Int32, true),
            Field::new("1", DataType::Int32, true),
            Field::new("2", DataType::Int32, true),
        ]);
        let filter_expr = complicated_filter(&intermediate_schema)?;
        let column_indices = vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 4,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2489
    /// Equivalence check with millisecond-timestamp sort columns
    /// (`lt1`/`rt1`) and parametrized temporal filter expressions.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn testing_with_temporal_columns(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
            (4, 5),
            (12, 17),
        )]
        cardinality: (i32, i32),
        #[values(0, 1, 2)] case_expr: usize,
    ) -> Result<()> {
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
        let left_sorted = [PhysicalSortExpr {
            expr: col("lt1", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("rt1", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;
        let intermediate_schema = Schema::new(vec![
            Field::new(
                "left",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "right",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
        ]);
        let filter_expr = join_expr_tests_fixture_temporal(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
            &intermediate_schema,
        )?;
        // Filter reads field 3 of each side (the timestamp columns).
        let column_indices = vec![
            ColumnIndex {
                index: 3,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 3,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2576
    /// Equivalence check with `Interval(DayTime)` sort columns (`li1`/`ri1`)
    /// and the first temporal filter fixture.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_with_interval_columns(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
            (4, 5),
            (12, 17),
        )]
        cardinality: (i32, i32),
    ) -> Result<()> {
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
        let left_sorted = [PhysicalSortExpr {
            expr: col("li1", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ri1", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;
        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new("right", DataType::Interval(IntervalUnit::DayTime), false),
        ]);
        let filter_expr = join_expr_tests_fixture_temporal(
            0,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
            &intermediate_schema,
        )?;
        // Filter reads field 9 of each side (the interval columns).
        let column_indices = vec![
            ColumnIndex {
                index: 9,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 9,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;

        Ok(())
    }
2655
    /// Equivalence check with ascending `Float64` sort columns
    /// (`l_float`/`r_float`) and parametrized float filter expressions,
    /// exercising pruning on floating-point intervals.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn testing_ascending_float_pruning(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
            (4, 5),
            (12, 17),
        )]
        cardinality: (i32, i32),
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_float", left_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_float", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Float64, true),
            Field::new("right", DataType::Float64, true),
        ]);
        let filter_expr = join_expr_tests_fixture_f64(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        // Filter reads field 10 of each side (the float columns).
        let column_indices = vec![
            ColumnIndex {
                index: 10,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 10,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
2730}