use std::any::Any;
use std::fmt::{self, Debug};
use std::mem::{size_of, size_of_val};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::vec;

use crate::common::SharedMemoryReservation;
use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
use crate::joins::stream_join_utils::{
    calculate_filter_expr_intervals, combine_two_batches,
    convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
    get_pruning_semi_indices, prepare_sorted_exprs, record_visited_indices,
    PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
};
use crate::joins::utils::{
    apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
    check_join_is_valid, equal_rows_arr, symmetric_join_output_partitioning, update_hash,
    BatchSplitter, BatchTransformer, ColumnIndex, JoinFilter, JoinHashMapType, JoinOn,
    JoinOnRef, NoopBatchTransformer, StatefulStreamResult,
};
use crate::projection::{
    join_allows_pushdown, join_table_borders, new_join_children,
    physical_to_column_exprs, update_join_filter, update_join_on, ProjectionExec,
};
use crate::{
    joins::StreamJoinPartitionMode,
    metrics::{ExecutionPlanMetricsSet, MetricsSet},
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
};

use arrow::array::{
    ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder, UInt32Array,
    UInt64Array,
};
use arrow::compute::concat_batches;
use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::bisect;
use datafusion_common::{
    internal_err, plan_err, HashSet, JoinSide, JoinType, NullEquality, Result,
};
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};

use ahash::RandomState;
use futures::{ready, Stream, StreamExt};
use parking_lot::Mutex;

const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;

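/// A symmetric hash join keeps a hash table for *both* inputs and probes each
/// incoming batch against the other side's buffer, so it can emit results
/// incrementally and join two (potentially unbounded) streams.
///
/// When both inputs are ordered on expressions that appear in the join filter,
/// the operator uses interval arithmetic over the filter expression to decide
/// which buffered rows can no longer match, and prunes them to bound memory use.
///
/// A minimal construction sketch (the `left`/`right` plans and `on` key
/// expressions are assumed to be in scope):
///
/// ```ignore
/// let join = SymmetricHashJoinExec::try_new(
///     left,
///     right,
///     on,
///     None, // no residual join filter
///     &JoinType::Inner,
///     NullEquality::NullEqualsNothing,
///     None, // left sort expressions
///     None, // right sort expressions
///     StreamJoinPartitionMode::Partitioned,
/// )?;
/// ```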
#[derive(Debug, Clone)]
pub struct SymmetricHashJoinExec {
    pub(crate) left: Arc<dyn ExecutionPlan>,
    pub(crate) right: Arc<dyn ExecutionPlan>,
    pub(crate) on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    pub(crate) filter: Option<JoinFilter>,
    pub(crate) join_type: JoinType,
    random_state: RandomState,
    metrics: ExecutionPlanMetricsSet,
    column_indices: Vec<ColumnIndex>,
    pub(crate) null_equality: NullEquality,
    pub(crate) left_sort_exprs: Option<LexOrdering>,
    pub(crate) right_sort_exprs: Option<LexOrdering>,
    mode: StreamJoinPartitionMode,
    cache: PlanProperties,
}

impl SymmetricHashJoinExec {
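    /// Attempts to create a new `SymmetricHashJoinExec`. Errors if the `on`
    /// constraints are empty or if the join keys are not valid for the given
    /// input schemas.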
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        null_equality: NullEquality,
        left_sort_exprs: Option<LexOrdering>,
        right_sort_exprs: Option<LexOrdering>,
        mode: StreamJoinPartitionMode,
    ) -> Result<Self> {
        let left_schema = left.schema();
        let right_schema = right.schema();

        if on.is_empty() {
            return plan_err!(
                "On constraints in SymmetricHashJoinExec should be non-empty"
            );
        }

        check_join_is_valid(&left_schema, &right_schema, &on)?;

        let (schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, join_type);

        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let schema = Arc::new(schema);
        let cache = Self::compute_properties(&left, &right, schema, *join_type, &on)?;
        Ok(SymmetricHashJoinExec {
            left,
            right,
            on,
            filter,
            join_type: *join_type,
            random_state,
            metrics: ExecutionPlanMetricsSet::new(),
            column_indices,
            null_equality,
            left_sort_exprs,
            right_sort_exprs,
            mode,
            cache,
        })
    }

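    /// Computes the cached plan properties (equivalence properties, output
    /// partitioning, emission type and boundedness) from the children.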
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
        join_type: JoinType,
        join_on: JoinOnRef,
    ) -> Result<PlanProperties> {
        let eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            schema,
            &[false, false],
            None,
            join_on,
        )?;

        let output_partitioning =
            symmetric_join_output_partitioning(left, right, &join_type)?;

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type_from_children([left, right]),
            boundedness_from_children([left, right]),
        ))
    }

    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    pub fn partition_mode(&self) -> StreamJoinPartitionMode {
        self.mode
    }

    pub fn left_sort_exprs(&self) -> Option<&LexOrdering> {
        self.left_sort_exprs.as_ref()
    }

    pub fn right_sort_exprs(&self) -> Option<&LexOrdering> {
        self.right_sort_exprs.as_ref()
    }

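    /// Returns `true` if a join filter exists and the first output ordering
    /// expression of *both* inputs can be converted into the filter schema,
    /// which is the precondition for interval-based pruning.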
    pub fn check_if_order_information_available(&self) -> Result<bool> {
        if let Some(filter) = self.filter() {
            let left = self.left();
            if let Some(left_ordering) = left.output_ordering() {
                let right = self.right();
                if let Some(right_ordering) = right.output_ordering() {
                    let left_convertible = convert_sort_expr_with_filter_schema(
                        &JoinSide::Left,
                        filter,
                        &left.schema(),
                        &left_ordering[0],
                    )?
                    .is_some();
                    let right_convertible = convert_sort_expr_with_filter_schema(
                        &JoinSide::Right,
                        filter,
                        &right.schema(),
                        &right_ordering[0],
                    )?
                    .is_some();
                    return Ok(left_convertible && right_convertible);
                }
            }
        }
        Ok(false)
    }
}

impl DisplayAs for SymmetricHashJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let display_filter = self.filter.as_ref().map_or_else(
                    || "".to_string(),
                    |f| format!(", filter={}", f.expression()),
                );
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| format!("({c1}, {c2})"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(
                    f,
                    "SymmetricHashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
                    self.mode, self.join_type, on, display_filter
                )
            }
            DisplayFormatType::TreeRender => {
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| {
                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
                    })
                    .collect::<Vec<String>>()
                    .join(", ");

                writeln!(f, "mode={:?}", self.mode)?;
                if *self.join_type() != JoinType::Inner {
                    writeln!(f, "join_type={:?}", self.join_type)?;
                }
                writeln!(f, "on={on}")
            }
        }
    }
}

impl ExecutionPlan for SymmetricHashJoinExec {
    fn name(&self) -> &'static str {
        "SymmetricHashJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            StreamJoinPartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
            StreamJoinPartitionMode::SinglePartition => {
                vec![Distribution::SinglePartition, Distribution::SinglePartition]
            }
        }
    }

    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        vec![
            self.left_sort_exprs
                .as_ref()
                .map(|e| OrderingRequirements::from(e.clone())),
            self.right_sort_exprs
                .as_ref()
                .map(|e| OrderingRequirements::from(e.clone())),
        ]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(SymmetricHashJoinExec::try_new(
            Arc::clone(&children[0]),
            Arc::clone(&children[1]),
            self.on.clone(),
            self.filter.clone(),
            &self.join_type,
            self.null_equality,
            self.left_sort_exprs.clone(),
            self.right_sort_exprs.clone(),
            self.mode,
        )?))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema()))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();
        if left_partitions != right_partitions {
            return internal_err!(
                "Invalid SymmetricHashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
                 consider using RepartitionExec"
            );
        }
        let (left_sorted_filter_expr, right_sorted_filter_expr, graph) = match (
            self.left_sort_exprs(),
            self.right_sort_exprs(),
            &self.filter,
        ) {
            (Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
                let (left, right, graph) = prepare_sorted_exprs(
                    filter,
                    &self.left,
                    &self.right,
                    left_sort_exprs,
                    right_sort_exprs,
                )?;
                (Some(left), Some(right), Some(graph))
            }
            _ => (None, None, None),
        };

        let (on_left, on_right) = self.on.iter().cloned().unzip();

        let left_side_joiner =
            OneSideHashJoiner::new(JoinSide::Left, on_left, self.left.schema());
        let right_side_joiner =
            OneSideHashJoiner::new(JoinSide::Right, on_right, self.right.schema());

        let left_stream = self.left.execute(partition, Arc::clone(&context))?;

        let right_stream = self.right.execute(partition, Arc::clone(&context))?;

        let batch_size = context.session_config().batch_size();
        let enforce_batch_size_in_joins =
            context.session_config().enforce_batch_size_in_joins();

        let reservation = Arc::new(Mutex::new(
            MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
                .register(context.memory_pool()),
        ));
        if let Some(g) = graph.as_ref() {
            reservation.lock().try_grow(g.size())?;
        }

        if enforce_batch_size_in_joins {
            Ok(Box::pin(SymmetricHashJoinStream {
                left_stream,
                right_stream,
                schema: self.schema(),
                filter: self.filter.clone(),
                join_type: self.join_type,
                random_state: self.random_state.clone(),
                left: left_side_joiner,
                right: right_side_joiner,
                column_indices: self.column_indices.clone(),
                metrics: StreamJoinMetrics::new(partition, &self.metrics),
                graph,
                left_sorted_filter_expr,
                right_sorted_filter_expr,
                null_equality: self.null_equality,
                state: SHJStreamState::PullRight,
                reservation,
                batch_transformer: BatchSplitter::new(batch_size),
            }))
        } else {
            Ok(Box::pin(SymmetricHashJoinStream {
                left_stream,
                right_stream,
                schema: self.schema(),
                filter: self.filter.clone(),
                join_type: self.join_type,
                random_state: self.random_state.clone(),
                left: left_side_joiner,
                right: right_side_joiner,
                column_indices: self.column_indices.clone(),
                metrics: StreamJoinMetrics::new(partition, &self.metrics),
                graph,
                left_sorted_filter_expr,
                right_sorted_filter_expr,
                null_equality: self.null_equality,
                state: SHJStreamState::PullRight,
                reservation,
                batch_transformer: NoopBatchTransformer::new(),
            }))
        }
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let Some(projection_as_columns) = physical_to_column_exprs(projection.expr())
        else {
            return Ok(None);
        };

        let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
            self.left().schema().fields().len(),
            &projection_as_columns,
        );

        if !join_allows_pushdown(
            &projection_as_columns,
            &self.schema(),
            far_right_left_col_ind,
            far_left_right_col_ind,
        ) {
            return Ok(None);
        }

        let Some(new_on) = update_join_on(
            &projection_as_columns[0..=far_right_left_col_ind as _],
            &projection_as_columns[far_left_right_col_ind as _..],
            self.on(),
            self.left().schema().fields().len(),
        ) else {
            return Ok(None);
        };

        let new_filter = if let Some(filter) = self.filter() {
            match update_join_filter(
                &projection_as_columns[0..=far_right_left_col_ind as _],
                &projection_as_columns[far_left_right_col_ind as _..],
                filter,
                self.left().schema().fields().len(),
            ) {
                Some(updated_filter) => Some(updated_filter),
                None => return Ok(None),
            }
        } else {
            None
        };

        let (new_left, new_right) = new_join_children(
            &projection_as_columns,
            far_right_left_col_ind,
            far_left_right_col_ind,
            self.left(),
            self.right(),
        )?;

        SymmetricHashJoinExec::try_new(
            Arc::new(new_left),
            Arc::new(new_right),
            new_on,
            new_filter,
            self.join_type(),
            self.null_equality(),
            self.right().output_ordering().cloned(),
            self.left().output_ordering().cloned(),
            self.partition_mode(),
        )
        .map(|e| Some(Arc::new(e) as _))
    }
}

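/// A stream that produces [`RecordBatch`]es as data arrives from either input
/// of the join. Each incoming batch is hashed, buffered and probed against the
/// opposite side's buffer; when sorted filter expressions and an interval
/// graph are available, buffered rows that can no longer match are pruned.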
struct SymmetricHashJoinStream<T> {
    left_stream: SendableRecordBatchStream,
    right_stream: SendableRecordBatchStream,
    schema: Arc<Schema>,
    filter: Option<JoinFilter>,
    join_type: JoinType,
    left: OneSideHashJoiner,
    right: OneSideHashJoiner,
    column_indices: Vec<ColumnIndex>,
    graph: Option<ExprIntervalGraph>,
    left_sorted_filter_expr: Option<SortedFilterExpr>,
    right_sorted_filter_expr: Option<SortedFilterExpr>,
    random_state: RandomState,
    null_equality: NullEquality,
    metrics: StreamJoinMetrics,
    reservation: SharedMemoryReservation,
    state: SHJStreamState,
    batch_transformer: T,
}

impl<T: BatchTransformer + Unpin + Send> RecordBatchStream
    for SymmetricHashJoinStream<T>
{
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

impl<T: BatchTransformer + Unpin + Send> Stream for SymmetricHashJoinStream<T> {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.poll_next_impl(cx)
    }
}

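/// Determines how many rows at the front of the build-side buffer can be
/// pruned, by bisecting the evaluated sort expression against the interval
/// bound produced by constraint propagation (the upper bound for descending
/// orderings, the lower bound otherwise).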
fn determine_prune_length(
    buffer: &RecordBatch,
    build_side_filter_expr: &SortedFilterExpr,
) -> Result<usize> {
    let origin_sorted_expr = build_side_filter_expr.origin_sorted_expr();
    let interval = build_side_filter_expr.interval();
    let batch_arr = origin_sorted_expr
        .expr
        .evaluate(buffer)?
        .into_array(buffer.num_rows())?;

    let target = if origin_sorted_expr.options.descending {
        interval.upper().clone()
    } else {
        interval.lower().clone()
    };

    bisect::<true>(&[batch_arr], &[target], &[origin_sorted_expr.options])
}

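/// Returns `true` if the given build side must emit unmatched rows at
/// finalization or pruning time (e.g. the left side of a LEFT, LEFT SEMI,
/// LEFT ANTI, LEFT MARK or FULL join).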
fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) -> bool {
    if build_side == JoinSide::Left {
        matches!(
            join_type,
            JoinType::Left
                | JoinType::LeftAnti
                | JoinType::Full
                | JoinType::LeftSemi
                | JoinType::LeftMark
        )
    } else {
        matches!(
            join_type,
            JoinType::Right
                | JoinType::RightAnti
                | JoinType::Full
                | JoinType::RightSemi
                | JoinType::RightMark
        )
    }
}

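/// Calculates the build and probe index arrays to emit for the prunable rows
/// of the build side, based on the join type and the set of rows that have
/// already matched (`visited_rows`).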
fn calculate_indices_by_join_type<L: ArrowPrimitiveType, R: ArrowPrimitiveType>(
    build_side: JoinSide,
    prune_length: usize,
    visited_rows: &HashSet<usize>,
    deleted_offset: usize,
    join_type: JoinType,
) -> Result<(PrimitiveArray<L>, PrimitiveArray<R>)>
where
    NativeAdapter<L>: From<<L as ArrowPrimitiveType>::Native>,
{
    let result = match (build_side, join_type) {
        (JoinSide::Left, JoinType::LeftMark) => {
            let build_indices = (0..prune_length)
                .map(L::Native::from_usize)
                .collect::<PrimitiveArray<L>>();
            let probe_indices = (0..prune_length)
                .map(|idx| {
                    visited_rows
                        .contains(&(idx + deleted_offset))
                        .then_some(R::Native::from_usize(0).unwrap())
                })
                .collect();
            (build_indices, probe_indices)
        }
        (JoinSide::Right, JoinType::RightMark) => {
            let build_indices = (0..prune_length)
                .map(L::Native::from_usize)
                .collect::<PrimitiveArray<L>>();
            let probe_indices = (0..prune_length)
                .map(|idx| {
                    visited_rows
                        .contains(&(idx + deleted_offset))
                        .then_some(R::Native::from_usize(0).unwrap())
                })
                .collect();
            (build_indices, probe_indices)
        }
        (JoinSide::Left, JoinType::Left | JoinType::LeftAnti)
        | (JoinSide::Right, JoinType::Right | JoinType::RightAnti)
        | (_, JoinType::Full) => {
            let build_unmatched_indices =
                get_pruning_anti_indices(prune_length, deleted_offset, visited_rows);
            let mut builder =
                PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
            builder.append_nulls(build_unmatched_indices.len());
            let probe_indices = builder.finish();
            (build_unmatched_indices, probe_indices)
        }
        (JoinSide::Left, JoinType::LeftSemi) | (JoinSide::Right, JoinType::RightSemi) => {
            let build_unmatched_indices =
                get_pruning_semi_indices(prune_length, deleted_offset, visited_rows);
            let mut builder =
                PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
            builder.append_nulls(build_unmatched_indices.len());
            let probe_indices = builder.finish();
            (build_unmatched_indices, probe_indices)
        }
        _ => unreachable!(),
    };
    Ok(result)
}

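/// Produces the output batch (if any) for build-side rows that are about to be
/// pruned but still need to appear in the result for the given join type,
/// joining them against an empty probe batch.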
pub(crate) fn build_side_determined_results(
    build_hash_joiner: &OneSideHashJoiner,
    output_schema: &SchemaRef,
    prune_length: usize,
    probe_schema: SchemaRef,
    join_type: JoinType,
    column_indices: &[ColumnIndex],
) -> Result<Option<RecordBatch>> {
    if prune_length > 0
        && need_to_produce_result_in_final(build_hash_joiner.build_side, join_type)
    {
        let (build_indices, probe_indices) = calculate_indices_by_join_type(
            build_hash_joiner.build_side,
            prune_length,
            &build_hash_joiner.visited_rows,
            build_hash_joiner.deleted_offset,
            join_type,
        )?;

        let empty_probe_batch = RecordBatch::new_empty(probe_schema);
        build_batch_from_indices(
            output_schema.as_ref(),
            &build_hash_joiner.input_buffer,
            &empty_probe_batch,
            &build_indices,
            &probe_indices,
            column_indices,
            build_hash_joiner.build_side,
        )
        .map(|batch| (batch.num_rows() > 0).then_some(batch))
    } else {
        Ok(None)
    }
}

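/// Joins the buffered build side with an incoming probe batch: looks up the
/// hash map, applies the optional join filter, records visited indices for
/// joins that emit unmatched rows later, and builds the output batch (or
/// `None` for semi/anti/mark joins, whose results are produced during pruning
/// and finalization instead).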
#[allow(clippy::too_many_arguments)]
pub(crate) fn join_with_probe_batch(
    build_hash_joiner: &mut OneSideHashJoiner,
    probe_hash_joiner: &mut OneSideHashJoiner,
    schema: &SchemaRef,
    join_type: JoinType,
    filter: Option<&JoinFilter>,
    probe_batch: &RecordBatch,
    column_indices: &[ColumnIndex],
    random_state: &RandomState,
    null_equality: NullEquality,
) -> Result<Option<RecordBatch>> {
    if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
        return Ok(None);
    }
    let (build_indices, probe_indices) = lookup_join_hashmap(
        &build_hash_joiner.hashmap,
        &build_hash_joiner.input_buffer,
        probe_batch,
        &build_hash_joiner.on,
        &probe_hash_joiner.on,
        random_state,
        null_equality,
        &mut build_hash_joiner.hashes_buffer,
        Some(build_hash_joiner.deleted_offset),
    )?;

    let (build_indices, probe_indices) = if let Some(filter) = filter {
        apply_join_filter_to_indices(
            &build_hash_joiner.input_buffer,
            probe_batch,
            build_indices,
            probe_indices,
            filter,
            build_hash_joiner.build_side,
            None,
        )?
    } else {
        (build_indices, probe_indices)
    };

    if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
        record_visited_indices(
            &mut build_hash_joiner.visited_rows,
            build_hash_joiner.deleted_offset,
            &build_indices,
        );
    }
    if need_to_produce_result_in_final(build_hash_joiner.build_side.negate(), join_type) {
        record_visited_indices(
            &mut probe_hash_joiner.visited_rows,
            probe_hash_joiner.offset,
            &probe_indices,
        );
    }
    if matches!(
        join_type,
        JoinType::LeftAnti
            | JoinType::RightAnti
            | JoinType::LeftSemi
            | JoinType::LeftMark
            | JoinType::RightSemi
            | JoinType::RightMark
    ) {
        Ok(None)
    } else {
        build_batch_from_indices(
            schema,
            &build_hash_joiner.input_buffer,
            probe_batch,
            &build_indices,
            &probe_indices,
            column_indices,
            build_hash_joiner.build_side,
        )
        .map(|batch| (batch.num_rows() > 0).then_some(batch))
    }
}

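/// Gets the matched (build, probe) index pairs from the pruning hash map for
/// the given probe batch, then verifies candidate matches by comparing the
/// actual join key values to filter out hash collisions.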
#[allow(clippy::too_many_arguments)]
fn lookup_join_hashmap(
    build_hashmap: &PruningJoinHashMap,
    build_batch: &RecordBatch,
    probe_batch: &RecordBatch,
    build_on: &[PhysicalExprRef],
    probe_on: &[PhysicalExprRef],
    random_state: &RandomState,
    null_equality: NullEquality,
    hashes_buffer: &mut Vec<u64>,
    deleted_offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
    let keys_values = probe_on
        .iter()
        .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
        .collect::<Result<Vec<_>>>()?;
    let build_join_values = build_on
        .iter()
        .map(|c| c.evaluate(build_batch)?.into_array(build_batch.num_rows()))
        .collect::<Result<Vec<_>>>()?;

    hashes_buffer.clear();
    hashes_buffer.resize(probe_batch.num_rows(), 0);
    let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

    let (mut matched_probe, mut matched_build) = build_hashmap.get_matched_indices(
        Box::new(hash_values.iter().enumerate().rev()),
        deleted_offset,
    );

    matched_probe.reverse();
    matched_build.reverse();

    let build_indices: UInt64Array = matched_build.into();
    let probe_indices: UInt32Array = matched_probe.into();

    let (build_indices, probe_indices) = equal_rows_arr(
        &build_indices,
        &probe_indices,
        &build_join_values,
        &keys_values,
        null_equality,
    )?;

    Ok((build_indices, probe_indices))
}

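/// Bookkeeping for one input of the symmetric join: the buffered rows, the
/// pruning hash map over the join keys, and the offsets and visited-row set
/// needed to emit unmatched rows and prune safely.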
pub struct OneSideHashJoiner {
    build_side: JoinSide,
    pub input_buffer: RecordBatch,
    pub(crate) on: Vec<PhysicalExprRef>,
    pub(crate) hashmap: PruningJoinHashMap,
    pub(crate) hashes_buffer: Vec<u64>,
    pub(crate) visited_rows: HashSet<usize>,
    pub(crate) offset: usize,
    pub(crate) deleted_offset: usize,
}

impl OneSideHashJoiner {
    pub fn size(&self) -> usize {
        let mut size = 0;
        size += size_of_val(self);
        size += size_of_val(&self.build_side);
        size += self.input_buffer.get_array_memory_size();
        size += size_of_val(&self.on);
        size += self.hashmap.size();
        size += self.hashes_buffer.capacity() * size_of::<u64>();
        size += self.visited_rows.capacity() * size_of::<usize>();
        size += size_of_val(&self.offset);
        size += size_of_val(&self.deleted_offset);
        size
    }

    pub fn new(
        build_side: JoinSide,
        on: Vec<PhysicalExprRef>,
        schema: SchemaRef,
    ) -> Self {
        Self {
            build_side,
            input_buffer: RecordBatch::new_empty(schema),
            on,
            hashmap: PruningJoinHashMap::with_capacity(0),
            hashes_buffer: vec![],
            visited_rows: HashSet::new(),
            offset: 0,
            deleted_offset: 0,
        }
    }

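    /// Appends the incoming batch to the input buffer and updates the hash map
    /// with the hashes of the new rows.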
    pub(crate) fn update_internal_state(
        &mut self,
        batch: &RecordBatch,
        random_state: &RandomState,
    ) -> Result<()> {
        self.input_buffer = concat_batches(&batch.schema(), [&self.input_buffer, batch])?;
        self.hashes_buffer.resize(batch.num_rows(), 0);
        update_hash(
            &self.on,
            batch,
            &mut self.hashmap,
            self.offset,
            random_state,
            &mut self.hashes_buffer,
            self.deleted_offset,
            false,
        )?;
        Ok(())
    }

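    /// Runs constraint propagation over the filter expression graph with the
    /// current build/probe intervals and, if the build-side interval shrank,
    /// computes how many buffered rows can be pruned.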
    pub(crate) fn calculate_prune_length_with_probe_batch(
        &mut self,
        build_side_sorted_filter_expr: &mut SortedFilterExpr,
        probe_side_sorted_filter_expr: &mut SortedFilterExpr,
        graph: &mut ExprIntervalGraph,
    ) -> Result<usize> {
        if self.input_buffer.num_rows() == 0 {
            return Ok(0);
        }
        let mut filter_intervals = vec![];
        for expr in [
            &build_side_sorted_filter_expr,
            &probe_side_sorted_filter_expr,
        ] {
            filter_intervals.push((expr.node_index(), expr.interval().clone()))
        }
        graph.update_ranges(&mut filter_intervals, Interval::CERTAINLY_TRUE)?;
        let calculated_build_side_interval = filter_intervals.remove(0).1;
        if calculated_build_side_interval.eq(build_side_sorted_filter_expr.interval()) {
            return Ok(0);
        }
        build_side_sorted_filter_expr.set_interval(calculated_build_side_interval);

        determine_prune_length(&self.input_buffer, build_side_sorted_filter_expr)
    }

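    /// Prunes `prune_length` rows from the front of the internal state: the
    /// hash map, the visited-row set and the input buffer, advancing the
    /// deleted offset accordingly.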
    pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> {
        self.hashmap.prune_hash_values(
            prune_length,
            self.deleted_offset as u64,
            HASHMAP_SHRINK_SCALE_FACTOR,
        );
        for row in self.deleted_offset..(self.deleted_offset + prune_length) {
            self.visited_rows.remove(&row);
        }
        self.input_buffer = self
            .input_buffer
            .slice(prune_length, self.input_buffer.num_rows() - prune_length);
        self.deleted_offset += prune_length;
        Ok(())
    }
}

impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
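    /// Drives the join state machine: pulls batches from the left and right
    /// streams alternately, joins and prunes as they arrive, and emits the
    /// final unmatched-row results once both sides are exhausted. Output is
    /// routed through the batch transformer so it can be split to the
    /// configured batch size.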
    fn poll_next_impl(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match self.batch_transformer.next() {
                None => {
                    let result = match self.state() {
                        SHJStreamState::PullRight => {
                            ready!(self.fetch_next_from_right_stream(cx))
                        }
                        SHJStreamState::PullLeft => {
                            ready!(self.fetch_next_from_left_stream(cx))
                        }
                        SHJStreamState::RightExhausted => {
                            ready!(self.handle_right_stream_end(cx))
                        }
                        SHJStreamState::LeftExhausted => {
                            ready!(self.handle_left_stream_end(cx))
                        }
                        SHJStreamState::BothExhausted {
                            final_result: false,
                        } => self.prepare_for_final_results_after_exhaustion(),
                        SHJStreamState::BothExhausted { final_result: true } => {
                            return Poll::Ready(None);
                        }
                    };

                    match result? {
                        StatefulStreamResult::Ready(None) => {
                            return Poll::Ready(None);
                        }
                        StatefulStreamResult::Ready(Some(batch)) => {
                            self.batch_transformer.set_batch(batch);
                        }
                        _ => {}
                    }
                }
                Some((batch, _)) => {
                    self.metrics.output_batches.add(1);
                    return self
                        .metrics
                        .baseline_metrics
                        .record_poll(Poll::Ready(Some(Ok(batch))));
                }
            }
        }
    }

    fn fetch_next_from_right_stream(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
        match ready!(self.right_stream().poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                if batch.num_rows() == 0 {
                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
                }
                self.set_state(SHJStreamState::PullLeft);
                Poll::Ready(self.process_batch_from_right(batch))
            }
            Some(Err(e)) => Poll::Ready(Err(e)),
            None => {
                self.set_state(SHJStreamState::RightExhausted);
                Poll::Ready(Ok(StatefulStreamResult::Continue))
            }
        }
    }

    fn fetch_next_from_left_stream(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
        match ready!(self.left_stream().poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                if batch.num_rows() == 0 {
                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
                }
                self.set_state(SHJStreamState::PullRight);
                Poll::Ready(self.process_batch_from_left(batch))
            }
            Some(Err(e)) => Poll::Ready(Err(e)),
            None => {
                self.set_state(SHJStreamState::LeftExhausted);
                Poll::Ready(Ok(StatefulStreamResult::Continue))
            }
        }
    }

    fn handle_right_stream_end(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
        match ready!(self.left_stream().poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                if batch.num_rows() == 0 {
                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
                }
                Poll::Ready(self.process_batch_after_right_end(batch))
            }
            Some(Err(e)) => Poll::Ready(Err(e)),
            None => {
                self.set_state(SHJStreamState::BothExhausted {
                    final_result: false,
                });
                Poll::Ready(Ok(StatefulStreamResult::Continue))
            }
        }
    }

    fn handle_left_stream_end(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
        match ready!(self.right_stream().poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                if batch.num_rows() == 0 {
                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
                }
                Poll::Ready(self.process_batch_after_left_end(batch))
            }
            Some(Err(e)) => Poll::Ready(Err(e)),
            None => {
                self.set_state(SHJStreamState::BothExhausted {
                    final_result: false,
                });
                Poll::Ready(Ok(StatefulStreamResult::Continue))
            }
        }
    }

    fn prepare_for_final_results_after_exhaustion(
        &mut self,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.set_state(SHJStreamState::BothExhausted { final_result: true });
        self.process_batches_before_finalization()
    }

    fn process_batch_from_right(
        &mut self,
        batch: RecordBatch,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.perform_join_for_given_side(batch, JoinSide::Right)
            .map(|maybe_batch| {
                if maybe_batch.is_some() {
                    StatefulStreamResult::Ready(maybe_batch)
                } else {
                    StatefulStreamResult::Continue
                }
            })
    }

    fn process_batch_from_left(
        &mut self,
        batch: RecordBatch,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.perform_join_for_given_side(batch, JoinSide::Left)
            .map(|maybe_batch| {
                if maybe_batch.is_some() {
                    StatefulStreamResult::Ready(maybe_batch)
                } else {
                    StatefulStreamResult::Continue
                }
            })
    }

    fn process_batch_after_left_end(
        &mut self,
        right_batch: RecordBatch,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.process_batch_from_right(right_batch)
    }

    fn process_batch_after_right_end(
        &mut self,
        left_batch: RecordBatch,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        self.process_batch_from_left(left_batch)
    }

    fn process_batches_before_finalization(
        &mut self,
    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
        let left_result = build_side_determined_results(
            &self.left,
            &self.schema,
            self.left.input_buffer.num_rows(),
            self.right.input_buffer.schema(),
            self.join_type,
            &self.column_indices,
        )?;
        let right_result = build_side_determined_results(
            &self.right,
            &self.schema,
            self.right.input_buffer.num_rows(),
            self.left.input_buffer.schema(),
            self.join_type,
            &self.column_indices,
        )?;

        let result = combine_two_batches(&self.schema, left_result, right_result)?;

        if result.is_some() {
            return Ok(StatefulStreamResult::Ready(result));
        }
        Ok(StatefulStreamResult::Continue)
    }

    fn right_stream(&mut self) -> &mut SendableRecordBatchStream {
        &mut self.right_stream
    }

    fn left_stream(&mut self) -> &mut SendableRecordBatchStream {
        &mut self.left_stream
    }

    fn set_state(&mut self, state: SHJStreamState) {
        self.state = state;
    }

    fn state(&mut self) -> SHJStreamState {
        self.state.clone()
    }

    fn size(&self) -> usize {
        let mut size = 0;
        size += size_of_val(&self.schema);
        size += size_of_val(&self.filter);
        size += size_of_val(&self.join_type);
        size += self.left.size();
        size += self.right.size();
        size += size_of_val(&self.column_indices);
        size += self.graph.as_ref().map(|g| g.size()).unwrap_or(0);
        size += size_of_val(&self.left_sorted_filter_expr);
        size += size_of_val(&self.right_sorted_filter_expr);
        size += size_of_val(&self.random_state);
        size += size_of_val(&self.null_equality);
        size += size_of_val(&self.metrics);
        size
    }

    fn perform_join_for_given_side(
        &mut self,
        probe_batch: RecordBatch,
        probe_side: JoinSide,
    ) -> Result<Option<RecordBatch>> {
        let (
            probe_hash_joiner,
            build_hash_joiner,
            probe_side_sorted_filter_expr,
            build_side_sorted_filter_expr,
            probe_side_metrics,
        ) = if probe_side.eq(&JoinSide::Left) {
            (
                &mut self.left,
                &mut self.right,
                &mut self.left_sorted_filter_expr,
                &mut self.right_sorted_filter_expr,
                &mut self.metrics.left,
            )
        } else {
            (
                &mut self.right,
                &mut self.left,
                &mut self.right_sorted_filter_expr,
                &mut self.left_sorted_filter_expr,
                &mut self.metrics.right,
            )
        };
        probe_side_metrics.input_batches.add(1);
        probe_side_metrics.input_rows.add(probe_batch.num_rows());
        probe_hash_joiner.update_internal_state(&probe_batch, &self.random_state)?;
        let equal_result = join_with_probe_batch(
            build_hash_joiner,
            probe_hash_joiner,
            &self.schema,
            self.join_type,
            self.filter.as_ref(),
            &probe_batch,
            &self.column_indices,
            &self.random_state,
            self.null_equality,
        )?;
        probe_hash_joiner.offset += probe_batch.num_rows();

        let anti_result = if let (
            Some(build_side_sorted_filter_expr),
            Some(probe_side_sorted_filter_expr),
            Some(graph),
        ) = (
            build_side_sorted_filter_expr.as_mut(),
            probe_side_sorted_filter_expr.as_mut(),
            self.graph.as_mut(),
        ) {
            calculate_filter_expr_intervals(
                &build_hash_joiner.input_buffer,
                build_side_sorted_filter_expr,
                &probe_batch,
                probe_side_sorted_filter_expr,
            )?;
            let prune_length = build_hash_joiner
                .calculate_prune_length_with_probe_batch(
                    build_side_sorted_filter_expr,
                    probe_side_sorted_filter_expr,
                    graph,
                )?;
            let result = build_side_determined_results(
                build_hash_joiner,
                &self.schema,
                prune_length,
                probe_batch.schema(),
                self.join_type,
                &self.column_indices,
            )?;
            build_hash_joiner.prune_internal_state(prune_length)?;
            result
        } else {
            None
        };

        let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
        let capacity = self.size();
        self.metrics.stream_memory_usage.set(capacity);
        self.reservation.lock().try_resize(capacity)?;
        Ok(result)
    }
}

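/// The state machine that drives `SymmetricHashJoinStream`: which side to pull
/// from next, which sides are exhausted, and whether the final unmatched-row
/// results have already been emitted.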
#[derive(Clone, Debug)]
pub enum SHJStreamState {
    PullRight,
    PullLeft,
    RightExhausted,
    LeftExhausted,
    BothExhausted { final_result: bool },
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::{LazyLock, Mutex};

    use super::*;
    use crate::joins::test_utils::{
        build_sides_record_batches, compare_batches, complicated_filter,
        create_memory_table, join_expr_tests_fixture_f64, join_expr_tests_fixture_i32,
        join_expr_tests_fixture_temporal, partitioned_hash_join_with_filter,
        partitioned_sym_join_with_filter, split_record_batches,
    };

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
    use datafusion_common::ScalarValue;
    use datafusion_execution::config::SessionConfig;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{binary, col, lit, Column};
    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

    use rstest::*;

    const TABLE_SIZE: i32 = 30;

    type TableKey = (i32, i32, usize);
    type TableValue = (Vec<RecordBatch>, Vec<RecordBatch>);

    static TABLE_CACHE: LazyLock<Mutex<HashMap<TableKey, TableValue>>> =
        LazyLock::new(|| Mutex::new(HashMap::new()));

    fn get_or_create_table(
        cardinality: (i32, i32),
        batch_size: usize,
    ) -> Result<TableValue> {
        {
            let cache = TABLE_CACHE.lock().unwrap();
            if let Some(table) = cache.get(&(cardinality.0, cardinality.1, batch_size)) {
                return Ok(table.clone());
            }
        }

        let (left_batch, right_batch) =
            build_sides_record_batches(TABLE_SIZE, cardinality)?;

        let (left_partition, right_partition) = (
            split_record_batches(&left_batch, batch_size)?,
            split_record_batches(&right_batch, batch_size)?,
        );

        let mut cache = TABLE_CACHE.lock().unwrap();

        cache.insert(
            (cardinality.0, cardinality.1, batch_size),
            (left_partition.clone(), right_partition.clone()),
        );

        Ok((left_partition, right_partition))
    }

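    /// Runs the same join once through the partitioned symmetric hash join and
    /// once through the regular partitioned hash join, then asserts that both
    /// produce the same batches.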
    pub async fn experiment(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        filter: Option<JoinFilter>,
        join_type: JoinType,
        on: JoinOn,
        task_ctx: Arc<TaskContext>,
    ) -> Result<()> {
        let first_batches = partitioned_sym_join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter.clone(),
            &join_type,
            NullEquality::NullEqualsNothing,
            Arc::clone(&task_ctx),
        )
        .await?;
        let second_batches = partitioned_hash_join_with_filter(
            left,
            right,
            on,
            filter,
            &join_type,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        compare_batches(&first_batches, &second_batches);
        Ok(())
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn complex_join_all_one_ascending_numeric(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
            (4, 5),
            (12, 17),
        )]
        cardinality: (i32, i32),
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();

        let left_sorted = [PhysicalSortExpr {
            expr: binary(
                col("la1", left_schema)?,
                Operator::Plus,
                col("la2", left_schema)?,
                left_schema,
            )?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(
            binary(
                col("lc1", left_schema)?,
                Operator::Plus,
                lit(ScalarValue::Int32(Some(1))),
                left_schema,
            )?,
            Arc::new(Column::new_with_schema("rc1", right_schema)?) as _,
        )];

        let intermediate_schema = Schema::new(vec![
            Field::new("0", DataType::Int32, true),
            Field::new("1", DataType::Int32, true),
            Field::new("2", DataType::Int32, true),
        ]);
        let filter_expr = complicated_filter(&intermediate_schema)?;
        let column_indices = vec![
            ColumnIndex {
                index: left_schema.index_of("la1")?,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: left_schema.index_of("la2")?,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: right_schema.index_of("ra1")?,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_all_one_ascending_numeric(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();

        let left_sorted = [PhysicalSortExpr {
            expr: col("la1", left_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        let column_indices = vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_without_sort_information(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let (left, right) =
            create_memory_table(left_partition, right_partition, vec![], vec![])?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        let column_indices = vec![
            ColumnIndex {
                index: 5,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 5,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_without_filter(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let (left, right) =
            create_memory_table(left_partition, right_partition, vec![], vec![])?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
        experiment(left, right, None, join_type, on, task_ctx).await?;
        Ok(())
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn join_all_one_descending_numeric_particular(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("la1_des", left_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1_des", right_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        let column_indices = vec![
            ColumnIndex {
                index: 5,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 5,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_first() -> Result<()> {
        let join_type = JoinType::Full;
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_asc_null_first", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_asc_null_first", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        let column_indices = vec![
            ColumnIndex {
                index: 6,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 6,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_last() -> Result<()> {
        let join_type = JoinType::Full;
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_asc_null_last", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_asc_null_last", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        let column_indices = vec![
            ColumnIndex {
                index: 7,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 7,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn build_null_columns_first_descending() -> Result<()> {
        let join_type = JoinType::Full;
        let cardinality = (10, 11);
        let case_expr = 1;
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_desc_null_first", left_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_desc_null_first", right_schema)?,
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Int32, true),
            Field::new("right", DataType::Int32, true),
        ]);
        let filter_expr = join_expr_tests_fixture_i32(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        let column_indices = vec![
            ColumnIndex {
                index: 8,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 8,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn complex_join_all_one_ascending_numeric_missing_stat() -> Result<()> {
        let cardinality = (3, 4);
        let join_type = JoinType::Full;

        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("la1", left_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("0", DataType::Int32, true),
            Field::new("1", DataType::Int32, true),
            Field::new("2", DataType::Int32, true),
        ]);
        let filter_expr = complicated_filter(&intermediate_schema)?;
        let column_indices = vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 4,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn complex_join_all_one_ascending_equivalence() -> Result<()> {
        let cardinality = (3, 4);
        let join_type = JoinType::Full;

        let config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = Arc::new(TaskContext::default().with_session_config(config));
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = vec![
            [PhysicalSortExpr {
                expr: col("la1", left_schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            [PhysicalSortExpr {
                expr: col("la2", left_schema)?,
                options: SortOptions::default(),
            }]
            .into(),
        ];

        let right_sorted = [PhysicalSortExpr {
            expr: col("ra1", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();

        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            left_sorted,
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("0", DataType::Int32, true),
            Field::new("1", DataType::Int32, true),
            Field::new("2", DataType::Int32, true),
        ]);
        let filter_expr = complicated_filter(&intermediate_schema)?;
        let column_indices = vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 4,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn testing_with_temporal_columns(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
            (4, 5),
            (12, 17),
        )]
        cardinality: (i32, i32),
        #[values(0, 1, 2)] case_expr: usize,
    ) -> Result<()> {
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
        let left_sorted = [PhysicalSortExpr {
            expr: col("lt1", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("rt1", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;
        let intermediate_schema = Schema::new(vec![
            Field::new(
                "left",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "right",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
        ]);
        let filter_expr = join_expr_tests_fixture_temporal(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
            &intermediate_schema,
        )?;
        let column_indices = vec![
            ColumnIndex {
                index: 3,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 3,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_with_interval_columns(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
            (4, 5),
            (12, 17),
        )]
        cardinality: (i32, i32),
    ) -> Result<()> {
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
        let left_sorted = [PhysicalSortExpr {
            expr: col("li1", left_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("ri1", right_schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;
        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new("right", DataType::Interval(IntervalUnit::DayTime), false),
        ]);
        let filter_expr = join_expr_tests_fixture_temporal(
            0,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
            &intermediate_schema,
        )?;
        let column_indices = vec![
            ColumnIndex {
                index: 9,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 9,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;

        Ok(())
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn testing_ascending_float_pruning(
        #[values(
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::RightSemi,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightAnti,
            JoinType::RightMark,
            JoinType::Full
        )]
        join_type: JoinType,
        #[values(
            (4, 5),
            (12, 17),
        )]
        cardinality: (i32, i32),
        #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
    ) -> Result<()> {
        let session_config = SessionConfig::new().with_repartition_joins(false);
        let task_ctx = TaskContext::default().with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);
        let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;

        let left_schema = &left_partition[0].schema();
        let right_schema = &right_partition[0].schema();
        let left_sorted = [PhysicalSortExpr {
            expr: col("l_float", left_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let right_sorted = [PhysicalSortExpr {
            expr: col("r_float", right_schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let (left, right) = create_memory_table(
            left_partition,
            right_partition,
            vec![left_sorted],
            vec![right_sorted],
        )?;

        let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];

        let intermediate_schema = Schema::new(vec![
            Field::new("left", DataType::Float64, true),
            Field::new("right", DataType::Float64, true),
        ]);
        let filter_expr = join_expr_tests_fixture_f64(
            case_expr,
            col("left", &intermediate_schema)?,
            col("right", &intermediate_schema)?,
        );
        let column_indices = vec![
            ColumnIndex {
                index: 10,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 10,
                side: JoinSide::Right,
            },
        ];
        let filter =
            JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));

        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
        Ok(())
    }
}