1use std::any::Any;
29use std::fmt::{self, Debug};
30use std::mem::{size_of, size_of_val};
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use std::vec;
34
35use crate::common::SharedMemoryReservation;
36use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
37use crate::joins::stream_join_utils::{
38 calculate_filter_expr_intervals, combine_two_batches,
39 convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
40 get_pruning_semi_indices, prepare_sorted_exprs, record_visited_indices,
41 PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
42};
43use crate::joins::utils::{
44 apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
45 check_join_is_valid, equal_rows_arr, symmetric_join_output_partitioning, update_hash,
46 BatchSplitter, BatchTransformer, ColumnIndex, JoinFilter, JoinHashMapType, JoinOn,
47 JoinOnRef, NoopBatchTransformer, StatefulStreamResult,
48};
49use crate::projection::{
50 join_allows_pushdown, join_table_borders, new_join_children,
51 physical_to_column_exprs, update_join_filter, update_join_on, ProjectionExec,
52};
53use crate::{
54 joins::StreamJoinPartitionMode,
55 metrics::{ExecutionPlanMetricsSet, MetricsSet},
56 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
57 PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
58};
59
60use arrow::array::{
61 ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder, UInt32Array,
62 UInt64Array,
63};
64use arrow::compute::concat_batches;
65use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
66use arrow::record_batch::RecordBatch;
67use datafusion_common::hash_utils::create_hashes;
68use datafusion_common::utils::bisect;
69use datafusion_common::{
70 internal_err, plan_err, HashSet, JoinSide, JoinType, NullEquality, Result,
71};
72use datafusion_execution::memory_pool::MemoryConsumer;
73use datafusion_execution::TaskContext;
74use datafusion_expr::interval_arithmetic::Interval;
75use datafusion_physical_expr::equivalence::join_equivalence_properties;
76use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
77use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef};
78use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};
79
80use ahash::RandomState;
81use futures::{ready, Stream, StreamExt};
82use parking_lot::Mutex;
83
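/// Scale factor used when shrinking the pruning hash map's allocation after
/// rows are pruned (see `OneSideHashJoiner::prune_internal_state`).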
84const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
85
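/// Join execution plan that evaluates equi-join predicates over two (possibly
/// unbounded) inputs by building a hash table on *both* sides and probing each
/// incoming batch against the opposite side's table.
///
/// Because both sides act as build and probe simultaneously, results are
/// emitted incrementally, which makes the operator usable on streaming inputs.
/// When a join filter over sorted columns is provided (together with
/// `left_sort_exprs`/`right_sort_exprs`), interval arithmetic is used to prune
/// buffered rows that can no longer match, bounding memory usage.
///
/// # Example (sketch)
///
/// A minimal construction sketch; `left`, `right`, and `on` are assumed to be
/// prepared elsewhere:
///
/// ```ignore
/// let join = SymmetricHashJoinExec::try_new(
///     left,                                  // Arc<dyn ExecutionPlan>
///     right,                                 // Arc<dyn ExecutionPlan>
///     on,                                    // Vec<(PhysicalExprRef, PhysicalExprRef)>
///     None,                                  // optional JoinFilter
///     &JoinType::Inner,
///     NullEquality::NullEqualsNothing,
///     None,                                  // left_sort_exprs
///     None,                                  // right_sort_exprs
///     StreamJoinPartitionMode::Partitioned,
/// )?;
/// ```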
86#[derive(Debug, Clone)]
pub struct SymmetricHashJoinExec {
    /// Left side stream
    pub(crate) left: Arc<dyn ExecutionPlan>,
    /// Right side stream
    pub(crate) right: Arc<dyn ExecutionPlan>,
    /// Set of equi-join columns: pairs of (left, right) expressions
    pub(crate) on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional filter applied to matching rows
    pub(crate) filter: Option<JoinFilter>,
    /// How the join is performed
    pub(crate) join_type: JoinType,
    /// Shared random state for the hashing algorithm
    random_state: RandomState,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Index and side of every output column
    column_indices: Vec<ColumnIndex>,
    /// Defines how null join keys are treated
    pub(crate) null_equality: NullEquality,
    /// Sort expressions of the left side, if any
    pub(crate) left_sort_exprs: Option<LexOrdering>,
    /// Sort expressions of the right side, if any
    pub(crate) right_sort_exprs: Option<LexOrdering>,
    /// Partition mode
    mode: StreamJoinPartitionMode,
    /// Cached plan properties
    cache: PlanProperties,
}
200
201impl SymmetricHashJoinExec {
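    /// Attempts to create a new `SymmetricHashJoinExec`.
    ///
    /// Returns an error if `on` is empty or if the join keys are not valid
    /// for the given input schemas.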
202 #[allow(clippy::too_many_arguments)]
209 pub fn try_new(
210 left: Arc<dyn ExecutionPlan>,
211 right: Arc<dyn ExecutionPlan>,
212 on: JoinOn,
213 filter: Option<JoinFilter>,
214 join_type: &JoinType,
215 null_equality: NullEquality,
216 left_sort_exprs: Option<LexOrdering>,
217 right_sort_exprs: Option<LexOrdering>,
218 mode: StreamJoinPartitionMode,
219 ) -> Result<Self> {
220 let left_schema = left.schema();
221 let right_schema = right.schema();
222
223 if on.is_empty() {
225 return plan_err!(
226 "On constraints in SymmetricHashJoinExec should be non-empty"
227 );
228 }
229
230 check_join_is_valid(&left_schema, &right_schema, &on)?;
232
233 let (schema, column_indices) =
235 build_join_schema(&left_schema, &right_schema, join_type);
236
237 let random_state = RandomState::with_seeds(0, 0, 0, 0);
239 let schema = Arc::new(schema);
240 let cache = Self::compute_properties(&left, &right, schema, *join_type, &on)?;
241 Ok(SymmetricHashJoinExec {
242 left,
243 right,
244 on,
245 filter,
246 join_type: *join_type,
247 random_state,
248 metrics: ExecutionPlanMetricsSet::new(),
249 column_indices,
250 null_equality,
251 left_sort_exprs,
252 right_sort_exprs,
253 mode,
254 cache,
255 })
256 }
257
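    /// Computes the plan properties (equivalences, output partitioning,
    /// emission type and boundedness) from the two children.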
258 fn compute_properties(
260 left: &Arc<dyn ExecutionPlan>,
261 right: &Arc<dyn ExecutionPlan>,
262 schema: SchemaRef,
263 join_type: JoinType,
264 join_on: JoinOnRef,
265 ) -> Result<PlanProperties> {
266 let eq_properties = join_equivalence_properties(
268 left.equivalence_properties().clone(),
269 right.equivalence_properties().clone(),
270 &join_type,
271 schema,
272 &[false, false],
273 None,
275 join_on,
276 )?;
277
278 let output_partitioning =
279 symmetric_join_output_partitioning(left, right, &join_type)?;
280
281 Ok(PlanProperties::new(
282 eq_properties,
283 output_partitioning,
284 emission_type_from_children([left, right]),
285 boundedness_from_children([left, right]),
286 ))
287 }
288
289 pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
291 &self.left
292 }
293
294 pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
296 &self.right
297 }
298
299 pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
301 &self.on
302 }
303
304 pub fn filter(&self) -> Option<&JoinFilter> {
306 self.filter.as_ref()
307 }
308
309 pub fn join_type(&self) -> &JoinType {
311 &self.join_type
312 }
313
314 pub fn null_equality(&self) -> NullEquality {
316 self.null_equality
317 }
318
319 pub fn partition_mode(&self) -> StreamJoinPartitionMode {
321 self.mode
322 }
323
324 pub fn left_sort_exprs(&self) -> Option<&LexOrdering> {
326 self.left_sort_exprs.as_ref()
327 }
328
329 pub fn right_sort_exprs(&self) -> Option<&LexOrdering> {
331 self.right_sort_exprs.as_ref()
332 }
333
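    /// Returns `true` if there is a join filter and both children expose an
    /// output ordering whose leading sort expression can be mapped into the
    /// filter schema, which is the prerequisite for filter-based pruning.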
334 pub fn check_if_order_information_available(&self) -> Result<bool> {
336 if let Some(filter) = self.filter() {
337 let left = self.left();
338 if let Some(left_ordering) = left.output_ordering() {
339 let right = self.right();
340 if let Some(right_ordering) = right.output_ordering() {
341 let left_convertible = convert_sort_expr_with_filter_schema(
342 &JoinSide::Left,
343 filter,
344 &left.schema(),
345 &left_ordering[0],
346 )?
347 .is_some();
348 let right_convertible = convert_sort_expr_with_filter_schema(
349 &JoinSide::Right,
350 filter,
351 &right.schema(),
352 &right_ordering[0],
353 )?
354 .is_some();
355 return Ok(left_convertible && right_convertible);
356 }
357 }
358 }
359 Ok(false)
360 }
361}
362
363impl DisplayAs for SymmetricHashJoinExec {
364 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
365 match t {
366 DisplayFormatType::Default | DisplayFormatType::Verbose => {
367 let display_filter = self.filter.as_ref().map_or_else(
368 || "".to_string(),
369 |f| format!(", filter={}", f.expression()),
370 );
371 let on = self
372 .on
373 .iter()
374 .map(|(c1, c2)| format!("({c1}, {c2})"))
375 .collect::<Vec<String>>()
376 .join(", ");
377 write!(
378 f,
379 "SymmetricHashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
380 self.mode, self.join_type, on, display_filter
381 )
382 }
383 DisplayFormatType::TreeRender => {
384 let on = self
385 .on
386 .iter()
387 .map(|(c1, c2)| {
388 format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
389 })
390 .collect::<Vec<String>>()
391 .join(", ");
392
393 writeln!(f, "mode={:?}", self.mode)?;
394 if *self.join_type() != JoinType::Inner {
395 writeln!(f, "join_type={:?}", self.join_type)?;
396 }
397 writeln!(f, "on={on}")
398 }
399 }
400 }
401}
402
403impl ExecutionPlan for SymmetricHashJoinExec {
404 fn name(&self) -> &'static str {
405 "SymmetricHashJoinExec"
406 }
407
408 fn as_any(&self) -> &dyn Any {
409 self
410 }
411
412 fn properties(&self) -> &PlanProperties {
413 &self.cache
414 }
415
416 fn required_input_distribution(&self) -> Vec<Distribution> {
417 match self.mode {
418 StreamJoinPartitionMode::Partitioned => {
419 let (left_expr, right_expr) = self
420 .on
421 .iter()
422 .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
423 .unzip();
424 vec![
425 Distribution::HashPartitioned(left_expr),
426 Distribution::HashPartitioned(right_expr),
427 ]
428 }
429 StreamJoinPartitionMode::SinglePartition => {
430 vec![Distribution::SinglePartition, Distribution::SinglePartition]
431 }
432 }
433 }
434
435 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
436 vec![
437 self.left_sort_exprs
438 .as_ref()
439 .map(|e| OrderingRequirements::from(e.clone())),
440 self.right_sort_exprs
441 .as_ref()
442 .map(|e| OrderingRequirements::from(e.clone())),
443 ]
444 }
445
446 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
447 vec![&self.left, &self.right]
448 }
449
450 fn with_new_children(
451 self: Arc<Self>,
452 children: Vec<Arc<dyn ExecutionPlan>>,
453 ) -> Result<Arc<dyn ExecutionPlan>> {
454 Ok(Arc::new(SymmetricHashJoinExec::try_new(
455 Arc::clone(&children[0]),
456 Arc::clone(&children[1]),
457 self.on.clone(),
458 self.filter.clone(),
459 &self.join_type,
460 self.null_equality,
461 self.left_sort_exprs.clone(),
462 self.right_sort_exprs.clone(),
463 self.mode,
464 )?))
465 }
466
467 fn metrics(&self) -> Option<MetricsSet> {
468 Some(self.metrics.clone_inner())
469 }
470
471 fn statistics(&self) -> Result<Statistics> {
472 Ok(Statistics::new_unknown(&self.schema()))
474 }
475
476 fn execute(
477 &self,
478 partition: usize,
479 context: Arc<TaskContext>,
480 ) -> Result<SendableRecordBatchStream> {
481 let left_partitions = self.left.output_partitioning().partition_count();
482 let right_partitions = self.right.output_partitioning().partition_count();
483 if left_partitions != right_partitions {
484 return internal_err!(
485 "Invalid SymmetricHashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
486 consider using RepartitionExec"
487 );
488 }
489 let (left_sorted_filter_expr, right_sorted_filter_expr, graph) = match (
492 self.left_sort_exprs(),
493 self.right_sort_exprs(),
494 &self.filter,
495 ) {
496 (Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
497 let (left, right, graph) = prepare_sorted_exprs(
498 filter,
499 &self.left,
500 &self.right,
501 left_sort_exprs,
502 right_sort_exprs,
503 )?;
504 (Some(left), Some(right), Some(graph))
505 }
506 _ => (None, None, None),
509 };
510
511 let (on_left, on_right) = self.on.iter().cloned().unzip();
512
513 let left_side_joiner =
514 OneSideHashJoiner::new(JoinSide::Left, on_left, self.left.schema());
515 let right_side_joiner =
516 OneSideHashJoiner::new(JoinSide::Right, on_right, self.right.schema());
517
518 let left_stream = self.left.execute(partition, Arc::clone(&context))?;
519
520 let right_stream = self.right.execute(partition, Arc::clone(&context))?;
521
522 let batch_size = context.session_config().batch_size();
523 let enforce_batch_size_in_joins =
524 context.session_config().enforce_batch_size_in_joins();
525
526 let reservation = Arc::new(Mutex::new(
527 MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
528 .register(context.memory_pool()),
529 ));
530 if let Some(g) = graph.as_ref() {
531 reservation.lock().try_grow(g.size())?;
532 }
533
534 if enforce_batch_size_in_joins {
535 Ok(Box::pin(SymmetricHashJoinStream {
536 left_stream,
537 right_stream,
538 schema: self.schema(),
539 filter: self.filter.clone(),
540 join_type: self.join_type,
541 random_state: self.random_state.clone(),
542 left: left_side_joiner,
543 right: right_side_joiner,
544 column_indices: self.column_indices.clone(),
545 metrics: StreamJoinMetrics::new(partition, &self.metrics),
546 graph,
547 left_sorted_filter_expr,
548 right_sorted_filter_expr,
549 null_equality: self.null_equality,
550 state: SHJStreamState::PullRight,
551 reservation,
552 batch_transformer: BatchSplitter::new(batch_size),
553 }))
554 } else {
555 Ok(Box::pin(SymmetricHashJoinStream {
556 left_stream,
557 right_stream,
558 schema: self.schema(),
559 filter: self.filter.clone(),
560 join_type: self.join_type,
561 random_state: self.random_state.clone(),
562 left: left_side_joiner,
563 right: right_side_joiner,
564 column_indices: self.column_indices.clone(),
565 metrics: StreamJoinMetrics::new(partition, &self.metrics),
566 graph,
567 left_sorted_filter_expr,
568 right_sorted_filter_expr,
569 null_equality: self.null_equality,
570 state: SHJStreamState::PullRight,
571 reservation,
572 batch_transformer: NoopBatchTransformer::new(),
573 }))
574 }
575 }
576
577 fn try_swapping_with_projection(
581 &self,
582 projection: &ProjectionExec,
583 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
584 let Some(projection_as_columns) = physical_to_column_exprs(projection.expr())
586 else {
587 return Ok(None);
588 };
589
590 let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
591 self.left().schema().fields().len(),
592 &projection_as_columns,
593 );
594
595 if !join_allows_pushdown(
596 &projection_as_columns,
597 &self.schema(),
598 far_right_left_col_ind,
599 far_left_right_col_ind,
600 ) {
601 return Ok(None);
602 }
603
604 let Some(new_on) = update_join_on(
605 &projection_as_columns[0..=far_right_left_col_ind as _],
606 &projection_as_columns[far_left_right_col_ind as _..],
607 self.on(),
608 self.left().schema().fields().len(),
609 ) else {
610 return Ok(None);
611 };
612
613 let new_filter = if let Some(filter) = self.filter() {
614 match update_join_filter(
615 &projection_as_columns[0..=far_right_left_col_ind as _],
616 &projection_as_columns[far_left_right_col_ind as _..],
617 filter,
618 self.left().schema().fields().len(),
619 ) {
620 Some(updated_filter) => Some(updated_filter),
621 None => return Ok(None),
622 }
623 } else {
624 None
625 };
626
627 let (new_left, new_right) = new_join_children(
628 &projection_as_columns,
629 far_right_left_col_ind,
630 far_left_right_col_ind,
631 self.left(),
632 self.right(),
633 )?;
634
635 SymmetricHashJoinExec::try_new(
636 Arc::new(new_left),
637 Arc::new(new_right),
638 new_on,
639 new_filter,
640 self.join_type(),
641 self.null_equality(),
642 self.right().output_ordering().cloned(),
643 self.left().output_ordering().cloned(),
644 self.partition_mode(),
645 )
646 .map(|e| Some(Arc::new(e) as _))
647 }
648}
649
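/// A [`Stream`] that produces the join output incrementally. The
/// `T: BatchTransformer` parameter selects whether output batches are split to
/// the configured batch size (`BatchSplitter`) or passed through unchanged
/// (`NoopBatchTransformer`).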
struct SymmetricHashJoinStream<T> {
    /// Left input stream
    left_stream: SendableRecordBatchStream,
    /// Right input stream
    right_stream: SendableRecordBatchStream,
    /// Output schema
    schema: Arc<Schema>,
    /// Optional join filter
    filter: Option<JoinFilter>,
    /// How the join is performed
    join_type: JoinType,
    /// State of the left side
    left: OneSideHashJoiner,
    /// State of the right side
    right: OneSideHashJoiner,
    /// Index and side of every output column
    column_indices: Vec<ColumnIndex>,
    /// Interval arithmetic graph over the filter expression, if pruning is possible
    graph: Option<ExprIntervalGraph>,
    /// Left side sorted filter expression, if pruning is possible
    left_sorted_filter_expr: Option<SortedFilterExpr>,
    /// Right side sorted filter expression, if pruning is possible
    right_sorted_filter_expr: Option<SortedFilterExpr>,
    /// Shared random state for hashing
    random_state: RandomState,
    /// Defines how null join keys are treated
    null_equality: NullEquality,
    /// Execution metrics
    metrics: StreamJoinMetrics,
    /// Memory reservation tracking the stream's state
    reservation: SharedMemoryReservation,
    /// Current state of the stream's state machine
    state: SHJStreamState,
    /// Transformer applied to output batches (splitting or pass-through)
    batch_transformer: T,
}
686
687impl<T: BatchTransformer + Unpin + Send> RecordBatchStream
688 for SymmetricHashJoinStream<T>
689{
690 fn schema(&self) -> SchemaRef {
691 Arc::clone(&self.schema)
692 }
693}
694
695impl<T: BatchTransformer + Unpin + Send> Stream for SymmetricHashJoinStream<T> {
696 type Item = Result<RecordBatch>;
697
698 fn poll_next(
699 mut self: std::pin::Pin<&mut Self>,
700 cx: &mut Context<'_>,
701 ) -> Poll<Option<Self::Item>> {
702 self.poll_next_impl(cx)
703 }
704}
705
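/// Determines how many leading rows of the build-side buffer can be pruned.
///
/// Evaluates the build side's origin sort expression over the buffered batch
/// and bisects for the relevant bound of the calculated filter interval; all
/// rows before that position can no longer participate in a match.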
706fn determine_prune_length(
725 buffer: &RecordBatch,
726 build_side_filter_expr: &SortedFilterExpr,
727) -> Result<usize> {
728 let origin_sorted_expr = build_side_filter_expr.origin_sorted_expr();
729 let interval = build_side_filter_expr.interval();
730 let batch_arr = origin_sorted_expr
732 .expr
733 .evaluate(buffer)?
734 .into_array(buffer.num_rows())?;
735
736 let target = if origin_sorted_expr.options.descending {
738 interval.upper().clone()
739 } else {
740 interval.lower().clone()
741 };
742
743 bisect::<true>(&[batch_arr], &[target], &[origin_sorted_expr.options])
745}
746
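/// Returns `true` if the given build side must emit rows during pruning or
/// finalization for this join type (outer, semi, anti and mark joins on that
/// side).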
747fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) -> bool {
762 if build_side == JoinSide::Left {
763 matches!(
764 join_type,
765 JoinType::Left
766 | JoinType::LeftAnti
767 | JoinType::Full
768 | JoinType::LeftSemi
769 | JoinType::LeftMark
770 )
771 } else {
772 matches!(
773 join_type,
774 JoinType::Right
775 | JoinType::RightAnti
776 | JoinType::Full
777 | JoinType::RightSemi
778 | JoinType::RightMark
779 )
780 }
781}
782
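/// Computes the build- and probe-side output indices for rows that are about
/// to be pruned, based on the join type and the set of already matched rows
/// (`visited_rows`).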
783fn calculate_indices_by_join_type<L: ArrowPrimitiveType, R: ArrowPrimitiveType>(
801 build_side: JoinSide,
802 prune_length: usize,
803 visited_rows: &HashSet<usize>,
804 deleted_offset: usize,
805 join_type: JoinType,
806) -> Result<(PrimitiveArray<L>, PrimitiveArray<R>)>
807where
808 NativeAdapter<L>: From<<L as ArrowPrimitiveType>::Native>,
809{
810 let result = match (build_side, join_type) {
812 (JoinSide::Left, JoinType::LeftMark) => {
828 let build_indices = (0..prune_length)
829 .map(L::Native::from_usize)
830 .collect::<PrimitiveArray<L>>();
831 let probe_indices = (0..prune_length)
832 .map(|idx| {
833 visited_rows
835 .contains(&(idx + deleted_offset))
836 .then_some(R::Native::from_usize(0).unwrap())
837 })
838 .collect();
839 (build_indices, probe_indices)
840 }
841 (JoinSide::Right, JoinType::RightMark) => {
842 let build_indices = (0..prune_length)
843 .map(L::Native::from_usize)
844 .collect::<PrimitiveArray<L>>();
845 let probe_indices = (0..prune_length)
846 .map(|idx| {
847 visited_rows
849 .contains(&(idx + deleted_offset))
850 .then_some(R::Native::from_usize(0).unwrap())
851 })
852 .collect();
853 (build_indices, probe_indices)
854 }
855 (JoinSide::Left, JoinType::Left | JoinType::LeftAnti)
857 | (JoinSide::Right, JoinType::Right | JoinType::RightAnti)
858 | (_, JoinType::Full) => {
859 let build_unmatched_indices =
860 get_pruning_anti_indices(prune_length, deleted_offset, visited_rows);
861 let mut builder =
862 PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
863 builder.append_nulls(build_unmatched_indices.len());
864 let probe_indices = builder.finish();
865 (build_unmatched_indices, probe_indices)
866 }
867 (JoinSide::Left, JoinType::LeftSemi) | (JoinSide::Right, JoinType::RightSemi) => {
869 let build_unmatched_indices =
870 get_pruning_semi_indices(prune_length, deleted_offset, visited_rows);
871 let mut builder =
872 PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
873 builder.append_nulls(build_unmatched_indices.len());
874 let probe_indices = builder.finish();
875 (build_unmatched_indices, probe_indices)
876 }
877 _ => unreachable!(),
879 };
880 Ok(result)
881}
882
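/// Produces the output rows that depend only on the build side (e.g. unmatched
/// rows for outer/anti joins, matched rows for semi/mark joins) for the first
/// `prune_length` buffered rows. Returns `Ok(None)` if there is nothing to
/// emit.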
883pub(crate) fn build_side_determined_results(
901 build_hash_joiner: &OneSideHashJoiner,
902 output_schema: &SchemaRef,
903 prune_length: usize,
904 probe_schema: SchemaRef,
905 join_type: JoinType,
906 column_indices: &[ColumnIndex],
907) -> Result<Option<RecordBatch>> {
908 if prune_length > 0
910 && need_to_produce_result_in_final(build_hash_joiner.build_side, join_type)
911 {
912 let (build_indices, probe_indices) = calculate_indices_by_join_type(
914 build_hash_joiner.build_side,
915 prune_length,
916 &build_hash_joiner.visited_rows,
917 build_hash_joiner.deleted_offset,
918 join_type,
919 )?;
920
921 let empty_probe_batch = RecordBatch::new_empty(probe_schema);
923 build_batch_from_indices(
925 output_schema.as_ref(),
926 &build_hash_joiner.input_buffer,
927 &empty_probe_batch,
928 &build_indices,
929 &probe_indices,
930 column_indices,
931 build_hash_joiner.build_side,
932 )
933 .map(|batch| (batch.num_rows() > 0).then_some(batch))
934 } else {
935 Ok(None)
937 }
938}
939
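/// Joins a probe batch against the build side's hash map, applies the optional
/// join filter, records matched rows on both sides for later outer/semi/anti
/// handling, and builds the output batch. Semi, anti and mark joins return
/// `Ok(None)` here because their results are emitted during pruning or
/// finalization.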
940#[allow(clippy::too_many_arguments)]
960pub(crate) fn join_with_probe_batch(
961 build_hash_joiner: &mut OneSideHashJoiner,
962 probe_hash_joiner: &mut OneSideHashJoiner,
963 schema: &SchemaRef,
964 join_type: JoinType,
965 filter: Option<&JoinFilter>,
966 probe_batch: &RecordBatch,
967 column_indices: &[ColumnIndex],
968 random_state: &RandomState,
969 null_equality: NullEquality,
970) -> Result<Option<RecordBatch>> {
971 if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
972 return Ok(None);
973 }
974 let (build_indices, probe_indices) = lookup_join_hashmap(
975 &build_hash_joiner.hashmap,
976 &build_hash_joiner.input_buffer,
977 probe_batch,
978 &build_hash_joiner.on,
979 &probe_hash_joiner.on,
980 random_state,
981 null_equality,
982 &mut build_hash_joiner.hashes_buffer,
983 Some(build_hash_joiner.deleted_offset),
984 )?;
985
986 let (build_indices, probe_indices) = if let Some(filter) = filter {
987 apply_join_filter_to_indices(
988 &build_hash_joiner.input_buffer,
989 probe_batch,
990 build_indices,
991 probe_indices,
992 filter,
993 build_hash_joiner.build_side,
994 None,
995 )?
996 } else {
997 (build_indices, probe_indices)
998 };
999
1000 if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
1001 record_visited_indices(
1002 &mut build_hash_joiner.visited_rows,
1003 build_hash_joiner.deleted_offset,
1004 &build_indices,
1005 );
1006 }
1007 if need_to_produce_result_in_final(build_hash_joiner.build_side.negate(), join_type) {
1008 record_visited_indices(
1009 &mut probe_hash_joiner.visited_rows,
1010 probe_hash_joiner.offset,
1011 &probe_indices,
1012 );
1013 }
1014 if matches!(
1015 join_type,
1016 JoinType::LeftAnti
1017 | JoinType::RightAnti
1018 | JoinType::LeftSemi
1019 | JoinType::LeftMark
1020 | JoinType::RightSemi
1021 ) {
1022 Ok(None)
1023 } else {
1024 build_batch_from_indices(
1025 schema,
1026 &build_hash_joiner.input_buffer,
1027 probe_batch,
1028 &build_indices,
1029 &probe_indices,
1030 column_indices,
1031 build_hash_joiner.build_side,
1032 )
1033 .map(|batch| (batch.num_rows() > 0).then_some(batch))
1034 }
1035}
1036
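/// Looks up the probe batch's join keys in the build-side hash map and returns
/// the matching `(build, probe)` index pairs.
///
/// The probe hashes are iterated in reverse and the collected indices reversed
/// afterwards so the returned indices come out in ascending order;
/// `equal_rows_arr` then discards hash collisions by comparing the actual key
/// values.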
1037#[allow(clippy::too_many_arguments)]
1057fn lookup_join_hashmap(
1058 build_hashmap: &PruningJoinHashMap,
1059 build_batch: &RecordBatch,
1060 probe_batch: &RecordBatch,
1061 build_on: &[PhysicalExprRef],
1062 probe_on: &[PhysicalExprRef],
1063 random_state: &RandomState,
1064 null_equality: NullEquality,
1065 hashes_buffer: &mut Vec<u64>,
1066 deleted_offset: Option<usize>,
1067) -> Result<(UInt64Array, UInt32Array)> {
1068 let keys_values = probe_on
1069 .iter()
1070 .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
1071 .collect::<Result<Vec<_>>>()?;
1072 let build_join_values = build_on
1073 .iter()
1074 .map(|c| c.evaluate(build_batch)?.into_array(build_batch.num_rows()))
1075 .collect::<Result<Vec<_>>>()?;
1076
1077 hashes_buffer.clear();
1078 hashes_buffer.resize(probe_batch.num_rows(), 0);
1079 let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
1080
1081 let (mut matched_probe, mut matched_build) = build_hashmap.get_matched_indices(
1112 Box::new(hash_values.iter().enumerate().rev()),
1113 deleted_offset,
1114 );
1115
1116 matched_probe.reverse();
1117 matched_build.reverse();
1118
1119 let build_indices: UInt64Array = matched_build.into();
1120 let probe_indices: UInt32Array = matched_probe.into();
1121
1122 let (build_indices, probe_indices) = equal_rows_arr(
1123 &build_indices,
1124 &probe_indices,
1125 &build_join_values,
1126 &keys_values,
1127 null_equality,
1128 )?;
1129
1130 Ok((build_indices, probe_indices))
1131}
1132
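/// Build/probe state maintained for one input side of the join.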
pub struct OneSideHashJoiner {
    /// Which input side this joiner buffers
    build_side: JoinSide,
    /// Rows received from this side so far, minus pruned rows
    pub input_buffer: RecordBatch,
    /// Join key expressions for this side
    pub(crate) on: Vec<PhysicalExprRef>,
    /// Hash map from join-key hash to row indices, prunable from the front
    pub(crate) hashmap: PruningJoinHashMap,
    /// Reusable buffer for row hashes
    pub(crate) hashes_buffer: Vec<u64>,
    /// Rows that have already produced a match
    pub(crate) visited_rows: HashSet<usize>,
    /// Total number of rows received from this side
    pub(crate) offset: usize,
    /// Number of rows pruned from the front of `input_buffer`
    pub(crate) deleted_offset: usize,
}
1151
1152impl OneSideHashJoiner {
1153 pub fn size(&self) -> usize {
1154 let mut size = 0;
1155 size += size_of_val(self);
1156 size += size_of_val(&self.build_side);
1157 size += self.input_buffer.get_array_memory_size();
1158 size += size_of_val(&self.on);
1159 size += self.hashmap.size();
1160 size += self.hashes_buffer.capacity() * size_of::<u64>();
1161 size += self.visited_rows.capacity() * size_of::<usize>();
1162 size += size_of_val(&self.offset);
1163 size += size_of_val(&self.deleted_offset);
1164 size
1165 }
1166 pub fn new(
1167 build_side: JoinSide,
1168 on: Vec<PhysicalExprRef>,
1169 schema: SchemaRef,
1170 ) -> Self {
1171 Self {
1172 build_side,
1173 input_buffer: RecordBatch::new_empty(schema),
1174 on,
1175 hashmap: PruningJoinHashMap::with_capacity(0),
1176 hashes_buffer: vec![],
1177 visited_rows: HashSet::new(),
1178 offset: 0,
1179 deleted_offset: 0,
1180 }
1181 }
1182
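    /// Appends `batch` to the input buffer and inserts its join-key hashes
    /// into the pruning hash map.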
1183 pub(crate) fn update_internal_state(
1194 &mut self,
1195 batch: &RecordBatch,
1196 random_state: &RandomState,
1197 ) -> Result<()> {
1198 self.input_buffer = concat_batches(&batch.schema(), [&self.input_buffer, batch])?;
1200 self.hashes_buffer.resize(batch.num_rows(), 0);
1202 update_hash(
1205 &self.on,
1206 batch,
1207 &mut self.hashmap,
1208 self.offset,
1209 random_state,
1210 &mut self.hashes_buffer,
1211 self.deleted_offset,
1212 false,
1213 )?;
1214 Ok(())
1215 }
1216
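    /// Propagates the current sorted-filter intervals through the interval
    /// arithmetic graph and, if the build side's interval shrank, returns how
    /// many leading buffered rows can no longer match. Returns 0 when the
    /// buffer is empty or the interval is unchanged.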
1217 pub(crate) fn calculate_prune_length_with_probe_batch(
1229 &mut self,
1230 build_side_sorted_filter_expr: &mut SortedFilterExpr,
1231 probe_side_sorted_filter_expr: &mut SortedFilterExpr,
1232 graph: &mut ExprIntervalGraph,
1233 ) -> Result<usize> {
1234 if self.input_buffer.num_rows() == 0 {
1236 return Ok(0);
1237 }
1238 let mut filter_intervals = vec![];
1241 for expr in [
1242 &build_side_sorted_filter_expr,
1243 &probe_side_sorted_filter_expr,
1244 ] {
1245 filter_intervals.push((expr.node_index(), expr.interval().clone()))
1246 }
1247 graph.update_ranges(&mut filter_intervals, Interval::CERTAINLY_TRUE)?;
1249 let calculated_build_side_interval = filter_intervals.remove(0).1;
1251 if calculated_build_side_interval.eq(build_side_sorted_filter_expr.interval()) {
1253 return Ok(0);
1254 }
1255 build_side_sorted_filter_expr.set_interval(calculated_build_side_interval);
1257
1258 determine_prune_length(&self.input_buffer, build_side_sorted_filter_expr)
1259 }
1260
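    /// Removes the first `prune_length` rows from the hash map, the visited
    /// row set and the input buffer, and advances `deleted_offset` accordingly.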
1261 pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> {
1262 self.hashmap.prune_hash_values(
1264 prune_length,
1265 self.deleted_offset as u64,
1266 HASHMAP_SHRINK_SCALE_FACTOR,
1267 );
1268 for row in self.deleted_offset..(self.deleted_offset + prune_length) {
1270 self.visited_rows.remove(&row);
1271 }
1272 self.input_buffer = self
1274 .input_buffer
1275 .slice(prune_length, self.input_buffer.num_rows() - prune_length);
1276 self.deleted_offset += prune_length;
1278 Ok(())
1279 }
1280}
1281
1282impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
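    /// Drives the join state machine: alternately pulls batches from the right
    /// and left inputs, drains the remaining side once one input is exhausted,
    /// and finally emits the build-side results deferred until both inputs
    /// ended. Ready batches are routed through the batch transformer before
    /// being returned.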
1327 fn poll_next_impl(
1341 &mut self,
1342 cx: &mut Context<'_>,
1343 ) -> Poll<Option<Result<RecordBatch>>> {
1344 loop {
1345 match self.batch_transformer.next() {
1346 None => {
1347 let result = match self.state() {
1348 SHJStreamState::PullRight => {
1349 ready!(self.fetch_next_from_right_stream(cx))
1350 }
1351 SHJStreamState::PullLeft => {
1352 ready!(self.fetch_next_from_left_stream(cx))
1353 }
1354 SHJStreamState::RightExhausted => {
1355 ready!(self.handle_right_stream_end(cx))
1356 }
1357 SHJStreamState::LeftExhausted => {
1358 ready!(self.handle_left_stream_end(cx))
1359 }
1360 SHJStreamState::BothExhausted {
1361 final_result: false,
1362 } => self.prepare_for_final_results_after_exhaustion(),
1363 SHJStreamState::BothExhausted { final_result: true } => {
1364 return Poll::Ready(None);
1365 }
1366 };
1367
1368 match result? {
1369 StatefulStreamResult::Ready(None) => {
1370 return Poll::Ready(None);
1371 }
1372 StatefulStreamResult::Ready(Some(batch)) => {
1373 self.batch_transformer.set_batch(batch);
1374 }
1375 _ => {}
1376 }
1377 }
1378 Some((batch, _)) => {
1379 self.metrics.output_batches.add(1);
1380 return self
1381 .metrics
1382 .baseline_metrics
1383 .record_poll(Poll::Ready(Some(Ok(batch))));
1384 }
1385 }
1386 }
1387 }
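
    /// Polls the right input. A non-empty batch is processed as a probe batch
    /// and the state flips to `PullLeft`; an empty batch is skipped, and an
    /// exhausted input moves the state to `RightExhausted`.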
1388 fn fetch_next_from_right_stream(
1398 &mut self,
1399 cx: &mut Context<'_>,
1400 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1401 match ready!(self.right_stream().poll_next_unpin(cx)) {
1402 Some(Ok(batch)) => {
1403 if batch.num_rows() == 0 {
1404 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1405 }
1406 self.set_state(SHJStreamState::PullLeft);
1407 Poll::Ready(self.process_batch_from_right(batch))
1408 }
1409 Some(Err(e)) => Poll::Ready(Err(e)),
1410 None => {
1411 self.set_state(SHJStreamState::RightExhausted);
1412 Poll::Ready(Ok(StatefulStreamResult::Continue))
1413 }
1414 }
1415 }
1416
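    /// Polls the left input. A non-empty batch is processed as a probe batch
    /// and the state flips to `PullRight`; an empty batch is skipped, and an
    /// exhausted input moves the state to `LeftExhausted`.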
1417 fn fetch_next_from_left_stream(
1427 &mut self,
1428 cx: &mut Context<'_>,
1429 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1430 match ready!(self.left_stream().poll_next_unpin(cx)) {
1431 Some(Ok(batch)) => {
1432 if batch.num_rows() == 0 {
1433 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1434 }
1435 self.set_state(SHJStreamState::PullRight);
1436 Poll::Ready(self.process_batch_from_left(batch))
1437 }
1438 Some(Err(e)) => Poll::Ready(Err(e)),
1439 None => {
1440 self.set_state(SHJStreamState::LeftExhausted);
1441 Poll::Ready(Ok(StatefulStreamResult::Continue))
1442 }
1443 }
1444 }
1445
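    /// Called once the right input is exhausted: keeps pulling left batches
    /// and probes them against the buffered right side; when the left input
    /// also ends, the state becomes `BothExhausted { final_result: false }`.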
1446 fn handle_right_stream_end(
1457 &mut self,
1458 cx: &mut Context<'_>,
1459 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1460 match ready!(self.left_stream().poll_next_unpin(cx)) {
1461 Some(Ok(batch)) => {
1462 if batch.num_rows() == 0 {
1463 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1464 }
1465 Poll::Ready(self.process_batch_after_right_end(batch))
1466 }
1467 Some(Err(e)) => Poll::Ready(Err(e)),
1468 None => {
1469 self.set_state(SHJStreamState::BothExhausted {
1470 final_result: false,
1471 });
1472 Poll::Ready(Ok(StatefulStreamResult::Continue))
1473 }
1474 }
1475 }
1476
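    /// Called once the left input is exhausted: keeps pulling right batches
    /// and probes them against the buffered left side; when the right input
    /// also ends, the state becomes `BothExhausted { final_result: false }`.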
1477 fn handle_left_stream_end(
1488 &mut self,
1489 cx: &mut Context<'_>,
1490 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1491 match ready!(self.right_stream().poll_next_unpin(cx)) {
1492 Some(Ok(batch)) => {
1493 if batch.num_rows() == 0 {
1494 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1495 }
1496 Poll::Ready(self.process_batch_after_left_end(batch))
1497 }
1498 Some(Err(e)) => Poll::Ready(Err(e)),
1499 None => {
1500 self.set_state(SHJStreamState::BothExhausted {
1501 final_result: false,
1502 });
1503 Poll::Ready(Ok(StatefulStreamResult::Continue))
1504 }
1505 }
1506 }
1507
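    /// Marks the stream as fully exhausted and emits the deferred build-side
    /// results, if any, exactly once.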
1508 fn prepare_for_final_results_after_exhaustion(
1518 &mut self,
1519 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1520 self.set_state(SHJStreamState::BothExhausted { final_result: true });
1521 self.process_batches_before_finalization()
1522 }
1523
1524 fn process_batch_from_right(
1525 &mut self,
1526 batch: RecordBatch,
1527 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1528 self.perform_join_for_given_side(batch, JoinSide::Right)
1529 .map(|maybe_batch| {
1530 if maybe_batch.is_some() {
1531 StatefulStreamResult::Ready(maybe_batch)
1532 } else {
1533 StatefulStreamResult::Continue
1534 }
1535 })
1536 }
1537
1538 fn process_batch_from_left(
1539 &mut self,
1540 batch: RecordBatch,
1541 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1542 self.perform_join_for_given_side(batch, JoinSide::Left)
1543 .map(|maybe_batch| {
1544 if maybe_batch.is_some() {
1545 StatefulStreamResult::Ready(maybe_batch)
1546 } else {
1547 StatefulStreamResult::Continue
1548 }
1549 })
1550 }
1551
1552 fn process_batch_after_left_end(
1553 &mut self,
1554 right_batch: RecordBatch,
1555 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1556 self.process_batch_from_right(right_batch)
1557 }
1558
1559 fn process_batch_after_right_end(
1560 &mut self,
1561 left_batch: RecordBatch,
1562 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1563 self.process_batch_from_left(left_batch)
1564 }
1565
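    /// Builds the final build-side output for both buffered sides, as required
    /// by the join type, and combines the two results into a single batch.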
1566 fn process_batches_before_finalization(
1567 &mut self,
1568 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1569 let left_result = build_side_determined_results(
1571 &self.left,
1572 &self.schema,
1573 self.left.input_buffer.num_rows(),
1574 self.right.input_buffer.schema(),
1575 self.join_type,
1576 &self.column_indices,
1577 )?;
1578 let right_result = build_side_determined_results(
1580 &self.right,
1581 &self.schema,
1582 self.right.input_buffer.num_rows(),
1583 self.left.input_buffer.schema(),
1584 self.join_type,
1585 &self.column_indices,
1586 )?;
1587
1588 let result = combine_two_batches(&self.schema, left_result, right_result)?;
1590
1591 if result.is_some() {
1593 return Ok(StatefulStreamResult::Ready(result));
1594 }
1595 Ok(StatefulStreamResult::Continue)
1596 }
1597
1598 fn right_stream(&mut self) -> &mut SendableRecordBatchStream {
1599 &mut self.right_stream
1600 }
1601
1602 fn left_stream(&mut self) -> &mut SendableRecordBatchStream {
1603 &mut self.left_stream
1604 }
1605
1606 fn set_state(&mut self, state: SHJStreamState) {
1607 self.state = state;
1608 }
1609
1610 fn state(&mut self) -> SHJStreamState {
1611 self.state.clone()
1612 }
1613
1614 fn size(&self) -> usize {
1615 let mut size = 0;
1616 size += size_of_val(&self.schema);
1617 size += size_of_val(&self.filter);
1618 size += size_of_val(&self.join_type);
1619 size += self.left.size();
1620 size += self.right.size();
1621 size += size_of_val(&self.column_indices);
1622 size += self.graph.as_ref().map(|g| g.size()).unwrap_or(0);
1623 size += size_of_val(&self.left_sorted_filter_expr);
1624 size += size_of_val(&self.right_sorted_filter_expr);
1625 size += size_of_val(&self.random_state);
1626 size += size_of_val(&self.null_equality);
1627 size += size_of_val(&self.metrics);
1628 size
1629 }
1630
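    /// Performs one probe step: updates the probe side's buffer and hash map
    /// with `probe_batch`, joins it against the build side, and, when sorted
    /// filter expressions are available, computes and applies build-side
    /// pruning. The equi-join and pruning results are combined, and the memory
    /// reservation is resized to the stream's current size.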
1631 fn perform_join_for_given_side(
1639 &mut self,
1640 probe_batch: RecordBatch,
1641 probe_side: JoinSide,
1642 ) -> Result<Option<RecordBatch>> {
1643 let (
1644 probe_hash_joiner,
1645 build_hash_joiner,
1646 probe_side_sorted_filter_expr,
1647 build_side_sorted_filter_expr,
1648 probe_side_metrics,
1649 ) = if probe_side.eq(&JoinSide::Left) {
1650 (
1651 &mut self.left,
1652 &mut self.right,
1653 &mut self.left_sorted_filter_expr,
1654 &mut self.right_sorted_filter_expr,
1655 &mut self.metrics.left,
1656 )
1657 } else {
1658 (
1659 &mut self.right,
1660 &mut self.left,
1661 &mut self.right_sorted_filter_expr,
1662 &mut self.left_sorted_filter_expr,
1663 &mut self.metrics.right,
1664 )
1665 };
1666 probe_side_metrics.input_batches.add(1);
1668 probe_side_metrics.input_rows.add(probe_batch.num_rows());
1669 probe_hash_joiner.update_internal_state(&probe_batch, &self.random_state)?;
1671 let equal_result = join_with_probe_batch(
1673 build_hash_joiner,
1674 probe_hash_joiner,
1675 &self.schema,
1676 self.join_type,
1677 self.filter.as_ref(),
1678 &probe_batch,
1679 &self.column_indices,
1680 &self.random_state,
1681 self.null_equality,
1682 )?;
1683 probe_hash_joiner.offset += probe_batch.num_rows();
1685
1686 let anti_result = if let (
1687 Some(build_side_sorted_filter_expr),
1688 Some(probe_side_sorted_filter_expr),
1689 Some(graph),
1690 ) = (
1691 build_side_sorted_filter_expr.as_mut(),
1692 probe_side_sorted_filter_expr.as_mut(),
1693 self.graph.as_mut(),
1694 ) {
1695 calculate_filter_expr_intervals(
1697 &build_hash_joiner.input_buffer,
1698 build_side_sorted_filter_expr,
1699 &probe_batch,
1700 probe_side_sorted_filter_expr,
1701 )?;
1702 let prune_length = build_hash_joiner
1703 .calculate_prune_length_with_probe_batch(
1704 build_side_sorted_filter_expr,
1705 probe_side_sorted_filter_expr,
1706 graph,
1707 )?;
1708 let result = build_side_determined_results(
1709 build_hash_joiner,
1710 &self.schema,
1711 prune_length,
1712 probe_batch.schema(),
1713 self.join_type,
1714 &self.column_indices,
1715 )?;
1716 build_hash_joiner.prune_internal_state(prune_length)?;
1717 result
1718 } else {
1719 None
1720 };
1721
1722 let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
1724 let capacity = self.size();
1725 self.metrics.stream_memory_usage.set(capacity);
1726 self.reservation.lock().try_resize(capacity)?;
1727 Ok(result)
1728 }
1729}
1730
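/// State machine driven by `SymmetricHashJoinStream::poll_next_impl`.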
#[derive(Clone, Debug)]
pub enum SHJStreamState {
    /// Pull the next batch from the right input.
    PullRight,

    /// Pull the next batch from the left input.
    PullLeft,

    /// The right input is exhausted; keep draining the left input.
    RightExhausted,

    /// The left input is exhausted; keep draining the right input.
    LeftExhausted,

    /// Both inputs are exhausted; `final_result` records whether the deferred
    /// build-side results have already been produced.
    BothExhausted { final_result: bool },
}
1758
1759#[cfg(test)]
1760mod tests {
1761 use std::collections::HashMap;
1762 use std::sync::{LazyLock, Mutex};
1763
1764 use super::*;
1765 use crate::joins::test_utils::{
1766 build_sides_record_batches, compare_batches, complicated_filter,
1767 create_memory_table, join_expr_tests_fixture_f64, join_expr_tests_fixture_i32,
1768 join_expr_tests_fixture_temporal, partitioned_hash_join_with_filter,
1769 partitioned_sym_join_with_filter, split_record_batches,
1770 };
1771
1772 use arrow::compute::SortOptions;
1773 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
1774 use datafusion_common::ScalarValue;
1775 use datafusion_execution::config::SessionConfig;
1776 use datafusion_expr::Operator;
1777 use datafusion_physical_expr::expressions::{binary, col, lit, Column};
1778 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1779
1780 use rstest::*;
1781
1782 const TABLE_SIZE: i32 = 30;
1783

    type TableKey = (i32, i32, usize);
    type TableValue = (Vec<RecordBatch>, Vec<RecordBatch>);

    static TABLE_CACHE: LazyLock<Mutex<HashMap<TableKey, TableValue>>> =
        LazyLock::new(|| Mutex::new(HashMap::new()));
1790
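    /// Builds (or fetches from the process-wide `TABLE_CACHE`) the left and
    /// right record-batch partitions for the given cardinality and batch size.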
1791 fn get_or_create_table(
1792 cardinality: (i32, i32),
1793 batch_size: usize,
1794 ) -> Result<TableValue> {
1795 {
1796 let cache = TABLE_CACHE.lock().unwrap();
1797 if let Some(table) = cache.get(&(cardinality.0, cardinality.1, batch_size)) {
1798 return Ok(table.clone());
1799 }
1800 }
1801
1802 let (left_batch, right_batch) =
1804 build_sides_record_batches(TABLE_SIZE, cardinality)?;
1805
1806 let (left_partition, right_partition) = (
1807 split_record_batches(&left_batch, batch_size)?,
1808 split_record_batches(&right_batch, batch_size)?,
1809 );
1810
1811 let mut cache = TABLE_CACHE.lock().unwrap();
1813
1814 cache.insert(
1816 (cardinality.0, cardinality.1, batch_size),
1817 (left_partition.clone(), right_partition.clone()),
1818 );
1819
1820 Ok((left_partition, right_partition))
1821 }
1822
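    /// Runs the same join through both the partitioned symmetric hash join and
    /// the partitioned hash join and asserts that the two produce identical
    /// batches.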
1823 pub async fn experiment(
1824 left: Arc<dyn ExecutionPlan>,
1825 right: Arc<dyn ExecutionPlan>,
1826 filter: Option<JoinFilter>,
1827 join_type: JoinType,
1828 on: JoinOn,
1829 task_ctx: Arc<TaskContext>,
1830 ) -> Result<()> {
1831 let first_batches = partitioned_sym_join_with_filter(
1832 Arc::clone(&left),
1833 Arc::clone(&right),
1834 on.clone(),
1835 filter.clone(),
1836 &join_type,
1837 NullEquality::NullEqualsNothing,
1838 Arc::clone(&task_ctx),
1839 )
1840 .await?;
1841 let second_batches = partitioned_hash_join_with_filter(
1842 left,
1843 right,
1844 on,
1845 filter,
1846 &join_type,
1847 NullEquality::NullEqualsNothing,
1848 task_ctx,
1849 )
1850 .await?;
1851 compare_batches(&first_batches, &second_batches);
1852 Ok(())
1853 }
1854
1855 #[rstest]
1856 #[tokio::test(flavor = "multi_thread")]
1857 async fn complex_join_all_one_ascending_numeric(
1858 #[values(
1859 JoinType::Inner,
1860 JoinType::Left,
1861 JoinType::Right,
1862 JoinType::RightSemi,
1863 JoinType::LeftSemi,
1864 JoinType::LeftAnti,
1865 JoinType::LeftMark,
1866 JoinType::RightAnti,
1867 JoinType::Full
1868 )]
1869 join_type: JoinType,
1870 #[values(
1871 (4, 5),
1872 (12, 17),
1873 )]
1874 cardinality: (i32, i32),
1875 ) -> Result<()> {
1876 let task_ctx = Arc::new(TaskContext::default());
1878
1879 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
1880
1881 let left_schema = &left_partition[0].schema();
1882 let right_schema = &right_partition[0].schema();
1883
1884 let left_sorted = [PhysicalSortExpr {
1885 expr: binary(
1886 col("la1", left_schema)?,
1887 Operator::Plus,
1888 col("la2", left_schema)?,
1889 left_schema,
1890 )?,
1891 options: SortOptions::default(),
1892 }]
1893 .into();
1894 let right_sorted = [PhysicalSortExpr {
1895 expr: col("ra1", right_schema)?,
1896 options: SortOptions::default(),
1897 }]
1898 .into();
1899 let (left, right) = create_memory_table(
1900 left_partition,
1901 right_partition,
1902 vec![left_sorted],
1903 vec![right_sorted],
1904 )?;
1905
1906 let on = vec![(
1907 binary(
1908 col("lc1", left_schema)?,
1909 Operator::Plus,
1910 lit(ScalarValue::Int32(Some(1))),
1911 left_schema,
1912 )?,
1913 Arc::new(Column::new_with_schema("rc1", right_schema)?) as _,
1914 )];
1915
1916 let intermediate_schema = Schema::new(vec![
1917 Field::new("0", DataType::Int32, true),
1918 Field::new("1", DataType::Int32, true),
1919 Field::new("2", DataType::Int32, true),
1920 ]);
1921 let filter_expr = complicated_filter(&intermediate_schema)?;
1922 let column_indices = vec![
1923 ColumnIndex {
1924 index: left_schema.index_of("la1")?,
1925 side: JoinSide::Left,
1926 },
1927 ColumnIndex {
1928 index: left_schema.index_of("la2")?,
1929 side: JoinSide::Left,
1930 },
1931 ColumnIndex {
1932 index: right_schema.index_of("ra1")?,
1933 side: JoinSide::Right,
1934 },
1935 ];
1936 let filter =
1937 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
1938
1939 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
1940 Ok(())
1941 }
1942
1943 #[rstest]
1944 #[tokio::test(flavor = "multi_thread")]
1945 async fn join_all_one_ascending_numeric(
1946 #[values(
1947 JoinType::Inner,
1948 JoinType::Left,
1949 JoinType::Right,
1950 JoinType::RightSemi,
1951 JoinType::LeftSemi,
1952 JoinType::LeftAnti,
1953 JoinType::LeftMark,
1954 JoinType::RightAnti,
1955 JoinType::Full
1956 )]
1957 join_type: JoinType,
1958 #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
1959 ) -> Result<()> {
1960 let task_ctx = Arc::new(TaskContext::default());
1961 let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;
1962
1963 let left_schema = &left_partition[0].schema();
1964 let right_schema = &right_partition[0].schema();
1965
1966 let left_sorted = [PhysicalSortExpr {
1967 expr: col("la1", left_schema)?,
1968 options: SortOptions::default(),
1969 }]
1970 .into();
1971 let right_sorted = [PhysicalSortExpr {
1972 expr: col("ra1", right_schema)?,
1973 options: SortOptions::default(),
1974 }]
1975 .into();
1976 let (left, right) = create_memory_table(
1977 left_partition,
1978 right_partition,
1979 vec![left_sorted],
1980 vec![right_sorted],
1981 )?;
1982
1983 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
1984
1985 let intermediate_schema = Schema::new(vec![
1986 Field::new("left", DataType::Int32, true),
1987 Field::new("right", DataType::Int32, true),
1988 ]);
1989 let filter_expr = join_expr_tests_fixture_i32(
1990 case_expr,
1991 col("left", &intermediate_schema)?,
1992 col("right", &intermediate_schema)?,
1993 );
1994 let column_indices = vec![
1995 ColumnIndex {
1996 index: 0,
1997 side: JoinSide::Left,
1998 },
1999 ColumnIndex {
2000 index: 0,
2001 side: JoinSide::Right,
2002 },
2003 ];
2004 let filter =
2005 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2006
2007 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2008 Ok(())
2009 }
2010
2011 #[rstest]
2012 #[tokio::test(flavor = "multi_thread")]
2013 async fn join_without_sort_information(
2014 #[values(
2015 JoinType::Inner,
2016 JoinType::Left,
2017 JoinType::Right,
2018 JoinType::RightSemi,
2019 JoinType::LeftSemi,
2020 JoinType::LeftAnti,
2021 JoinType::LeftMark,
2022 JoinType::RightAnti,
2023 JoinType::Full
2024 )]
2025 join_type: JoinType,
2026 #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
2027 ) -> Result<()> {
2028 let task_ctx = Arc::new(TaskContext::default());
2029 let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;
2030
2031 let left_schema = &left_partition[0].schema();
2032 let right_schema = &right_partition[0].schema();
2033 let (left, right) =
2034 create_memory_table(left_partition, right_partition, vec![], vec![])?;
2035
2036 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2037
2038 let intermediate_schema = Schema::new(vec![
2039 Field::new("left", DataType::Int32, true),
2040 Field::new("right", DataType::Int32, true),
2041 ]);
2042 let filter_expr = join_expr_tests_fixture_i32(
2043 case_expr,
2044 col("left", &intermediate_schema)?,
2045 col("right", &intermediate_schema)?,
2046 );
2047 let column_indices = vec![
2048 ColumnIndex {
2049 index: 5,
2050 side: JoinSide::Left,
2051 },
2052 ColumnIndex {
2053 index: 5,
2054 side: JoinSide::Right,
2055 },
2056 ];
2057 let filter =
2058 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2059
2060 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2061 Ok(())
2062 }
2063
2064 #[rstest]
2065 #[tokio::test(flavor = "multi_thread")]
2066 async fn join_without_filter(
2067 #[values(
2068 JoinType::Inner,
2069 JoinType::Left,
2070 JoinType::Right,
2071 JoinType::RightSemi,
2072 JoinType::LeftSemi,
2073 JoinType::LeftAnti,
2074 JoinType::LeftMark,
2075 JoinType::RightAnti,
2076 JoinType::Full
2077 )]
2078 join_type: JoinType,
2079 ) -> Result<()> {
2080 let task_ctx = Arc::new(TaskContext::default());
2081 let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
2082 let left_schema = &left_partition[0].schema();
2083 let right_schema = &right_partition[0].schema();
2084 let (left, right) =
2085 create_memory_table(left_partition, right_partition, vec![], vec![])?;
2086
2087 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2088 experiment(left, right, None, join_type, on, task_ctx).await?;
2089 Ok(())
2090 }
2091
2092 #[rstest]
2093 #[tokio::test(flavor = "multi_thread")]
2094 async fn join_all_one_descending_numeric_particular(
2095 #[values(
2096 JoinType::Inner,
2097 JoinType::Left,
2098 JoinType::Right,
2099 JoinType::RightSemi,
2100 JoinType::LeftSemi,
2101 JoinType::LeftAnti,
2102 JoinType::LeftMark,
2103 JoinType::RightAnti,
2104 JoinType::Full
2105 )]
2106 join_type: JoinType,
2107 #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
2108 ) -> Result<()> {
2109 let task_ctx = Arc::new(TaskContext::default());
2110 let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
2111 let left_schema = &left_partition[0].schema();
2112 let right_schema = &right_partition[0].schema();
2113 let left_sorted = [PhysicalSortExpr {
2114 expr: col("la1_des", left_schema)?,
2115 options: SortOptions {
2116 descending: true,
2117 nulls_first: true,
2118 },
2119 }]
2120 .into();
2121 let right_sorted = [PhysicalSortExpr {
2122 expr: col("ra1_des", right_schema)?,
2123 options: SortOptions {
2124 descending: true,
2125 nulls_first: true,
2126 },
2127 }]
2128 .into();
2129 let (left, right) = create_memory_table(
2130 left_partition,
2131 right_partition,
2132 vec![left_sorted],
2133 vec![right_sorted],
2134 )?;
2135
2136 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2137
2138 let intermediate_schema = Schema::new(vec![
2139 Field::new("left", DataType::Int32, true),
2140 Field::new("right", DataType::Int32, true),
2141 ]);
2142 let filter_expr = join_expr_tests_fixture_i32(
2143 case_expr,
2144 col("left", &intermediate_schema)?,
2145 col("right", &intermediate_schema)?,
2146 );
2147 let column_indices = vec![
2148 ColumnIndex {
2149 index: 5,
2150 side: JoinSide::Left,
2151 },
2152 ColumnIndex {
2153 index: 5,
2154 side: JoinSide::Right,
2155 },
2156 ];
2157 let filter =
2158 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2159
2160 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2161 Ok(())
2162 }
2163
2164 #[tokio::test(flavor = "multi_thread")]
2165 async fn build_null_columns_first() -> Result<()> {
2166 let join_type = JoinType::Full;
2167 let case_expr = 1;
2168 let session_config = SessionConfig::new().with_repartition_joins(false);
2169 let task_ctx = TaskContext::default().with_session_config(session_config);
2170 let task_ctx = Arc::new(task_ctx);
2171 let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;
2172 let left_schema = &left_partition[0].schema();
2173 let right_schema = &right_partition[0].schema();
2174 let left_sorted = [PhysicalSortExpr {
2175 expr: col("l_asc_null_first", left_schema)?,
2176 options: SortOptions {
2177 descending: false,
2178 nulls_first: true,
2179 },
2180 }]
2181 .into();
2182 let right_sorted = [PhysicalSortExpr {
2183 expr: col("r_asc_null_first", right_schema)?,
2184 options: SortOptions {
2185 descending: false,
2186 nulls_first: true,
2187 },
2188 }]
2189 .into();
2190 let (left, right) = create_memory_table(
2191 left_partition,
2192 right_partition,
2193 vec![left_sorted],
2194 vec![right_sorted],
2195 )?;
2196
2197 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2198
2199 let intermediate_schema = Schema::new(vec![
2200 Field::new("left", DataType::Int32, true),
2201 Field::new("right", DataType::Int32, true),
2202 ]);
2203 let filter_expr = join_expr_tests_fixture_i32(
2204 case_expr,
2205 col("left", &intermediate_schema)?,
2206 col("right", &intermediate_schema)?,
2207 );
2208 let column_indices = vec![
2209 ColumnIndex {
2210 index: 6,
2211 side: JoinSide::Left,
2212 },
2213 ColumnIndex {
2214 index: 6,
2215 side: JoinSide::Right,
2216 },
2217 ];
2218 let filter =
2219 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2220 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2221 Ok(())
2222 }
2223
2224 #[tokio::test(flavor = "multi_thread")]
2225 async fn build_null_columns_last() -> Result<()> {
2226 let join_type = JoinType::Full;
2227 let case_expr = 1;
2228 let session_config = SessionConfig::new().with_repartition_joins(false);
2229 let task_ctx = TaskContext::default().with_session_config(session_config);
2230 let task_ctx = Arc::new(task_ctx);
2231 let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;
2232
2233 let left_schema = &left_partition[0].schema();
2234 let right_schema = &right_partition[0].schema();
2235 let left_sorted = [PhysicalSortExpr {
2236 expr: col("l_asc_null_last", left_schema)?,
2237 options: SortOptions {
2238 descending: false,
2239 nulls_first: false,
2240 },
2241 }]
2242 .into();
2243 let right_sorted = [PhysicalSortExpr {
2244 expr: col("r_asc_null_last", right_schema)?,
2245 options: SortOptions {
2246 descending: false,
2247 nulls_first: false,
2248 },
2249 }]
2250 .into();
2251 let (left, right) = create_memory_table(
2252 left_partition,
2253 right_partition,
2254 vec![left_sorted],
2255 vec![right_sorted],
2256 )?;
2257
2258 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2259
2260 let intermediate_schema = Schema::new(vec![
2261 Field::new("left", DataType::Int32, true),
2262 Field::new("right", DataType::Int32, true),
2263 ]);
2264 let filter_expr = join_expr_tests_fixture_i32(
2265 case_expr,
2266 col("left", &intermediate_schema)?,
2267 col("right", &intermediate_schema)?,
2268 );
2269 let column_indices = vec![
2270 ColumnIndex {
2271 index: 7,
2272 side: JoinSide::Left,
2273 },
2274 ColumnIndex {
2275 index: 7,
2276 side: JoinSide::Right,
2277 },
2278 ];
2279 let filter =
2280 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2281
2282 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2283 Ok(())
2284 }
2285
2286 #[tokio::test(flavor = "multi_thread")]
2287 async fn build_null_columns_first_descending() -> Result<()> {
2288 let join_type = JoinType::Full;
2289 let cardinality = (10, 11);
2290 let case_expr = 1;
2291 let session_config = SessionConfig::new().with_repartition_joins(false);
2292 let task_ctx = TaskContext::default().with_session_config(session_config);
2293 let task_ctx = Arc::new(task_ctx);
2294 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2295
2296 let left_schema = &left_partition[0].schema();
2297 let right_schema = &right_partition[0].schema();
2298 let left_sorted = [PhysicalSortExpr {
2299 expr: col("l_desc_null_first", left_schema)?,
2300 options: SortOptions {
2301 descending: true,
2302 nulls_first: true,
2303 },
2304 }]
2305 .into();
2306 let right_sorted = [PhysicalSortExpr {
2307 expr: col("r_desc_null_first", right_schema)?,
2308 options: SortOptions {
2309 descending: true,
2310 nulls_first: true,
2311 },
2312 }]
2313 .into();
2314 let (left, right) = create_memory_table(
2315 left_partition,
2316 right_partition,
2317 vec![left_sorted],
2318 vec![right_sorted],
2319 )?;
2320
2321 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2322
2323 let intermediate_schema = Schema::new(vec![
2324 Field::new("left", DataType::Int32, true),
2325 Field::new("right", DataType::Int32, true),
2326 ]);
2327 let filter_expr = join_expr_tests_fixture_i32(
2328 case_expr,
2329 col("left", &intermediate_schema)?,
2330 col("right", &intermediate_schema)?,
2331 );
2332 let column_indices = vec![
2333 ColumnIndex {
2334 index: 8,
2335 side: JoinSide::Left,
2336 },
2337 ColumnIndex {
2338 index: 8,
2339 side: JoinSide::Right,
2340 },
2341 ];
2342 let filter =
2343 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2344
2345 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2346 Ok(())
2347 }
2348
2349 #[tokio::test(flavor = "multi_thread")]
2350 async fn complex_join_all_one_ascending_numeric_missing_stat() -> Result<()> {
2351 let cardinality = (3, 4);
2352 let join_type = JoinType::Full;
2353
2354 let session_config = SessionConfig::new().with_repartition_joins(false);
2356 let task_ctx = TaskContext::default().with_session_config(session_config);
2357 let task_ctx = Arc::new(task_ctx);
2358 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2359
2360 let left_schema = &left_partition[0].schema();
2361 let right_schema = &right_partition[0].schema();
2362 let left_sorted = [PhysicalSortExpr {
2363 expr: col("la1", left_schema)?,
2364 options: SortOptions::default(),
2365 }]
2366 .into();
2367 let right_sorted = [PhysicalSortExpr {
2368 expr: col("ra1", right_schema)?,
2369 options: SortOptions::default(),
2370 }]
2371 .into();
2372 let (left, right) = create_memory_table(
2373 left_partition,
2374 right_partition,
2375 vec![left_sorted],
2376 vec![right_sorted],
2377 )?;
2378
2379 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2380
2381 let intermediate_schema = Schema::new(vec![
2382 Field::new("0", DataType::Int32, true),
2383 Field::new("1", DataType::Int32, true),
2384 Field::new("2", DataType::Int32, true),
2385 ]);
2386 let filter_expr = complicated_filter(&intermediate_schema)?;
2387 let column_indices = vec![
2388 ColumnIndex {
2389 index: 0,
2390 side: JoinSide::Left,
2391 },
2392 ColumnIndex {
2393 index: 4,
2394 side: JoinSide::Left,
2395 },
2396 ColumnIndex {
2397 index: 0,
2398 side: JoinSide::Right,
2399 },
2400 ];
2401 let filter =
2402 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2403
2404 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2405 Ok(())
2406 }
2407
2408 #[tokio::test(flavor = "multi_thread")]
2409 async fn complex_join_all_one_ascending_equivalence() -> Result<()> {
2410 let cardinality = (3, 4);
2411 let join_type = JoinType::Full;
2412
2413 let config = SessionConfig::new().with_repartition_joins(false);
2415 let task_ctx = Arc::new(TaskContext::default().with_session_config(config));
2418 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2419 let left_schema = &left_partition[0].schema();
2420 let right_schema = &right_partition[0].schema();
2421 let left_sorted = vec![
2422 [PhysicalSortExpr {
2423 expr: col("la1", left_schema)?,
2424 options: SortOptions::default(),
2425 }]
2426 .into(),
2427 [PhysicalSortExpr {
2428 expr: col("la2", left_schema)?,
2429 options: SortOptions::default(),
2430 }]
2431 .into(),
2432 ];
2433
2434 let right_sorted = [PhysicalSortExpr {
2435 expr: col("ra1", right_schema)?,
2436 options: SortOptions::default(),
2437 }]
2438 .into();
2439
2440 let (left, right) = create_memory_table(
2441 left_partition,
2442 right_partition,
2443 left_sorted,
2444 vec![right_sorted],
2445 )?;
2446
2447 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2448
2449 let intermediate_schema = Schema::new(vec![
2450 Field::new("0", DataType::Int32, true),
2451 Field::new("1", DataType::Int32, true),
2452 Field::new("2", DataType::Int32, true),
2453 ]);
2454 let filter_expr = complicated_filter(&intermediate_schema)?;
2455 let column_indices = vec![
2456 ColumnIndex {
2457 index: 0,
2458 side: JoinSide::Left,
2459 },
2460 ColumnIndex {
2461 index: 4,
2462 side: JoinSide::Left,
2463 },
2464 ColumnIndex {
2465 index: 0,
2466 side: JoinSide::Right,
2467 },
2468 ];
2469 let filter =
2470 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2471
2472 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2473 Ok(())
2474 }
2475
2476 #[rstest]
2477 #[tokio::test(flavor = "multi_thread")]
2478 async fn testing_with_temporal_columns(
2479 #[values(
2480 JoinType::Inner,
2481 JoinType::Left,
2482 JoinType::Right,
2483 JoinType::RightSemi,
2484 JoinType::LeftSemi,
2485 JoinType::LeftAnti,
2486 JoinType::LeftMark,
2487 JoinType::RightAnti,
2488 JoinType::Full
2489 )]
2490 join_type: JoinType,
2491 #[values(
2492 (4, 5),
2493 (12, 17),
2494 )]
2495 cardinality: (i32, i32),
2496 #[values(0, 1, 2)] case_expr: usize,
2497 ) -> Result<()> {
2498 let session_config = SessionConfig::new().with_repartition_joins(false);
2499 let task_ctx = TaskContext::default().with_session_config(session_config);
2500 let task_ctx = Arc::new(task_ctx);
2501 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2502
2503 let left_schema = &left_partition[0].schema();
2504 let right_schema = &right_partition[0].schema();
2505 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2506 let left_sorted = [PhysicalSortExpr {
2507 expr: col("lt1", left_schema)?,
2508 options: SortOptions {
2509 descending: false,
2510 nulls_first: true,
2511 },
2512 }]
2513 .into();
2514 let right_sorted = [PhysicalSortExpr {
2515 expr: col("rt1", right_schema)?,
2516 options: SortOptions {
2517 descending: false,
2518 nulls_first: true,
2519 },
2520 }]
2521 .into();
2522 let (left, right) = create_memory_table(
2523 left_partition,
2524 right_partition,
2525 vec![left_sorted],
2526 vec![right_sorted],
2527 )?;
2528 let intermediate_schema = Schema::new(vec![
2529 Field::new(
2530 "left",
2531 DataType::Timestamp(TimeUnit::Millisecond, None),
2532 false,
2533 ),
2534 Field::new(
2535 "right",
2536 DataType::Timestamp(TimeUnit::Millisecond, None),
2537 false,
2538 ),
2539 ]);
2540 let filter_expr = join_expr_tests_fixture_temporal(
2541 case_expr,
2542 col("left", &intermediate_schema)?,
2543 col("right", &intermediate_schema)?,
2544 &intermediate_schema,
2545 )?;
2546 let column_indices = vec![
2547 ColumnIndex {
2548 index: 3,
2549 side: JoinSide::Left,
2550 },
2551 ColumnIndex {
2552 index: 3,
2553 side: JoinSide::Right,
2554 },
2555 ];
2556 let filter =
2557 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2558 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2559 Ok(())
2560 }
2561
2562 #[rstest]
2563 #[tokio::test(flavor = "multi_thread")]
2564 async fn test_with_interval_columns(
2565 #[values(
2566 JoinType::Inner,
2567 JoinType::Left,
2568 JoinType::Right,
2569 JoinType::RightSemi,
2570 JoinType::LeftSemi,
2571 JoinType::LeftAnti,
2572 JoinType::LeftMark,
2573 JoinType::RightAnti,
2574 JoinType::Full
2575 )]
2576 join_type: JoinType,
2577 #[values(
2578 (4, 5),
2579 (12, 17),
2580 )]
2581 cardinality: (i32, i32),
2582 ) -> Result<()> {
2583 let session_config = SessionConfig::new().with_repartition_joins(false);
2584 let task_ctx = TaskContext::default().with_session_config(session_config);
2585 let task_ctx = Arc::new(task_ctx);
2586 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2587
2588 let left_schema = &left_partition[0].schema();
2589 let right_schema = &right_partition[0].schema();
2590 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2591 let left_sorted = [PhysicalSortExpr {
2592 expr: col("li1", left_schema)?,
2593 options: SortOptions {
2594 descending: false,
2595 nulls_first: true,
2596 },
2597 }]
2598 .into();
2599 let right_sorted = [PhysicalSortExpr {
2600 expr: col("ri1", right_schema)?,
2601 options: SortOptions {
2602 descending: false,
2603 nulls_first: true,
2604 },
2605 }]
2606 .into();
2607 let (left, right) = create_memory_table(
2608 left_partition,
2609 right_partition,
2610 vec![left_sorted],
2611 vec![right_sorted],
2612 )?;
2613 let intermediate_schema = Schema::new(vec![
2614 Field::new("left", DataType::Interval(IntervalUnit::DayTime), false),
2615 Field::new("right", DataType::Interval(IntervalUnit::DayTime), false),
2616 ]);
2617 let filter_expr = join_expr_tests_fixture_temporal(
2618 0,
2619 col("left", &intermediate_schema)?,
2620 col("right", &intermediate_schema)?,
2621 &intermediate_schema,
2622 )?;
2623 let column_indices = vec![
2624 ColumnIndex {
2625 index: 9,
2626 side: JoinSide::Left,
2627 },
2628 ColumnIndex {
2629 index: 9,
2630 side: JoinSide::Right,
2631 },
2632 ];
2633 let filter =
2634 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2635 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2636
2637 Ok(())
2638 }
2639
2640 #[rstest]
2641 #[tokio::test(flavor = "multi_thread")]
2642 async fn testing_ascending_float_pruning(
2643 #[values(
2644 JoinType::Inner,
2645 JoinType::Left,
2646 JoinType::Right,
2647 JoinType::RightSemi,
2648 JoinType::LeftSemi,
2649 JoinType::LeftAnti,
2650 JoinType::LeftMark,
2651 JoinType::RightAnti,
2652 JoinType::Full
2653 )]
2654 join_type: JoinType,
2655 #[values(
2656 (4, 5),
2657 (12, 17),
2658 )]
2659 cardinality: (i32, i32),
2660 #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
2661 ) -> Result<()> {
2662 let session_config = SessionConfig::new().with_repartition_joins(false);
2663 let task_ctx = TaskContext::default().with_session_config(session_config);
2664 let task_ctx = Arc::new(task_ctx);
2665 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2666
2667 let left_schema = &left_partition[0].schema();
2668 let right_schema = &right_partition[0].schema();
2669 let left_sorted = [PhysicalSortExpr {
2670 expr: col("l_float", left_schema)?,
2671 options: SortOptions::default(),
2672 }]
2673 .into();
2674 let right_sorted = [PhysicalSortExpr {
2675 expr: col("r_float", right_schema)?,
2676 options: SortOptions::default(),
2677 }]
2678 .into();
2679 let (left, right) = create_memory_table(
2680 left_partition,
2681 right_partition,
2682 vec![left_sorted],
2683 vec![right_sorted],
2684 )?;
2685
2686 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2687
2688 let intermediate_schema = Schema::new(vec![
2689 Field::new("left", DataType::Float64, true),
2690 Field::new("right", DataType::Float64, true),
2691 ]);
2692 let filter_expr = join_expr_tests_fixture_f64(
2693 case_expr,
2694 col("left", &intermediate_schema)?,
2695 col("right", &intermediate_schema)?,
2696 );
2697 let column_indices = vec![
            ColumnIndex {
                index: 10,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 10,
                side: JoinSide::Right,
            },
2706 ];
2707 let filter =
2708 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2709
2710 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2711 Ok(())
2712 }
2713}