1use std::fmt;
19use std::mem::size_of;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::{Arc, OnceLock};
22use std::{any::Any, vec};
23
24use crate::execution_plan::{boundedness_from_children, EmissionType};
25use crate::filter_pushdown::{
26 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
27 FilterPushdownPropagation,
28};
29use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
30use crate::joins::hash_join::stream::{
31 BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
32};
33use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
34use crate::joins::utils::{
35 asymmetric_join_output_partitioning, reorder_output_after_swap, swap_join_projection,
36 update_hash, OnceAsync, OnceFut,
37};
38use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
39use crate::projection::{
40 try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
41 ProjectionExec,
42};
43use crate::spill::get_record_batch_memory_size;
44use crate::ExecutionPlanProperties;
45use crate::{
46 common::can_project,
47 joins::utils::{
48 build_join_schema, check_join_is_valid, estimate_join_statistics,
49 need_produce_result_in_final, symmetric_join_output_partitioning,
50 BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
51 },
52 metrics::{ExecutionPlanMetricsSet, MetricsSet},
53 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
54 PlanProperties, SendableRecordBatchStream, Statistics,
55};
56
57use arrow::array::{Array, ArrayRef, BooleanBufferBuilder};
58use arrow::compute::concat_batches;
59use arrow::datatypes::SchemaRef;
60use arrow::record_batch::RecordBatch;
61use arrow::util::bit_util;
62use datafusion_common::config::ConfigOptions;
63use datafusion_common::utils::memory::estimate_memory_size;
64use datafusion_common::{
65 internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
66 ScalarValue,
67};
68use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
69use datafusion_execution::TaskContext;
70use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
71use datafusion_physical_expr::equivalence::{
72 join_equivalence_properties, ProjectionMapping,
73};
74use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
75use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
76
77use ahash::RandomState;
78use datafusion_physical_expr_common::physical_expr::fmt_sql;
79use futures::TryStreamExt;
80use parking_lot::Mutex;
81
/// Fixed hashing seed used for the join keys of both inputs.
///
/// `execute` hands the same `random_state` (initialised from this constant in
/// `HashJoinExec::try_new`) to both the build-side collection and the probe
/// stream, so equal keys hash identically on both sides. Because the seeds are
/// constants, hashing is also deterministic from run to run.
const HASH_JOIN_SEED: RandomState =
    RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
85
/// Build-side ("left") state of a hash join.
///
/// It is produced once by `collect_left_input` and then shared by the probe
/// stream(s) that read it.
pub(super) struct JoinLeftData {
    /// Hash table built over the join-key hashes of the rows in `batch`.
    pub(super) hash_map: Box<dyn JoinHashMapType>,
    /// All build-side input batches concatenated into one batch.
    batch: RecordBatch,
    /// The join-key columns (one per `on` expression), evaluated against `batch`.
    values: Vec<ArrayRef>,
    /// Bitmap of build-side rows that were matched.
    ///
    /// It is sized to the build side only when the join type must emit
    /// build-side rows at the end (`need_produce_result_in_final`); otherwise
    /// it is empty.
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Number of probe streams that have not yet reported completion; see
    /// `report_probe_completed`.
    probe_threads_counter: AtomicUsize,
    /// Memory reservation covering the buffered build side and hash table.
    ///
    /// It is never read; it is stored only so that it lives as long as this struct.
    _reservation: MemoryReservation,
    /// Per-key-column min/max bounds of the build side.
    ///
    /// Only `Some` when dynamic filter pushdown requested them and the build
    /// side had at least one row.
    pub(super) bounds: Option<Vec<ColumnBounds>>,
}
107
108impl JoinLeftData {
109 pub(super) fn new(
111 hash_map: Box<dyn JoinHashMapType>,
112 batch: RecordBatch,
113 values: Vec<ArrayRef>,
114 visited_indices_bitmap: SharedBitmapBuilder,
115 probe_threads_counter: AtomicUsize,
116 reservation: MemoryReservation,
117 bounds: Option<Vec<ColumnBounds>>,
118 ) -> Self {
119 Self {
120 hash_map,
121 batch,
122 values,
123 visited_indices_bitmap,
124 probe_threads_counter,
125 _reservation: reservation,
126 bounds,
127 }
128 }
129
130 pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
132 &*self.hash_map
133 }
134
135 pub(super) fn batch(&self) -> &RecordBatch {
137 &self.batch
138 }
139
140 pub(super) fn values(&self) -> &[ArrayRef] {
142 &self.values
143 }
144
145 pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
147 &self.visited_indices_bitmap
148 }
149
150 pub(super) fn report_probe_completed(&self) -> bool {
153 self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
154 }
155}
156
/// Hash join execution plan.
///
/// A hash table is built from the left (build) input and probed with the rows
/// of the right (probe) input, matching on the equijoin keys in `on`.
///
/// `mode` controls how the build side is collected at `execute` time:
/// * `PartitionMode::CollectLeft`: the left input must have exactly one
///   partition. It is collected once and shared by every right partition.
/// * `PartitionMode::Partitioned`: left partition `i` is collected and probed
///   by right partition `i`. Both inputs must have the same partition count.
/// * `PartitionMode::Auto`: must be resolved before execution; `execute`
///   rejects it.
#[allow(rustdoc::private_intra_doc_links)]
pub struct HashJoinExec {
    /// Build-side input.
    pub left: Arc<dyn ExecutionPlan>,
    /// Probe-side input.
    pub right: Arc<dyn ExecutionPlan>,
    /// Equijoin key pairs `(left_expr, right_expr)`; must be non-empty.
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional additional (non-equality) join filter.
    pub filter: Option<JoinFilter>,
    /// Join type (inner, left, right, semi, anti, mark, full, ...).
    pub join_type: JoinType,
    /// Output schema of the join *before* any embedded `projection` is applied.
    join_schema: SchemaRef,
    /// Future that collects the build side.
    ///
    /// In `CollectLeft` mode it is initialised once (`try_once`) and shared by
    /// all partitions.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Hashing seed shared by build and probe sides (set to `HASH_JOIN_SEED`).
    random_state: RandomState,
    /// How the build side is partitioned/collected.
    pub mode: PartitionMode,
    /// Execution metrics of this operator.
    metrics: ExecutionPlanMetricsSet,
    /// Optional embedded projection: indices into `join_schema`.
    pub projection: Option<Vec<usize>>,
    /// Per-column origin information for `join_schema`, as returned by
    /// `build_join_schema`.
    column_indices: Vec<ColumnIndex>,
    /// Whether NULL join keys compare equal.
    pub null_equality: NullEquality,
    /// Cached plan properties computed by `compute_properties`.
    cache: PlanProperties,
    /// Dynamic filter offered to the probe side.
    ///
    /// Set by `handle_child_pushdown_result` when the right child accepted it;
    /// `None` otherwise.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
}
361
/// Dynamic filter state attached to a `HashJoinExec` after filter pushdown.
#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// Filter expression over the probe-side join keys.
    ///
    /// It is the same `Arc` that was accepted by the right child during pushdown.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Collects build-side key bounds across partitions.
    ///
    /// It is created lazily in `execute` (`get_or_init`) from the partition
    /// mode and the current children.
    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
}
370
371impl fmt::Debug for HashJoinExec {
372 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373 f.debug_struct("HashJoinExec")
374 .field("left", &self.left)
375 .field("right", &self.right)
376 .field("on", &self.on)
377 .field("filter", &self.filter)
378 .field("join_type", &self.join_type)
379 .field("join_schema", &self.join_schema)
380 .field("left_fut", &self.left_fut)
381 .field("random_state", &self.random_state)
382 .field("mode", &self.mode)
383 .field("metrics", &self.metrics)
384 .field("projection", &self.projection)
385 .field("column_indices", &self.column_indices)
386 .field("null_equality", &self.null_equality)
387 .field("cache", &self.cache)
388 .finish()
390 }
391}
392
393impl EmbeddedProjection for HashJoinExec {
394 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
395 self.with_projection(projection)
396 }
397}
398
399impl HashJoinExec {
400 #[allow(clippy::too_many_arguments)]
405 pub fn try_new(
406 left: Arc<dyn ExecutionPlan>,
407 right: Arc<dyn ExecutionPlan>,
408 on: JoinOn,
409 filter: Option<JoinFilter>,
410 join_type: &JoinType,
411 projection: Option<Vec<usize>>,
412 partition_mode: PartitionMode,
413 null_equality: NullEquality,
414 ) -> Result<Self> {
415 let left_schema = left.schema();
416 let right_schema = right.schema();
417 if on.is_empty() {
418 return plan_err!("On constraints in HashJoinExec should be non-empty");
419 }
420
421 check_join_is_valid(&left_schema, &right_schema, &on)?;
422
423 let (join_schema, column_indices) =
424 build_join_schema(&left_schema, &right_schema, join_type);
425
426 let random_state = HASH_JOIN_SEED;
427
428 let join_schema = Arc::new(join_schema);
429
430 can_project(&join_schema, projection.as_ref())?;
432
433 let cache = Self::compute_properties(
434 &left,
435 &right,
436 Arc::clone(&join_schema),
437 *join_type,
438 &on,
439 partition_mode,
440 projection.as_ref(),
441 )?;
442
443 Ok(HashJoinExec {
447 left,
448 right,
449 on,
450 filter,
451 join_type: *join_type,
452 join_schema,
453 left_fut: Default::default(),
454 random_state,
455 mode: partition_mode,
456 metrics: ExecutionPlanMetricsSet::new(),
457 projection,
458 column_indices,
459 null_equality,
460 cache,
461 dynamic_filter: None,
462 })
463 }
464
465 fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
466 let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
469 Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
471 }
472
473 pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
475 &self.left
476 }
477
478 pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
480 &self.right
481 }
482
483 pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
485 &self.on
486 }
487
488 pub fn filter(&self) -> Option<&JoinFilter> {
490 self.filter.as_ref()
491 }
492
493 pub fn join_type(&self) -> &JoinType {
495 &self.join_type
496 }
497
498 pub fn join_schema(&self) -> &SchemaRef {
501 &self.join_schema
502 }
503
504 pub fn partition_mode(&self) -> &PartitionMode {
506 &self.mode
507 }
508
509 pub fn null_equality(&self) -> NullEquality {
511 self.null_equality
512 }
513
514 fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
516 vec![
517 false,
518 matches!(
519 join_type,
520 JoinType::Inner
521 | JoinType::Right
522 | JoinType::RightAnti
523 | JoinType::RightSemi
524 | JoinType::RightMark
525 ),
526 ]
527 }
528
529 pub fn probe_side() -> JoinSide {
531 JoinSide::Right
533 }
534
535 pub fn contains_projection(&self) -> bool {
537 self.projection.is_some()
538 }
539
540 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
542 can_project(&self.schema(), projection.as_ref())?;
544 let projection = match projection {
545 Some(projection) => match &self.projection {
546 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
547 None => Some(projection),
548 },
549 None => None,
550 };
551 Self::try_new(
552 Arc::clone(&self.left),
553 Arc::clone(&self.right),
554 self.on.clone(),
555 self.filter.clone(),
556 &self.join_type,
557 projection,
558 self.mode,
559 self.null_equality,
560 )
561 }
562
563 fn compute_properties(
565 left: &Arc<dyn ExecutionPlan>,
566 right: &Arc<dyn ExecutionPlan>,
567 schema: SchemaRef,
568 join_type: JoinType,
569 on: JoinOnRef,
570 mode: PartitionMode,
571 projection: Option<&Vec<usize>>,
572 ) -> Result<PlanProperties> {
573 let mut eq_properties = join_equivalence_properties(
575 left.equivalence_properties().clone(),
576 right.equivalence_properties().clone(),
577 &join_type,
578 Arc::clone(&schema),
579 &Self::maintains_input_order(join_type),
580 Some(Self::probe_side()),
581 on,
582 )?;
583
584 let mut output_partitioning = match mode {
585 PartitionMode::CollectLeft => {
586 asymmetric_join_output_partitioning(left, right, &join_type)?
587 }
588 PartitionMode::Auto => Partitioning::UnknownPartitioning(
589 right.output_partitioning().partition_count(),
590 ),
591 PartitionMode::Partitioned => {
592 symmetric_join_output_partitioning(left, right, &join_type)?
593 }
594 };
595
596 let emission_type = if left.boundedness().is_unbounded() {
597 EmissionType::Final
598 } else if right.pipeline_behavior() == EmissionType::Incremental {
599 match join_type {
600 JoinType::Inner
603 | JoinType::LeftSemi
604 | JoinType::RightSemi
605 | JoinType::Right
606 | JoinType::RightAnti
607 | JoinType::RightMark => EmissionType::Incremental,
608 JoinType::Left
611 | JoinType::LeftAnti
612 | JoinType::LeftMark
613 | JoinType::Full => EmissionType::Both,
614 }
615 } else {
616 right.pipeline_behavior()
617 };
618
619 if let Some(projection) = projection {
621 let projection_mapping =
623 ProjectionMapping::from_indices(projection, &schema)?;
624 let out_schema = project_schema(&schema, Some(projection))?;
625 output_partitioning =
626 output_partitioning.project(&projection_mapping, &eq_properties);
627 eq_properties = eq_properties.project(&projection_mapping, out_schema);
628 }
629
630 Ok(PlanProperties::new(
631 eq_properties,
632 output_partitioning,
633 emission_type,
634 boundedness_from_children([left, right]),
635 ))
636 }
637
638 pub fn swap_inputs(
662 &self,
663 partition_mode: PartitionMode,
664 ) -> Result<Arc<dyn ExecutionPlan>> {
665 let left = self.left();
666 let right = self.right();
667 let new_join = HashJoinExec::try_new(
668 Arc::clone(right),
669 Arc::clone(left),
670 self.on()
671 .iter()
672 .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
673 .collect(),
674 self.filter().map(JoinFilter::swap),
675 &self.join_type().swap(),
676 swap_join_projection(
677 left.schema().fields().len(),
678 right.schema().fields().len(),
679 self.projection.as_ref(),
680 self.join_type(),
681 ),
682 partition_mode,
683 self.null_equality(),
684 )?;
685 if matches!(
687 self.join_type(),
688 JoinType::LeftSemi
689 | JoinType::RightSemi
690 | JoinType::LeftAnti
691 | JoinType::RightAnti
692 ) || self.projection.is_some()
693 {
694 Ok(Arc::new(new_join))
695 } else {
696 reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
697 }
698 }
699}
700
701impl DisplayAs for HashJoinExec {
702 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
703 match t {
704 DisplayFormatType::Default | DisplayFormatType::Verbose => {
705 let display_filter = self.filter.as_ref().map_or_else(
706 || "".to_string(),
707 |f| format!(", filter={}", f.expression()),
708 );
709 let display_projections = if self.contains_projection() {
710 format!(
711 ", projection=[{}]",
712 self.projection
713 .as_ref()
714 .unwrap()
715 .iter()
716 .map(|index| format!(
717 "{}@{}",
718 self.join_schema.fields().get(*index).unwrap().name(),
719 index
720 ))
721 .collect::<Vec<_>>()
722 .join(", ")
723 )
724 } else {
725 "".to_string()
726 };
727 let on = self
728 .on
729 .iter()
730 .map(|(c1, c2)| format!("({c1}, {c2})"))
731 .collect::<Vec<String>>()
732 .join(", ");
733 write!(
734 f,
735 "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}",
736 self.mode, self.join_type, on, display_filter, display_projections,
737 )
738 }
739 DisplayFormatType::TreeRender => {
740 let on = self
741 .on
742 .iter()
743 .map(|(c1, c2)| {
744 format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
745 })
746 .collect::<Vec<String>>()
747 .join(", ");
748
749 if *self.join_type() != JoinType::Inner {
750 writeln!(f, "join_type={:?}", self.join_type)?;
751 }
752
753 writeln!(f, "on={on}")?;
754
755 if let Some(filter) = self.filter.as_ref() {
756 writeln!(f, "filter={filter}")?;
757 }
758
759 Ok(())
760 }
761 }
762 }
763}
764
765impl ExecutionPlan for HashJoinExec {
766 fn name(&self) -> &'static str {
767 "HashJoinExec"
768 }
769
770 fn as_any(&self) -> &dyn Any {
771 self
772 }
773
774 fn properties(&self) -> &PlanProperties {
775 &self.cache
776 }
777
778 fn required_input_distribution(&self) -> Vec<Distribution> {
779 match self.mode {
780 PartitionMode::CollectLeft => vec![
781 Distribution::SinglePartition,
782 Distribution::UnspecifiedDistribution,
783 ],
784 PartitionMode::Partitioned => {
785 let (left_expr, right_expr) = self
786 .on
787 .iter()
788 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
789 .unzip();
790 vec![
791 Distribution::HashPartitioned(left_expr),
792 Distribution::HashPartitioned(right_expr),
793 ]
794 }
795 PartitionMode::Auto => vec![
796 Distribution::UnspecifiedDistribution,
797 Distribution::UnspecifiedDistribution,
798 ],
799 }
800 }
801
802 fn maintains_input_order(&self) -> Vec<bool> {
819 Self::maintains_input_order(self.join_type)
820 }
821
822 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
823 vec![&self.left, &self.right]
824 }
825
826 fn with_new_children(
832 self: Arc<Self>,
833 children: Vec<Arc<dyn ExecutionPlan>>,
834 ) -> Result<Arc<dyn ExecutionPlan>> {
835 Ok(Arc::new(HashJoinExec {
836 left: Arc::clone(&children[0]),
837 right: Arc::clone(&children[1]),
838 on: self.on.clone(),
839 filter: self.filter.clone(),
840 join_type: self.join_type,
841 join_schema: Arc::clone(&self.join_schema),
842 left_fut: Arc::clone(&self.left_fut),
843 random_state: self.random_state.clone(),
844 mode: self.mode,
845 metrics: ExecutionPlanMetricsSet::new(),
846 projection: self.projection.clone(),
847 column_indices: self.column_indices.clone(),
848 null_equality: self.null_equality,
849 cache: Self::compute_properties(
850 &children[0],
851 &children[1],
852 Arc::clone(&self.join_schema),
853 self.join_type,
854 &self.on,
855 self.mode,
856 self.projection.as_ref(),
857 )?,
858 dynamic_filter: self.dynamic_filter.clone(),
860 }))
861 }
862
863 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
864 Ok(Arc::new(HashJoinExec {
865 left: Arc::clone(&self.left),
866 right: Arc::clone(&self.right),
867 on: self.on.clone(),
868 filter: self.filter.clone(),
869 join_type: self.join_type,
870 join_schema: Arc::clone(&self.join_schema),
871 left_fut: Arc::new(OnceAsync::default()),
873 random_state: self.random_state.clone(),
874 mode: self.mode,
875 metrics: ExecutionPlanMetricsSet::new(),
876 projection: self.projection.clone(),
877 column_indices: self.column_indices.clone(),
878 null_equality: self.null_equality,
879 cache: self.cache.clone(),
880 dynamic_filter: None,
882 }))
883 }
884
885 fn execute(
886 &self,
887 partition: usize,
888 context: Arc<TaskContext>,
889 ) -> Result<SendableRecordBatchStream> {
890 let on_left = self
891 .on
892 .iter()
893 .map(|on| Arc::clone(&on.0))
894 .collect::<Vec<_>>();
895 let left_partitions = self.left.output_partitioning().partition_count();
896 let right_partitions = self.right.output_partitioning().partition_count();
897
898 if self.mode == PartitionMode::Partitioned && left_partitions != right_partitions
899 {
900 return internal_err!(
901 "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
902 consider using RepartitionExec"
903 );
904 }
905
906 if self.mode == PartitionMode::CollectLeft && left_partitions != 1 {
907 return internal_err!(
908 "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
909 consider using CoalescePartitionsExec or the EnforceDistribution rule"
910 );
911 }
912
913 let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
914
915 let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
916 let left_fut = match self.mode {
917 PartitionMode::CollectLeft => self.left_fut.try_once(|| {
918 let left_stream = self.left.execute(0, Arc::clone(&context))?;
919
920 let reservation =
921 MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
922
923 Ok(collect_left_input(
924 self.random_state.clone(),
925 left_stream,
926 on_left.clone(),
927 join_metrics.clone(),
928 reservation,
929 need_produce_result_in_final(self.join_type),
930 self.right().output_partitioning().partition_count(),
931 enable_dynamic_filter_pushdown,
932 ))
933 })?,
934 PartitionMode::Partitioned => {
935 let left_stream = self.left.execute(partition, Arc::clone(&context))?;
936
937 let reservation =
938 MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
939 .register(context.memory_pool());
940
941 OnceFut::new(collect_left_input(
942 self.random_state.clone(),
943 left_stream,
944 on_left.clone(),
945 join_metrics.clone(),
946 reservation,
947 need_produce_result_in_final(self.join_type),
948 1,
949 enable_dynamic_filter_pushdown,
950 ))
951 }
952 PartitionMode::Auto => {
953 return plan_err!(
954 "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
955 PartitionMode::Auto
956 );
957 }
958 };
959
960 let batch_size = context.session_config().batch_size();
961
962 let bounds_accumulator = enable_dynamic_filter_pushdown
964 .then(|| {
965 self.dynamic_filter.as_ref().map(|df| {
966 let filter = Arc::clone(&df.filter);
967 let on_right = self
968 .on
969 .iter()
970 .map(|(_, right_expr)| Arc::clone(right_expr))
971 .collect::<Vec<_>>();
972 Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
973 Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
974 self.mode,
975 self.left.as_ref(),
976 self.right.as_ref(),
977 filter,
978 on_right,
979 ))
980 })))
981 })
982 })
983 .flatten()
984 .flatten();
985
986 let right_stream = self.right.execute(partition, context)?;
989
990 let column_indices_after_projection = match &self.projection {
992 Some(projection) => projection
993 .iter()
994 .map(|i| self.column_indices[*i].clone())
995 .collect(),
996 None => self.column_indices.clone(),
997 };
998
999 let on_right = self
1000 .on
1001 .iter()
1002 .map(|(_, right_expr)| Arc::clone(right_expr))
1003 .collect::<Vec<_>>();
1004
1005 Ok(Box::pin(HashJoinStream::new(
1006 partition,
1007 self.schema(),
1008 on_right,
1009 self.filter.clone(),
1010 self.join_type,
1011 right_stream,
1012 self.random_state.clone(),
1013 join_metrics,
1014 column_indices_after_projection,
1015 self.null_equality,
1016 HashJoinStreamState::WaitBuildSide,
1017 BuildSide::Initial(BuildSideInitialState { left_fut }),
1018 batch_size,
1019 vec![],
1020 self.right.output_ordering().is_some(),
1021 bounds_accumulator,
1022 )))
1023 }
1024
1025 fn metrics(&self) -> Option<MetricsSet> {
1026 Some(self.metrics.clone_inner())
1027 }
1028
1029 fn statistics(&self) -> Result<Statistics> {
1030 self.partition_statistics(None)
1031 }
1032
1033 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1034 if partition.is_some() {
1035 return Ok(Statistics::new_unknown(&self.schema()));
1036 }
1037 let stats = estimate_join_statistics(
1041 self.left.partition_statistics(None)?,
1042 self.right.partition_statistics(None)?,
1043 self.on.clone(),
1044 &self.join_type,
1045 &self.join_schema,
1046 )?;
1047 Ok(stats.project(self.projection.as_ref()))
1049 }
1050
1051 fn try_swapping_with_projection(
1055 &self,
1056 projection: &ProjectionExec,
1057 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1058 if self.contains_projection() {
1060 return Ok(None);
1061 }
1062
1063 if let Some(JoinData {
1064 projected_left_child,
1065 projected_right_child,
1066 join_filter,
1067 join_on,
1068 }) = try_pushdown_through_join(
1069 projection,
1070 self.left(),
1071 self.right(),
1072 self.on(),
1073 self.schema(),
1074 self.filter(),
1075 )? {
1076 Ok(Some(Arc::new(HashJoinExec::try_new(
1077 Arc::new(projected_left_child),
1078 Arc::new(projected_right_child),
1079 join_on,
1080 join_filter,
1081 self.join_type(),
1082 None,
1084 *self.partition_mode(),
1085 self.null_equality,
1086 )?)))
1087 } else {
1088 try_embed_projection(projection, self)
1089 }
1090 }
1091
1092 fn gather_filters_for_pushdown(
1093 &self,
1094 phase: FilterPushdownPhase,
1095 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1096 config: &ConfigOptions,
1097 ) -> Result<FilterDescription> {
1098 if self.join_type != JoinType::Inner {
1103 return Ok(FilterDescription::all_unsupported(
1104 &parent_filters,
1105 &self.children(),
1106 ));
1107 }
1108
1109 let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
1111 &parent_filters,
1112 self.left(),
1113 )?;
1114 let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
1115 &parent_filters,
1116 self.right(),
1117 )?;
1118
1119 if matches!(phase, FilterPushdownPhase::Post)
1121 && config.optimizer.enable_dynamic_filter_pushdown
1122 {
1123 let dynamic_filter = Self::create_dynamic_filter(&self.on);
1125 right_child = right_child.with_self_filter(dynamic_filter);
1126 }
1127
1128 Ok(FilterDescription::new()
1129 .with_child(left_child)
1130 .with_child(right_child))
1131 }
1132
1133 fn handle_child_pushdown_result(
1134 &self,
1135 _phase: FilterPushdownPhase,
1136 child_pushdown_result: ChildPushdownResult,
1137 _config: &ConfigOptions,
1138 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1139 if self.join_type != JoinType::Inner {
1144 return Ok(FilterPushdownPropagation::all_unsupported(
1148 child_pushdown_result,
1149 ));
1150 }
1151
1152 let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1153 assert_eq!(child_pushdown_result.self_filters.len(), 2); let right_child_self_filters = &child_pushdown_result.self_filters[1]; if let Some(filter) = right_child_self_filters.first() {
1157 let predicate = Arc::clone(&filter.predicate);
1160 if let Ok(dynamic_filter) =
1161 Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1162 {
1163 let new_node = Arc::new(HashJoinExec {
1165 left: Arc::clone(&self.left),
1166 right: Arc::clone(&self.right),
1167 on: self.on.clone(),
1168 filter: self.filter.clone(),
1169 join_type: self.join_type,
1170 join_schema: Arc::clone(&self.join_schema),
1171 left_fut: Arc::clone(&self.left_fut),
1172 random_state: self.random_state.clone(),
1173 mode: self.mode,
1174 metrics: ExecutionPlanMetricsSet::new(),
1175 projection: self.projection.clone(),
1176 column_indices: self.column_indices.clone(),
1177 null_equality: self.null_equality,
1178 cache: self.cache.clone(),
1179 dynamic_filter: Some(HashJoinExecDynamicFilter {
1180 filter: dynamic_filter,
1181 bounds_accumulator: OnceLock::new(),
1182 }),
1183 });
1184 result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
1185 }
1186 }
1187 Ok(result)
1188 }
1189}
1190
1191fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
1193 arrays
1194 .iter()
1195 .map(|array| {
1196 if array.is_empty() {
1197 return Ok(ColumnBounds::new(
1199 ScalarValue::try_from(array.data_type())?,
1200 ScalarValue::try_from(array.data_type())?,
1201 ));
1202 }
1203
1204 let min_val = min_batch(array)?;
1206 let max_val = max_batch(array)?;
1207
1208 Ok(ColumnBounds::new(min_val, max_val))
1209 })
1210 .collect()
1211}
1212
1213#[expect(clippy::too_many_arguments)]
1214async fn collect_left_input(
1243 random_state: RandomState,
1244 left_stream: SendableRecordBatchStream,
1245 on_left: Vec<PhysicalExprRef>,
1246 metrics: BuildProbeJoinMetrics,
1247 reservation: MemoryReservation,
1248 with_visited_indices_bitmap: bool,
1249 probe_threads_count: usize,
1250 should_compute_bounds: bool,
1251) -> Result<JoinLeftData> {
1252 let schema = left_stream.schema();
1253
1254 let initial = (Vec::new(), 0, metrics, reservation);
1258 let (batches, num_rows, metrics, mut reservation) = left_stream
1259 .try_fold(initial, |mut acc, batch| async {
1260 let batch_size = get_record_batch_memory_size(&batch);
1261 acc.3.try_grow(batch_size)?;
1263 acc.2.build_mem_used.add(batch_size);
1265 acc.2.build_input_batches.add(1);
1266 acc.2.build_input_rows.add(batch.num_rows());
1267 acc.1 += batch.num_rows();
1269 acc.0.push(batch);
1271 Ok(acc)
1272 })
1273 .await?;
1274
1275 let fixed_size_u32 = size_of::<JoinHashMapU32>();
1278 let fixed_size_u64 = size_of::<JoinHashMapU64>();
1279
1280 let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
1283 let estimated_hashtable_size =
1284 estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
1285 reservation.try_grow(estimated_hashtable_size)?;
1286 metrics.build_mem_used.add(estimated_hashtable_size);
1287 Box::new(JoinHashMapU64::with_capacity(num_rows))
1288 } else {
1289 let estimated_hashtable_size =
1290 estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
1291 reservation.try_grow(estimated_hashtable_size)?;
1292 metrics.build_mem_used.add(estimated_hashtable_size);
1293 Box::new(JoinHashMapU32::with_capacity(num_rows))
1294 };
1295
1296 let mut hashes_buffer = Vec::new();
1297 let mut offset = 0;
1298
1299 let batches_iter = batches.iter().rev();
1301 for batch in batches_iter.clone() {
1302 hashes_buffer.clear();
1303 hashes_buffer.resize(batch.num_rows(), 0);
1304 update_hash(
1305 &on_left,
1306 batch,
1307 &mut *hashmap,
1308 offset,
1309 &random_state,
1310 &mut hashes_buffer,
1311 0,
1312 true,
1313 )?;
1314 offset += batch.num_rows();
1315 }
1316 let single_batch = concat_batches(&schema, batches_iter)?;
1318
1319 let visited_indices_bitmap = if with_visited_indices_bitmap {
1321 let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
1322 reservation.try_grow(bitmap_size)?;
1323 metrics.build_mem_used.add(bitmap_size);
1324
1325 let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
1326 bitmap_buffer.append_n(num_rows, false);
1327 bitmap_buffer
1328 } else {
1329 BooleanBufferBuilder::new(0)
1330 };
1331
1332 let left_values = on_left
1333 .iter()
1334 .map(|c| {
1335 c.evaluate(&single_batch)?
1336 .into_array(single_batch.num_rows())
1337 })
1338 .collect::<Result<Vec<_>>>()?;
1339
1340 let bounds = if should_compute_bounds && num_rows > 0 {
1342 Some(compute_bounds(&left_values)?)
1343 } else {
1344 None
1345 };
1346
1347 let data = JoinLeftData::new(
1348 hashmap,
1349 single_batch,
1350 left_values.clone(),
1351 Mutex::new(visited_indices_bitmap),
1352 AtomicUsize::new(probe_threads_count),
1353 reservation,
1354 bounds,
1355 );
1356
1357 Ok(data)
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362 use super::*;
1363 use crate::coalesce_partitions::CoalescePartitionsExec;
1364 use crate::joins::hash_join::stream::lookup_join_hashmap;
1365 use crate::test::{assert_join_metrics, TestMemoryExec};
1366 use crate::{
1367 common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
1368 test::exec::MockExec,
1369 };
1370
1371 use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
1372 use arrow::buffer::NullBuffer;
1373 use arrow::datatypes::{DataType, Field};
1374 use arrow_schema::Schema;
1375 use datafusion_common::hash_utils::create_hashes;
1376 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
1377 use datafusion_common::{
1378 assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
1379 ScalarValue,
1380 };
1381 use datafusion_execution::config::SessionConfig;
1382 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1383 use datafusion_expr::Operator;
1384 use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1385 use datafusion_physical_expr::PhysicalExpr;
1386 use hashbrown::HashTable;
1387 use insta::{allow_duplicates, assert_snapshot};
1388 use rstest::*;
1389 use rstest_reuse::*;
1390
1391 fn div_ceil(a: usize, b: usize) -> usize {
1392 a.div_ceil(b)
1393 }
1394
    /// `rstest_reuse` template for tests annotated with `#[apply(batch_sizes)]`.
    ///
    /// Each such test runs once per listed `batch_size` (8192, 10, 5, 2, 1),
    /// so the join is exercised with large as well as very small output batches.
    #[template]
    #[rstest]
    fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
1398
1399 fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
1400 let session_config = SessionConfig::default().with_batch_size(batch_size);
1401 Arc::new(TaskContext::default().with_session_config(session_config))
1402 }
1403
1404 fn build_table(
1405 a: (&str, &Vec<i32>),
1406 b: (&str, &Vec<i32>),
1407 c: (&str, &Vec<i32>),
1408 ) -> Arc<dyn ExecutionPlan> {
1409 let batch = build_table_i32(a, b, c);
1410 let schema = batch.schema();
1411 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
1412 }
1413
1414 fn join(
1415 left: Arc<dyn ExecutionPlan>,
1416 right: Arc<dyn ExecutionPlan>,
1417 on: JoinOn,
1418 join_type: &JoinType,
1419 null_equality: NullEquality,
1420 ) -> Result<HashJoinExec> {
1421 HashJoinExec::try_new(
1422 left,
1423 right,
1424 on,
1425 None,
1426 join_type,
1427 None,
1428 PartitionMode::CollectLeft,
1429 null_equality,
1430 )
1431 }
1432
1433 fn join_with_filter(
1434 left: Arc<dyn ExecutionPlan>,
1435 right: Arc<dyn ExecutionPlan>,
1436 on: JoinOn,
1437 filter: JoinFilter,
1438 join_type: &JoinType,
1439 null_equality: NullEquality,
1440 ) -> Result<HashJoinExec> {
1441 HashJoinExec::try_new(
1442 left,
1443 right,
1444 on,
1445 Some(filter),
1446 join_type,
1447 None,
1448 PartitionMode::CollectLeft,
1449 null_equality,
1450 )
1451 }
1452
1453 async fn join_collect(
1454 left: Arc<dyn ExecutionPlan>,
1455 right: Arc<dyn ExecutionPlan>,
1456 on: JoinOn,
1457 join_type: &JoinType,
1458 null_equality: NullEquality,
1459 context: Arc<TaskContext>,
1460 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1461 let join = join(left, right, on, join_type, null_equality)?;
1462 let columns_header = columns(&join.schema());
1463
1464 let stream = join.execute(0, context)?;
1465 let batches = common::collect(stream).await?;
1466 let metrics = join.metrics().unwrap();
1467
1468 Ok((columns_header, batches, metrics))
1469 }
1470
1471 async fn partitioned_join_collect(
1472 left: Arc<dyn ExecutionPlan>,
1473 right: Arc<dyn ExecutionPlan>,
1474 on: JoinOn,
1475 join_type: &JoinType,
1476 null_equality: NullEquality,
1477 context: Arc<TaskContext>,
1478 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1479 join_collect_with_partition_mode(
1480 left,
1481 right,
1482 on,
1483 join_type,
1484 PartitionMode::Partitioned,
1485 null_equality,
1486 context,
1487 )
1488 .await
1489 }
1490
1491 async fn join_collect_with_partition_mode(
1492 left: Arc<dyn ExecutionPlan>,
1493 right: Arc<dyn ExecutionPlan>,
1494 on: JoinOn,
1495 join_type: &JoinType,
1496 partition_mode: PartitionMode,
1497 null_equality: NullEquality,
1498 context: Arc<TaskContext>,
1499 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1500 let partition_count = 4;
1501
1502 let (left_expr, right_expr) = on
1503 .iter()
1504 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1505 .unzip();
1506
1507 let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1508 PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
1509 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1510 left,
1511 Partitioning::Hash(left_expr, partition_count),
1512 )?),
1513 PartitionMode::Auto => {
1514 return internal_err!("Unexpected PartitionMode::Auto in join tests")
1515 }
1516 };
1517
1518 let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1519 PartitionMode::CollectLeft => {
1520 let partition_column_name = right.schema().field(0).name().clone();
1521 let partition_expr = vec![Arc::new(Column::new_with_schema(
1522 &partition_column_name,
1523 &right.schema(),
1524 )?) as _];
1525 Arc::new(RepartitionExec::try_new(
1526 right,
1527 Partitioning::Hash(partition_expr, partition_count),
1528 )?) as _
1529 }
1530 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1531 right,
1532 Partitioning::Hash(right_expr, partition_count),
1533 )?),
1534 PartitionMode::Auto => {
1535 return internal_err!("Unexpected PartitionMode::Auto in join tests")
1536 }
1537 };
1538
1539 let join = HashJoinExec::try_new(
1540 left_repartitioned,
1541 right_repartitioned,
1542 on,
1543 None,
1544 join_type,
1545 None,
1546 partition_mode,
1547 null_equality,
1548 )?;
1549
1550 let columns = columns(&join.schema());
1551
1552 let mut batches = vec![];
1553 for i in 0..partition_count {
1554 let stream = join.execute(i, Arc::clone(&context))?;
1555 let more_batches = common::collect(stream).await?;
1556 batches.extend(
1557 more_batches
1558 .into_iter()
1559 .filter(|b| b.num_rows() > 0)
1560 .collect::<Vec<_>>(),
1561 );
1562 }
1563 let metrics = join.metrics().unwrap();
1564
1565 Ok((columns, batches, metrics))
1566 }
1567
1568 #[apply(batch_sizes)]
1569 #[tokio::test]
1570 async fn join_inner_one(batch_size: usize) -> Result<()> {
1571 let task_ctx = prepare_task_ctx(batch_size);
1572 let left = build_table(
1573 ("a1", &vec![1, 2, 3]),
1574 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1576 );
1577 let right = build_table(
1578 ("a2", &vec![10, 20, 30]),
1579 ("b1", &vec![4, 5, 6]),
1580 ("c2", &vec![70, 80, 90]),
1581 );
1582
1583 let on = vec![(
1584 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1585 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1586 )];
1587
1588 let (columns, batches, metrics) = join_collect(
1589 Arc::clone(&left),
1590 Arc::clone(&right),
1591 on.clone(),
1592 &JoinType::Inner,
1593 NullEquality::NullEqualsNothing,
1594 task_ctx,
1595 )
1596 .await?;
1597
1598 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1599
1600 allow_duplicates! {
1601 assert_snapshot!(batches_to_string(&batches), @r#"
1603 +----+----+----+----+----+----+
1604 | a1 | b1 | c1 | a2 | b1 | c2 |
1605 +----+----+----+----+----+----+
1606 | 1 | 4 | 7 | 10 | 4 | 70 |
1607 | 2 | 5 | 8 | 20 | 5 | 80 |
1608 | 3 | 5 | 9 | 20 | 5 | 80 |
1609 +----+----+----+----+----+----+
1610 "#);
1611 }
1612
1613 assert_join_metrics!(metrics, 3);
1614
1615 Ok(())
1616 }
1617
1618 #[apply(batch_sizes)]
1619 #[tokio::test]
1620 async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> {
1621 let task_ctx = prepare_task_ctx(batch_size);
1622 let left = build_table(
1623 ("a1", &vec![1, 2, 3]),
1624 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1626 );
1627 let right = build_table(
1628 ("a2", &vec![10, 20, 30]),
1629 ("b1", &vec![4, 5, 6]),
1630 ("c2", &vec![70, 80, 90]),
1631 );
1632 let on = vec![(
1633 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1634 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1635 )];
1636
1637 let (columns, batches, metrics) = partitioned_join_collect(
1638 Arc::clone(&left),
1639 Arc::clone(&right),
1640 on.clone(),
1641 &JoinType::Inner,
1642 NullEquality::NullEqualsNothing,
1643 task_ctx,
1644 )
1645 .await?;
1646
1647 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1648
1649 allow_duplicates! {
1650 assert_snapshot!(batches_to_sort_string(&batches), @r#"
1651 +----+----+----+----+----+----+
1652 | a1 | b1 | c1 | a2 | b1 | c2 |
1653 +----+----+----+----+----+----+
1654 | 1 | 4 | 7 | 10 | 4 | 70 |
1655 | 2 | 5 | 8 | 20 | 5 | 80 |
1656 | 3 | 5 | 9 | 20 | 5 | 80 |
1657 +----+----+----+----+----+----+
1658 "#);
1659 }
1660
1661 assert_join_metrics!(metrics, 3);
1662
1663 Ok(())
1664 }
1665
1666 #[tokio::test]
1667 async fn join_inner_one_no_shared_column_names() -> Result<()> {
1668 let task_ctx = Arc::new(TaskContext::default());
1669 let left = build_table(
1670 ("a1", &vec![1, 2, 3]),
1671 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1673 );
1674 let right = build_table(
1675 ("a2", &vec![10, 20, 30]),
1676 ("b2", &vec![4, 5, 6]),
1677 ("c2", &vec![70, 80, 90]),
1678 );
1679 let on = vec![(
1680 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1681 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1682 )];
1683
1684 let (columns, batches, metrics) = join_collect(
1685 left,
1686 right,
1687 on,
1688 &JoinType::Inner,
1689 NullEquality::NullEqualsNothing,
1690 task_ctx,
1691 )
1692 .await?;
1693
1694 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1695
1696 allow_duplicates! {
1698 assert_snapshot!(batches_to_string(&batches), @r#"
1699 +----+----+----+----+----+----+
1700 | a1 | b1 | c1 | a2 | b2 | c2 |
1701 +----+----+----+----+----+----+
1702 | 1 | 4 | 7 | 10 | 4 | 70 |
1703 | 2 | 5 | 8 | 20 | 5 | 80 |
1704 | 3 | 5 | 9 | 20 | 5 | 80 |
1705 +----+----+----+----+----+----+
1706 "#);
1707 }
1708
1709 assert_join_metrics!(metrics, 3);
1710
1711 Ok(())
1712 }
1713
1714 #[tokio::test]
1715 async fn join_inner_one_randomly_ordered() -> Result<()> {
1716 let task_ctx = Arc::new(TaskContext::default());
1717 let left = build_table(
1718 ("a1", &vec![0, 3, 2, 1]),
1719 ("b1", &vec![4, 5, 5, 4]),
1720 ("c1", &vec![6, 9, 8, 7]),
1721 );
1722 let right = build_table(
1723 ("a2", &vec![20, 30, 10]),
1724 ("b2", &vec![5, 6, 4]),
1725 ("c2", &vec![80, 90, 70]),
1726 );
1727 let on = vec![(
1728 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1729 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1730 )];
1731
1732 let (columns, batches, metrics) = join_collect(
1733 left,
1734 right,
1735 on,
1736 &JoinType::Inner,
1737 NullEquality::NullEqualsNothing,
1738 task_ctx,
1739 )
1740 .await?;
1741
1742 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1743
1744 allow_duplicates! {
1746 assert_snapshot!(batches_to_string(&batches), @r#"
1747 +----+----+----+----+----+----+
1748 | a1 | b1 | c1 | a2 | b2 | c2 |
1749 +----+----+----+----+----+----+
1750 | 3 | 5 | 9 | 20 | 5 | 80 |
1751 | 2 | 5 | 8 | 20 | 5 | 80 |
1752 | 0 | 4 | 6 | 10 | 4 | 70 |
1753 | 1 | 4 | 7 | 10 | 4 | 70 |
1754 +----+----+----+----+----+----+
1755 "#);
1756 }
1757
1758 assert_join_metrics!(metrics, 4);
1759
1760 Ok(())
1761 }
1762
1763 #[apply(batch_sizes)]
1764 #[tokio::test]
1765 async fn join_inner_two(batch_size: usize) -> Result<()> {
1766 let task_ctx = prepare_task_ctx(batch_size);
1767 let left = build_table(
1768 ("a1", &vec![1, 2, 2]),
1769 ("b2", &vec![1, 2, 2]),
1770 ("c1", &vec![7, 8, 9]),
1771 );
1772 let right = build_table(
1773 ("a1", &vec![1, 2, 3]),
1774 ("b2", &vec![1, 2, 2]),
1775 ("c2", &vec![70, 80, 90]),
1776 );
1777 let on = vec![
1778 (
1779 Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
1780 Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
1781 ),
1782 (
1783 Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
1784 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1785 ),
1786 ];
1787
1788 let (columns, batches, metrics) = join_collect(
1789 left,
1790 right,
1791 on,
1792 &JoinType::Inner,
1793 NullEquality::NullEqualsNothing,
1794 task_ctx,
1795 )
1796 .await?;
1797
1798 assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
1799
1800 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
1801 let mut expected_batch_count = div_ceil(3, batch_size);
1804 if batch_size == 1 {
1805 expected_batch_count += 1;
1806 }
1807 expected_batch_count
1808 } else {
1809 div_ceil(9, batch_size)
1812 };
1813
1814 assert_eq!(batches.len(), expected_batch_count);
1815
1816 allow_duplicates! {
1818 assert_snapshot!(batches_to_string(&batches), @r#"
1819 +----+----+----+----+----+----+
1820 | a1 | b2 | c1 | a1 | b2 | c2 |
1821 +----+----+----+----+----+----+
1822 | 1 | 1 | 7 | 1 | 1 | 70 |
1823 | 2 | 2 | 8 | 2 | 2 | 80 |
1824 | 2 | 2 | 9 | 2 | 2 | 80 |
1825 +----+----+----+----+----+----+
1826 "#);
1827 }
1828
1829 assert_join_metrics!(metrics, 3);
1830
1831 Ok(())
1832 }
1833
1834 #[apply(batch_sizes)]
1836 #[tokio::test]
1837 async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
1838 let task_ctx = prepare_task_ctx(batch_size);
1839 let batch1 = build_table_i32(
1840 ("a1", &vec![1, 2]),
1841 ("b2", &vec![1, 2]),
1842 ("c1", &vec![7, 8]),
1843 );
1844 let batch2 =
1845 build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
1846 let schema = batch1.schema();
1847 let left =
1848 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
1849 .unwrap();
1850 let left = Arc::new(CoalescePartitionsExec::new(left));
1851
1852 let right = build_table(
1853 ("a1", &vec![1, 2, 3]),
1854 ("b2", &vec![1, 2, 2]),
1855 ("c2", &vec![70, 80, 90]),
1856 );
1857 let on = vec![
1858 (
1859 Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
1860 Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
1861 ),
1862 (
1863 Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
1864 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1865 ),
1866 ];
1867
1868 let (columns, batches, metrics) = join_collect(
1869 left,
1870 right,
1871 on,
1872 &JoinType::Inner,
1873 NullEquality::NullEqualsNothing,
1874 task_ctx,
1875 )
1876 .await?;
1877
1878 assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
1879
1880 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
1881 let mut expected_batch_count = div_ceil(3, batch_size);
1884 if batch_size == 1 {
1885 expected_batch_count += 1;
1886 }
1887 expected_batch_count
1888 } else {
1889 div_ceil(9, batch_size)
1892 };
1893
1894 assert_eq!(batches.len(), expected_batch_count);
1895
1896 allow_duplicates! {
1898 assert_snapshot!(batches_to_string(&batches), @r#"
1899 +----+----+----+----+----+----+
1900 | a1 | b2 | c1 | a1 | b2 | c2 |
1901 +----+----+----+----+----+----+
1902 | 1 | 1 | 7 | 1 | 1 | 70 |
1903 | 2 | 2 | 8 | 2 | 2 | 80 |
1904 | 2 | 2 | 9 | 2 | 2 | 80 |
1905 +----+----+----+----+----+----+
1906 "#);
1907 }
1908
1909 assert_join_metrics!(metrics, 3);
1910
1911 Ok(())
1912 }
1913
1914 #[tokio::test]
1915 async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
1916 let task_ctx = Arc::new(TaskContext::default());
1917 let batch1 = build_table_i32(
1918 ("a1", &vec![0, 3]),
1919 ("b1", &vec![4, 5]),
1920 ("c1", &vec![6, 9]),
1921 );
1922 let batch2 = build_table_i32(
1923 ("a1", &vec![2, 1]),
1924 ("b1", &vec![5, 4]),
1925 ("c1", &vec![8, 7]),
1926 );
1927 let schema = batch1.schema();
1928
1929 let left =
1930 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
1931 .unwrap();
1932 let left = Arc::new(CoalescePartitionsExec::new(left));
1933 let right = build_table(
1934 ("a2", &vec![20, 30, 10]),
1935 ("b2", &vec![5, 6, 4]),
1936 ("c2", &vec![80, 90, 70]),
1937 );
1938 let on = vec![(
1939 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1940 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1941 )];
1942
1943 let (columns, batches, metrics) = join_collect(
1944 left,
1945 right,
1946 on,
1947 &JoinType::Inner,
1948 NullEquality::NullEqualsNothing,
1949 task_ctx,
1950 )
1951 .await?;
1952
1953 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1954
1955 allow_duplicates! {
1957 assert_snapshot!(batches_to_string(&batches), @r#"
1958 +----+----+----+----+----+----+
1959 | a1 | b1 | c1 | a2 | b2 | c2 |
1960 +----+----+----+----+----+----+
1961 | 3 | 5 | 9 | 20 | 5 | 80 |
1962 | 2 | 5 | 8 | 20 | 5 | 80 |
1963 | 0 | 4 | 6 | 10 | 4 | 70 |
1964 | 1 | 4 | 7 | 10 | 4 | 70 |
1965 +----+----+----+----+----+----+
1966 "#);
1967 }
1968
1969 assert_join_metrics!(metrics, 4);
1970
1971 Ok(())
1972 }
1973
1974 #[apply(batch_sizes)]
1976 #[tokio::test]
1977 async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
1978 let task_ctx = prepare_task_ctx(batch_size);
1979 let left = build_table(
1980 ("a1", &vec![1, 2, 3]),
1981 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1983 );
1984
1985 let batch1 = build_table_i32(
1986 ("a2", &vec![10, 20]),
1987 ("b1", &vec![4, 6]),
1988 ("c2", &vec![70, 80]),
1989 );
1990 let batch2 =
1991 build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
1992 let schema = batch1.schema();
1993 let right =
1994 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
1995 .unwrap();
1996
1997 let on = vec![(
1998 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1999 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2000 )];
2001
2002 let join = join(
2003 left,
2004 right,
2005 on,
2006 &JoinType::Inner,
2007 NullEquality::NullEqualsNothing,
2008 )?;
2009
2010 let columns = columns(&join.schema());
2011 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2012
2013 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2015 let batches = common::collect(stream).await?;
2016
2017 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2018 let mut expected_batch_count = div_ceil(1, batch_size);
2021 if batch_size == 1 {
2022 expected_batch_count += 1;
2023 }
2024 expected_batch_count
2025 } else {
2026 div_ceil(6, batch_size)
2029 };
2030 assert_eq!(batches.len(), expected_batch_count);
2031
2032 allow_duplicates! {
2034 assert_snapshot!(batches_to_string(&batches), @r#"
2035 +----+----+----+----+----+----+
2036 | a1 | b1 | c1 | a2 | b1 | c2 |
2037 +----+----+----+----+----+----+
2038 | 1 | 4 | 7 | 10 | 4 | 70 |
2039 +----+----+----+----+----+----+
2040 "#);
2041 }
2042
2043 let stream = join.execute(1, Arc::clone(&task_ctx))?;
2045 let batches = common::collect(stream).await?;
2046
2047 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2048 div_ceil(2, batch_size)
2050 } else {
2051 div_ceil(3, batch_size)
2054 };
2055 assert_eq!(batches.len(), expected_batch_count);
2056
2057 allow_duplicates! {
2059 assert_snapshot!(batches_to_string(&batches), @r#"
2060 +----+----+----+----+----+----+
2061 | a1 | b1 | c1 | a2 | b1 | c2 |
2062 +----+----+----+----+----+----+
2063 | 2 | 5 | 8 | 30 | 5 | 90 |
2064 | 3 | 5 | 9 | 30 | 5 | 90 |
2065 +----+----+----+----+----+----+
2066 "#);
2067 }
2068
2069 Ok(())
2070 }
2071
2072 fn build_table_two_batches(
2073 a: (&str, &Vec<i32>),
2074 b: (&str, &Vec<i32>),
2075 c: (&str, &Vec<i32>),
2076 ) -> Arc<dyn ExecutionPlan> {
2077 let batch = build_table_i32(a, b, c);
2078 let schema = batch.schema();
2079 TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2080 }
2081
2082 #[apply(batch_sizes)]
2083 #[tokio::test]
2084 async fn join_left_multi_batch(batch_size: usize) {
2085 let task_ctx = prepare_task_ctx(batch_size);
2086 let left = build_table(
2087 ("a1", &vec![1, 2, 3]),
2088 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2090 );
2091 let right = build_table_two_batches(
2092 ("a2", &vec![10, 20, 30]),
2093 ("b1", &vec![4, 5, 6]),
2094 ("c2", &vec![70, 80, 90]),
2095 );
2096 let on = vec![(
2097 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2098 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2099 )];
2100
2101 let join = join(
2102 left,
2103 right,
2104 on,
2105 &JoinType::Left,
2106 NullEquality::NullEqualsNothing,
2107 )
2108 .unwrap();
2109
2110 let columns = columns(&join.schema());
2111 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2112
2113 let stream = join.execute(0, task_ctx).unwrap();
2114 let batches = common::collect(stream).await.unwrap();
2115
2116 allow_duplicates! {
2117 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2118 +----+----+----+----+----+----+
2119 | a1 | b1 | c1 | a2 | b1 | c2 |
2120 +----+----+----+----+----+----+
2121 | 1 | 4 | 7 | 10 | 4 | 70 |
2122 | 1 | 4 | 7 | 10 | 4 | 70 |
2123 | 2 | 5 | 8 | 20 | 5 | 80 |
2124 | 2 | 5 | 8 | 20 | 5 | 80 |
2125 | 3 | 7 | 9 | | | |
2126 +----+----+----+----+----+----+
2127 "#);
2128 }
2129 }
2130
2131 #[apply(batch_sizes)]
2132 #[tokio::test]
2133 async fn join_full_multi_batch(batch_size: usize) {
2134 let task_ctx = prepare_task_ctx(batch_size);
2135 let left = build_table(
2136 ("a1", &vec![1, 2, 3]),
2137 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2139 );
2140 let right = build_table_two_batches(
2142 ("a2", &vec![10, 20, 30]),
2143 ("b2", &vec![4, 5, 6]),
2144 ("c2", &vec![70, 80, 90]),
2145 );
2146 let on = vec![(
2147 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2148 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2149 )];
2150
2151 let join = join(
2152 left,
2153 right,
2154 on,
2155 &JoinType::Full,
2156 NullEquality::NullEqualsNothing,
2157 )
2158 .unwrap();
2159
2160 let columns = columns(&join.schema());
2161 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2162
2163 let stream = join.execute(0, task_ctx).unwrap();
2164 let batches = common::collect(stream).await.unwrap();
2165
2166 allow_duplicates! {
2167 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2168 +----+----+----+----+----+----+
2169 | a1 | b1 | c1 | a2 | b2 | c2 |
2170 +----+----+----+----+----+----+
2171 | | | | 30 | 6 | 90 |
2172 | | | | 30 | 6 | 90 |
2173 | 1 | 4 | 7 | 10 | 4 | 70 |
2174 | 1 | 4 | 7 | 10 | 4 | 70 |
2175 | 2 | 5 | 8 | 20 | 5 | 80 |
2176 | 2 | 5 | 8 | 20 | 5 | 80 |
2177 | 3 | 7 | 9 | | | |
2178 +----+----+----+----+----+----+
2179 "#);
2180 }
2181 }
2182
2183 #[apply(batch_sizes)]
2184 #[tokio::test]
2185 async fn join_left_empty_right(batch_size: usize) {
2186 let task_ctx = prepare_task_ctx(batch_size);
2187 let left = build_table(
2188 ("a1", &vec![1, 2, 3]),
2189 ("b1", &vec![4, 5, 7]),
2190 ("c1", &vec![7, 8, 9]),
2191 );
2192 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2193 let on = vec![(
2194 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2195 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2196 )];
2197 let schema = right.schema();
2198 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2199 let join = join(
2200 left,
2201 right,
2202 on,
2203 &JoinType::Left,
2204 NullEquality::NullEqualsNothing,
2205 )
2206 .unwrap();
2207
2208 let columns = columns(&join.schema());
2209 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2210
2211 let stream = join.execute(0, task_ctx).unwrap();
2212 let batches = common::collect(stream).await.unwrap();
2213
2214 allow_duplicates! {
2215 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2216 +----+----+----+----+----+----+
2217 | a1 | b1 | c1 | a2 | b1 | c2 |
2218 +----+----+----+----+----+----+
2219 | 1 | 4 | 7 | | | |
2220 | 2 | 5 | 8 | | | |
2221 | 3 | 7 | 9 | | | |
2222 +----+----+----+----+----+----+
2223 "#);
2224 }
2225 }
2226
2227 #[apply(batch_sizes)]
2228 #[tokio::test]
2229 async fn join_full_empty_right(batch_size: usize) {
2230 let task_ctx = prepare_task_ctx(batch_size);
2231 let left = build_table(
2232 ("a1", &vec![1, 2, 3]),
2233 ("b1", &vec![4, 5, 7]),
2234 ("c1", &vec![7, 8, 9]),
2235 );
2236 let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
2237 let on = vec![(
2238 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2239 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2240 )];
2241 let schema = right.schema();
2242 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2243 let join = join(
2244 left,
2245 right,
2246 on,
2247 &JoinType::Full,
2248 NullEquality::NullEqualsNothing,
2249 )
2250 .unwrap();
2251
2252 let columns = columns(&join.schema());
2253 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2254
2255 let stream = join.execute(0, task_ctx).unwrap();
2256 let batches = common::collect(stream).await.unwrap();
2257
2258 allow_duplicates! {
2259 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2260 +----+----+----+----+----+----+
2261 | a1 | b1 | c1 | a2 | b2 | c2 |
2262 +----+----+----+----+----+----+
2263 | 1 | 4 | 7 | | | |
2264 | 2 | 5 | 8 | | | |
2265 | 3 | 7 | 9 | | | |
2266 +----+----+----+----+----+----+
2267 "#);
2268 }
2269 }
2270
2271 #[apply(batch_sizes)]
2272 #[tokio::test]
2273 async fn join_left_one(batch_size: usize) -> Result<()> {
2274 let task_ctx = prepare_task_ctx(batch_size);
2275 let left = build_table(
2276 ("a1", &vec![1, 2, 3]),
2277 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2279 );
2280 let right = build_table(
2281 ("a2", &vec![10, 20, 30]),
2282 ("b1", &vec![4, 5, 6]),
2283 ("c2", &vec![70, 80, 90]),
2284 );
2285 let on = vec![(
2286 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2287 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2288 )];
2289
2290 let (columns, batches, metrics) = join_collect(
2291 Arc::clone(&left),
2292 Arc::clone(&right),
2293 on.clone(),
2294 &JoinType::Left,
2295 NullEquality::NullEqualsNothing,
2296 task_ctx,
2297 )
2298 .await?;
2299
2300 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2301
2302 allow_duplicates! {
2303 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2304 +----+----+----+----+----+----+
2305 | a1 | b1 | c1 | a2 | b1 | c2 |
2306 +----+----+----+----+----+----+
2307 | 1 | 4 | 7 | 10 | 4 | 70 |
2308 | 2 | 5 | 8 | 20 | 5 | 80 |
2309 | 3 | 7 | 9 | | | |
2310 +----+----+----+----+----+----+
2311 "#);
2312 }
2313
2314 assert_join_metrics!(metrics, 3);
2315
2316 Ok(())
2317 }
2318
2319 #[apply(batch_sizes)]
2320 #[tokio::test]
2321 async fn partitioned_join_left_one(batch_size: usize) -> Result<()> {
2322 let task_ctx = prepare_task_ctx(batch_size);
2323 let left = build_table(
2324 ("a1", &vec![1, 2, 3]),
2325 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2327 );
2328 let right = build_table(
2329 ("a2", &vec![10, 20, 30]),
2330 ("b1", &vec![4, 5, 6]),
2331 ("c2", &vec![70, 80, 90]),
2332 );
2333 let on = vec![(
2334 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2335 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2336 )];
2337
2338 let (columns, batches, metrics) = partitioned_join_collect(
2339 Arc::clone(&left),
2340 Arc::clone(&right),
2341 on.clone(),
2342 &JoinType::Left,
2343 NullEquality::NullEqualsNothing,
2344 task_ctx,
2345 )
2346 .await?;
2347
2348 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2349
2350 allow_duplicates! {
2351 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2352 +----+----+----+----+----+----+
2353 | a1 | b1 | c1 | a2 | b1 | c2 |
2354 +----+----+----+----+----+----+
2355 | 1 | 4 | 7 | 10 | 4 | 70 |
2356 | 2 | 5 | 8 | 20 | 5 | 80 |
2357 | 3 | 7 | 9 | | | |
2358 +----+----+----+----+----+----+
2359 "#);
2360 }
2361
2362 assert_join_metrics!(metrics, 3);
2363
2364 Ok(())
2365 }
2366
2367 fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
2368 build_table(
2371 ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
2372 ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
2373 ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
2374 )
2375 }
2376
2377 fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
2378 build_table(
2381 ("a2", &vec![8, 12, 6, 2, 10, 4]),
2382 ("b2", &vec![8, 10, 6, 2, 10, 4]),
2383 ("c2", &vec![20, 40, 60, 80, 100, 120]),
2384 )
2385 }
2386
2387 #[apply(batch_sizes)]
2388 #[tokio::test]
2389 async fn join_left_semi(batch_size: usize) -> Result<()> {
2390 let task_ctx = prepare_task_ctx(batch_size);
2391 let left = build_semi_anti_left_table();
2392 let right = build_semi_anti_right_table();
2393 let on = vec![(
2395 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2396 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2397 )];
2398
2399 let join = join(
2400 left,
2401 right,
2402 on,
2403 &JoinType::LeftSemi,
2404 NullEquality::NullEqualsNothing,
2405 )?;
2406
2407 let columns = columns(&join.schema());
2408 assert_eq!(columns, vec!["a1", "b1", "c1"]);
2409
2410 let stream = join.execute(0, task_ctx)?;
2411 let batches = common::collect(stream).await?;
2412
2413 allow_duplicates! {
2415 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2416 +----+----+-----+
2417 | a1 | b1 | c1 |
2418 +----+----+-----+
2419 | 11 | 8 | 110 |
2420 | 13 | 10 | 130 |
2421 | 9 | 8 | 90 |
2422 +----+----+-----+
2423 "#);
2424 }
2425
2426 Ok(())
2427 }
2428
2429 #[apply(batch_sizes)]
2430 #[tokio::test]
2431 async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> {
2432 let task_ctx = prepare_task_ctx(batch_size);
2433 let left = build_semi_anti_left_table();
2434 let right = build_semi_anti_right_table();
2435
2436 let on = vec![(
2438 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2439 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2440 )];
2441
2442 let column_indices = vec![ColumnIndex {
2443 index: 0,
2444 side: JoinSide::Right,
2445 }];
2446 let intermediate_schema =
2447 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2448
2449 let filter_expression = Arc::new(BinaryExpr::new(
2450 Arc::new(Column::new("x", 0)),
2451 Operator::NotEq,
2452 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2453 )) as Arc<dyn PhysicalExpr>;
2454
2455 let filter = JoinFilter::new(
2456 filter_expression,
2457 column_indices.clone(),
2458 Arc::new(intermediate_schema.clone()),
2459 );
2460
2461 let join = join_with_filter(
2462 Arc::clone(&left),
2463 Arc::clone(&right),
2464 on.clone(),
2465 filter,
2466 &JoinType::LeftSemi,
2467 NullEquality::NullEqualsNothing,
2468 )?;
2469
2470 let columns_header = columns(&join.schema());
2471 assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
2472
2473 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2474 let batches = common::collect(stream).await?;
2475
2476 allow_duplicates! {
2477 assert_snapshot!(batches_to_sort_string(&batches), @r"
2478 +----+----+-----+
2479 | a1 | b1 | c1 |
2480 +----+----+-----+
2481 | 11 | 8 | 110 |
2482 | 13 | 10 | 130 |
2483 | 9 | 8 | 90 |
2484 +----+----+-----+
2485 ");
2486 }
2487
2488 let filter_expression = Arc::new(BinaryExpr::new(
2490 Arc::new(Column::new("x", 0)),
2491 Operator::Gt,
2492 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2493 )) as Arc<dyn PhysicalExpr>;
2494 let filter = JoinFilter::new(
2495 filter_expression,
2496 column_indices,
2497 Arc::new(intermediate_schema),
2498 );
2499
2500 let join = join_with_filter(
2501 left,
2502 right,
2503 on,
2504 filter,
2505 &JoinType::LeftSemi,
2506 NullEquality::NullEqualsNothing,
2507 )?;
2508
2509 let columns_header = columns(&join.schema());
2510 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2511
2512 let stream = join.execute(0, task_ctx)?;
2513 let batches = common::collect(stream).await?;
2514
2515 allow_duplicates! {
2516 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2517 +----+----+-----+
2518 | a1 | b1 | c1 |
2519 +----+----+-----+
2520 | 13 | 10 | 130 |
2521 +----+----+-----+
2522 "#);
2523 }
2524
2525 Ok(())
2526 }
2527
2528 #[apply(batch_sizes)]
2529 #[tokio::test]
2530 async fn join_right_semi(batch_size: usize) -> Result<()> {
2531 let task_ctx = prepare_task_ctx(batch_size);
2532 let left = build_semi_anti_left_table();
2533 let right = build_semi_anti_right_table();
2534
2535 let on = vec![(
2537 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2538 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2539 )];
2540
2541 let join = join(
2542 left,
2543 right,
2544 on,
2545 &JoinType::RightSemi,
2546 NullEquality::NullEqualsNothing,
2547 )?;
2548
2549 let columns = columns(&join.schema());
2550 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2551
2552 let stream = join.execute(0, task_ctx)?;
2553 let batches = common::collect(stream).await?;
2554
2555 allow_duplicates! {
2557 assert_snapshot!(batches_to_string(&batches), @r#"
2558 +----+----+-----+
2559 | a2 | b2 | c2 |
2560 +----+----+-----+
2561 | 8 | 8 | 20 |
2562 | 12 | 10 | 40 |
2563 | 10 | 10 | 100 |
2564 +----+----+-----+
2565 "#);
2566 }
2567
2568 Ok(())
2569 }
2570
2571 #[apply(batch_sizes)]
2572 #[tokio::test]
2573 async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> {
2574 let task_ctx = prepare_task_ctx(batch_size);
2575 let left = build_semi_anti_left_table();
2576 let right = build_semi_anti_right_table();
2577
2578 let on = vec![(
2580 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2581 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2582 )];
2583
2584 let column_indices = vec![ColumnIndex {
2585 index: 0,
2586 side: JoinSide::Left,
2587 }];
2588 let intermediate_schema =
2589 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2590
2591 let filter_expression = Arc::new(BinaryExpr::new(
2592 Arc::new(Column::new("x", 0)),
2593 Operator::NotEq,
2594 Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
2595 )) as Arc<dyn PhysicalExpr>;
2596
2597 let filter = JoinFilter::new(
2598 filter_expression,
2599 column_indices.clone(),
2600 Arc::new(intermediate_schema.clone()),
2601 );
2602
2603 let join = join_with_filter(
2604 Arc::clone(&left),
2605 Arc::clone(&right),
2606 on.clone(),
2607 filter,
2608 &JoinType::RightSemi,
2609 NullEquality::NullEqualsNothing,
2610 )?;
2611
2612 let columns = columns(&join.schema());
2613 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2614
2615 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2616 let batches = common::collect(stream).await?;
2617
2618 allow_duplicates! {
2620 assert_snapshot!(batches_to_string(&batches), @r#"
2621 +----+----+-----+
2622 | a2 | b2 | c2 |
2623 +----+----+-----+
2624 | 8 | 8 | 20 |
2625 | 12 | 10 | 40 |
2626 | 10 | 10 | 100 |
2627 +----+----+-----+
2628 "#);
2629 }
2630
2631 let filter_expression = Arc::new(BinaryExpr::new(
2633 Arc::new(Column::new("x", 0)),
2634 Operator::Gt,
2635 Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
2636 )) as Arc<dyn PhysicalExpr>;
2637
2638 let filter = JoinFilter::new(
2639 filter_expression,
2640 column_indices,
2641 Arc::new(intermediate_schema.clone()),
2642 );
2643
2644 let join = join_with_filter(
2645 left,
2646 right,
2647 on,
2648 filter,
2649 &JoinType::RightSemi,
2650 NullEquality::NullEqualsNothing,
2651 )?;
2652 let stream = join.execute(0, task_ctx)?;
2653 let batches = common::collect(stream).await?;
2654
2655 allow_duplicates! {
2657 assert_snapshot!(batches_to_string(&batches), @r#"
2658 +----+----+-----+
2659 | a2 | b2 | c2 |
2660 +----+----+-----+
2661 | 12 | 10 | 40 |
2662 | 10 | 10 | 100 |
2663 +----+----+-----+
2664 "#);
2665 }
2666
2667 Ok(())
2668 }
2669
2670 #[apply(batch_sizes)]
2671 #[tokio::test]
2672 async fn join_left_anti(batch_size: usize) -> Result<()> {
2673 let task_ctx = prepare_task_ctx(batch_size);
2674 let left = build_semi_anti_left_table();
2675 let right = build_semi_anti_right_table();
2676 let on = vec![(
2678 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2679 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2680 )];
2681
2682 let join = join(
2683 left,
2684 right,
2685 on,
2686 &JoinType::LeftAnti,
2687 NullEquality::NullEqualsNothing,
2688 )?;
2689
2690 let columns = columns(&join.schema());
2691 assert_eq!(columns, vec!["a1", "b1", "c1"]);
2692
2693 let stream = join.execute(0, task_ctx)?;
2694 let batches = common::collect(stream).await?;
2695
2696 allow_duplicates! {
2697 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2698 +----+----+----+
2699 | a1 | b1 | c1 |
2700 +----+----+----+
2701 | 1 | 1 | 10 |
2702 | 3 | 3 | 30 |
2703 | 5 | 5 | 50 |
2704 | 7 | 7 | 70 |
2705 +----+----+----+
2706 "#);
2707 }
2708 Ok(())
2709 }
2710
2711 #[apply(batch_sizes)]
2712 #[tokio::test]
2713 async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> {
2714 let task_ctx = prepare_task_ctx(batch_size);
2715 let left = build_semi_anti_left_table();
2716 let right = build_semi_anti_right_table();
2717 let on = vec![(
2719 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2720 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2721 )];
2722
2723 let column_indices = vec![ColumnIndex {
2724 index: 0,
2725 side: JoinSide::Right,
2726 }];
2727 let intermediate_schema =
2728 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2729 let filter_expression = Arc::new(BinaryExpr::new(
2730 Arc::new(Column::new("x", 0)),
2731 Operator::NotEq,
2732 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2733 )) as Arc<dyn PhysicalExpr>;
2734
2735 let filter = JoinFilter::new(
2736 filter_expression,
2737 column_indices.clone(),
2738 Arc::new(intermediate_schema.clone()),
2739 );
2740
2741 let join = join_with_filter(
2742 Arc::clone(&left),
2743 Arc::clone(&right),
2744 on.clone(),
2745 filter,
2746 &JoinType::LeftAnti,
2747 NullEquality::NullEqualsNothing,
2748 )?;
2749
2750 let columns_header = columns(&join.schema());
2751 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2752
2753 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2754 let batches = common::collect(stream).await?;
2755
2756 allow_duplicates! {
2757 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2758 +----+----+-----+
2759 | a1 | b1 | c1 |
2760 +----+----+-----+
2761 | 1 | 1 | 10 |
2762 | 11 | 8 | 110 |
2763 | 3 | 3 | 30 |
2764 | 5 | 5 | 50 |
2765 | 7 | 7 | 70 |
2766 | 9 | 8 | 90 |
2767 +----+----+-----+
2768 "#);
2769 }
2770
2771 let filter_expression = Arc::new(BinaryExpr::new(
2773 Arc::new(Column::new("x", 0)),
2774 Operator::NotEq,
2775 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2776 )) as Arc<dyn PhysicalExpr>;
2777
2778 let filter = JoinFilter::new(
2779 filter_expression,
2780 column_indices,
2781 Arc::new(intermediate_schema),
2782 );
2783
2784 let join = join_with_filter(
2785 left,
2786 right,
2787 on,
2788 filter,
2789 &JoinType::LeftAnti,
2790 NullEquality::NullEqualsNothing,
2791 )?;
2792
2793 let columns_header = columns(&join.schema());
2794 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2795
2796 let stream = join.execute(0, task_ctx)?;
2797 let batches = common::collect(stream).await?;
2798
2799 allow_duplicates! {
2800 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2801 +----+----+-----+
2802 | a1 | b1 | c1 |
2803 +----+----+-----+
2804 | 1 | 1 | 10 |
2805 | 11 | 8 | 110 |
2806 | 3 | 3 | 30 |
2807 | 5 | 5 | 50 |
2808 | 7 | 7 | 70 |
2809 | 9 | 8 | 90 |
2810 +----+----+-----+
2811 "#);
2812 }
2813
2814 Ok(())
2815 }
2816
2817 #[apply(batch_sizes)]
2818 #[tokio::test]
2819 async fn join_right_anti(batch_size: usize) -> Result<()> {
2820 let task_ctx = prepare_task_ctx(batch_size);
2821 let left = build_semi_anti_left_table();
2822 let right = build_semi_anti_right_table();
2823 let on = vec![(
2824 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2825 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2826 )];
2827
2828 let join = join(
2829 left,
2830 right,
2831 on,
2832 &JoinType::RightAnti,
2833 NullEquality::NullEqualsNothing,
2834 )?;
2835
2836 let columns = columns(&join.schema());
2837 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2838
2839 let stream = join.execute(0, task_ctx)?;
2840 let batches = common::collect(stream).await?;
2841
2842 allow_duplicates! {
2844 assert_snapshot!(batches_to_string(&batches), @r#"
2845 +----+----+-----+
2846 | a2 | b2 | c2 |
2847 +----+----+-----+
2848 | 6 | 6 | 60 |
2849 | 2 | 2 | 80 |
2850 | 4 | 4 | 120 |
2851 +----+----+-----+
2852 "#);
2853 }
2854 Ok(())
2855 }
2856
2857 #[apply(batch_sizes)]
2858 #[tokio::test]
2859 async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> {
2860 let task_ctx = prepare_task_ctx(batch_size);
2861 let left = build_semi_anti_left_table();
2862 let right = build_semi_anti_right_table();
2863 let on = vec![(
2865 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2866 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2867 )];
2868
2869 let column_indices = vec![ColumnIndex {
2870 index: 0,
2871 side: JoinSide::Left,
2872 }];
2873 let intermediate_schema =
2874 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2875
2876 let filter_expression = Arc::new(BinaryExpr::new(
2877 Arc::new(Column::new("x", 0)),
2878 Operator::NotEq,
2879 Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
2880 )) as Arc<dyn PhysicalExpr>;
2881
2882 let filter = JoinFilter::new(
2883 filter_expression,
2884 column_indices,
2885 Arc::new(intermediate_schema.clone()),
2886 );
2887
2888 let join = join_with_filter(
2889 Arc::clone(&left),
2890 Arc::clone(&right),
2891 on.clone(),
2892 filter,
2893 &JoinType::RightAnti,
2894 NullEquality::NullEqualsNothing,
2895 )?;
2896
2897 let columns_header = columns(&join.schema());
2898 assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
2899
2900 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2901 let batches = common::collect(stream).await?;
2902
2903 allow_duplicates! {
2905 assert_snapshot!(batches_to_string(&batches), @r#"
2906 +----+----+-----+
2907 | a2 | b2 | c2 |
2908 +----+----+-----+
2909 | 12 | 10 | 40 |
2910 | 6 | 6 | 60 |
2911 | 2 | 2 | 80 |
2912 | 10 | 10 | 100 |
2913 | 4 | 4 | 120 |
2914 +----+----+-----+
2915 "#);
2916 }
2917
2918 let column_indices = vec![ColumnIndex {
2920 index: 1,
2921 side: JoinSide::Right,
2922 }];
2923 let filter_expression = Arc::new(BinaryExpr::new(
2924 Arc::new(Column::new("x", 0)),
2925 Operator::NotEq,
2926 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2927 )) as Arc<dyn PhysicalExpr>;
2928
2929 let filter = JoinFilter::new(
2930 filter_expression,
2931 column_indices,
2932 Arc::new(intermediate_schema),
2933 );
2934
2935 let join = join_with_filter(
2936 left,
2937 right,
2938 on,
2939 filter,
2940 &JoinType::RightAnti,
2941 NullEquality::NullEqualsNothing,
2942 )?;
2943
2944 let columns_header = columns(&join.schema());
2945 assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
2946
2947 let stream = join.execute(0, task_ctx)?;
2948 let batches = common::collect(stream).await?;
2949
2950 allow_duplicates! {
2952 assert_snapshot!(batches_to_string(&batches), @r#"
2953 +----+----+-----+
2954 | a2 | b2 | c2 |
2955 +----+----+-----+
2956 | 8 | 8 | 20 |
2957 | 6 | 6 | 60 |
2958 | 2 | 2 | 80 |
2959 | 4 | 4 | 120 |
2960 +----+----+-----+
2961 "#);
2962 }
2963
2964 Ok(())
2965 }
2966
2967 #[apply(batch_sizes)]
2968 #[tokio::test]
2969 async fn join_right_one(batch_size: usize) -> Result<()> {
2970 let task_ctx = prepare_task_ctx(batch_size);
2971 let left = build_table(
2972 ("a1", &vec![1, 2, 3]),
2973 ("b1", &vec![4, 5, 7]),
2974 ("c1", &vec![7, 8, 9]),
2975 );
2976 let right = build_table(
2977 ("a2", &vec![10, 20, 30]),
2978 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
2980 );
2981 let on = vec![(
2982 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2983 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2984 )];
2985
2986 let (columns, batches, metrics) = join_collect(
2987 left,
2988 right,
2989 on,
2990 &JoinType::Right,
2991 NullEquality::NullEqualsNothing,
2992 task_ctx,
2993 )
2994 .await?;
2995
2996 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2997
2998 allow_duplicates! {
2999 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3000 +----+----+----+----+----+----+
3001 | a1 | b1 | c1 | a2 | b1 | c2 |
3002 +----+----+----+----+----+----+
3003 | | | | 30 | 6 | 90 |
3004 | 1 | 4 | 7 | 10 | 4 | 70 |
3005 | 2 | 5 | 8 | 20 | 5 | 80 |
3006 +----+----+----+----+----+----+
3007 "#);
3008 }
3009
3010 assert_join_metrics!(metrics, 3);
3011
3012 Ok(())
3013 }
3014
3015 #[apply(batch_sizes)]
3016 #[tokio::test]
3017 async fn partitioned_join_right_one(batch_size: usize) -> Result<()> {
3018 let task_ctx = prepare_task_ctx(batch_size);
3019 let left = build_table(
3020 ("a1", &vec![1, 2, 3]),
3021 ("b1", &vec![4, 5, 7]),
3022 ("c1", &vec![7, 8, 9]),
3023 );
3024 let right = build_table(
3025 ("a2", &vec![10, 20, 30]),
3026 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3028 );
3029 let on = vec![(
3030 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3031 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3032 )];
3033
3034 let (columns, batches, metrics) = partitioned_join_collect(
3035 left,
3036 right,
3037 on,
3038 &JoinType::Right,
3039 NullEquality::NullEqualsNothing,
3040 task_ctx,
3041 )
3042 .await?;
3043
3044 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3045
3046 allow_duplicates! {
3047 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3048 +----+----+----+----+----+----+
3049 | a1 | b1 | c1 | a2 | b1 | c2 |
3050 +----+----+----+----+----+----+
3051 | | | | 30 | 6 | 90 |
3052 | 1 | 4 | 7 | 10 | 4 | 70 |
3053 | 2 | 5 | 8 | 20 | 5 | 80 |
3054 +----+----+----+----+----+----+
3055 "#);
3056 }
3057
3058 assert_join_metrics!(metrics, 3);
3059
3060 Ok(())
3061 }
3062
3063 #[apply(batch_sizes)]
3064 #[tokio::test]
3065 async fn join_full_one(batch_size: usize) -> Result<()> {
3066 let task_ctx = prepare_task_ctx(batch_size);
3067 let left = build_table(
3068 ("a1", &vec![1, 2, 3]),
3069 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3071 );
3072 let right = build_table(
3073 ("a2", &vec![10, 20, 30]),
3074 ("b2", &vec![4, 5, 6]),
3075 ("c2", &vec![70, 80, 90]),
3076 );
3077 let on = vec![(
3078 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3079 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3080 )];
3081
3082 let join = join(
3083 left,
3084 right,
3085 on,
3086 &JoinType::Full,
3087 NullEquality::NullEqualsNothing,
3088 )?;
3089
3090 let columns = columns(&join.schema());
3091 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3092
3093 let stream = join.execute(0, task_ctx)?;
3094 let batches = common::collect(stream).await?;
3095
3096 allow_duplicates! {
3097 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3098 +----+----+----+----+----+----+
3099 | a1 | b1 | c1 | a2 | b2 | c2 |
3100 +----+----+----+----+----+----+
3101 | | | | 30 | 6 | 90 |
3102 | 1 | 4 | 7 | 10 | 4 | 70 |
3103 | 2 | 5 | 8 | 20 | 5 | 80 |
3104 | 3 | 7 | 9 | | | |
3105 +----+----+----+----+----+----+
3106 "#);
3107 }
3108
3109 Ok(())
3110 }
3111
3112 #[apply(batch_sizes)]
3113 #[tokio::test]
3114 async fn join_left_mark(batch_size: usize) -> Result<()> {
3115 let task_ctx = prepare_task_ctx(batch_size);
3116 let left = build_table(
3117 ("a1", &vec![1, 2, 3]),
3118 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3120 );
3121 let right = build_table(
3122 ("a2", &vec![10, 20, 30]),
3123 ("b1", &vec![4, 5, 6]),
3124 ("c2", &vec![70, 80, 90]),
3125 );
3126 let on = vec![(
3127 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3128 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3129 )];
3130
3131 let (columns, batches, metrics) = join_collect(
3132 Arc::clone(&left),
3133 Arc::clone(&right),
3134 on.clone(),
3135 &JoinType::LeftMark,
3136 NullEquality::NullEqualsNothing,
3137 task_ctx,
3138 )
3139 .await?;
3140
3141 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3142
3143 allow_duplicates! {
3144 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3145 +----+----+----+-------+
3146 | a1 | b1 | c1 | mark |
3147 +----+----+----+-------+
3148 | 1 | 4 | 7 | true |
3149 | 2 | 5 | 8 | true |
3150 | 3 | 7 | 9 | false |
3151 +----+----+----+-------+
3152 "#);
3153 }
3154
3155 assert_join_metrics!(metrics, 3);
3156
3157 Ok(())
3158 }
3159
3160 #[apply(batch_sizes)]
3161 #[tokio::test]
3162 async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> {
3163 let task_ctx = prepare_task_ctx(batch_size);
3164 let left = build_table(
3165 ("a1", &vec![1, 2, 3]),
3166 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3168 );
3169 let right = build_table(
3170 ("a2", &vec![10, 20, 30, 40]),
3171 ("b1", &vec![4, 4, 5, 6]),
3172 ("c2", &vec![60, 70, 80, 90]),
3173 );
3174 let on = vec![(
3175 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3176 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3177 )];
3178
3179 let (columns, batches, metrics) = partitioned_join_collect(
3180 Arc::clone(&left),
3181 Arc::clone(&right),
3182 on.clone(),
3183 &JoinType::LeftMark,
3184 NullEquality::NullEqualsNothing,
3185 task_ctx,
3186 )
3187 .await?;
3188
3189 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3190
3191 allow_duplicates! {
3192 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3193 +----+----+----+-------+
3194 | a1 | b1 | c1 | mark |
3195 +----+----+----+-------+
3196 | 1 | 4 | 7 | true |
3197 | 2 | 5 | 8 | true |
3198 | 3 | 7 | 9 | false |
3199 +----+----+----+-------+
3200 "#);
3201 }
3202
3203 assert_join_metrics!(metrics, 3);
3204
3205 Ok(())
3206 }
3207
3208 #[apply(batch_sizes)]
3209 #[tokio::test]
3210 async fn join_right_mark(batch_size: usize) -> Result<()> {
3211 let task_ctx = prepare_task_ctx(batch_size);
3212 let left = build_table(
3213 ("a1", &vec![1, 2, 3]),
3214 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3216 );
3217 let right = build_table(
3218 ("a2", &vec![10, 20, 30]),
3219 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3221 );
3222 let on = vec![(
3223 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3224 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3225 )];
3226
3227 let (columns, batches, metrics) = join_collect(
3228 Arc::clone(&left),
3229 Arc::clone(&right),
3230 on.clone(),
3231 &JoinType::RightMark,
3232 NullEquality::NullEqualsNothing,
3233 task_ctx,
3234 )
3235 .await?;
3236
3237 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3238
3239 let expected = [
3240 "+----+----+----+-------+",
3241 "| a2 | b1 | c2 | mark |",
3242 "+----+----+----+-------+",
3243 "| 10 | 4 | 70 | true |",
3244 "| 20 | 5 | 80 | true |",
3245 "| 30 | 6 | 90 | false |",
3246 "+----+----+----+-------+",
3247 ];
3248 assert_batches_sorted_eq!(expected, &batches);
3249
3250 assert_join_metrics!(metrics, 3);
3251
3252 Ok(())
3253 }
3254
3255 #[apply(batch_sizes)]
3256 #[tokio::test]
3257 async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> {
3258 let task_ctx = prepare_task_ctx(batch_size);
3259 let left = build_table(
3260 ("a1", &vec![1, 2, 3]),
3261 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3263 );
3264 let right = build_table(
3265 ("a2", &vec![10, 20, 30, 40]),
3266 ("b1", &vec![4, 4, 5, 6]), ("c2", &vec![60, 70, 80, 90]),
3268 );
3269 let on = vec![(
3270 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3271 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3272 )];
3273
3274 let (columns, batches, metrics) = partitioned_join_collect(
3275 Arc::clone(&left),
3276 Arc::clone(&right),
3277 on.clone(),
3278 &JoinType::RightMark,
3279 NullEquality::NullEqualsNothing,
3280 task_ctx,
3281 )
3282 .await?;
3283
3284 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3285
3286 let expected = [
3287 "+----+----+----+-------+",
3288 "| a2 | b1 | c2 | mark |",
3289 "+----+----+----+-------+",
3290 "| 10 | 4 | 60 | true |",
3291 "| 20 | 4 | 70 | true |",
3292 "| 30 | 5 | 80 | true |",
3293 "| 40 | 6 | 90 | false |",
3294 "+----+----+----+-------+",
3295 ];
3296 assert_batches_sorted_eq!(expected, &batches);
3297
3298 assert_join_metrics!(metrics, 4);
3299
3300 Ok(())
3301 }
3302
#[test]
fn join_with_hash_collisions_64() -> Result<()> {
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let left = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );
    let right = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    // Hash the left key column `a`.
    let mut left_hash_buffer = vec![0; left.num_rows()];
    let left_hashes = create_hashes(
        &[Arc::clone(&left.columns()[0])],
        &random_state,
        &mut left_hash_buffer,
    )?;

    // Deliberately register both stored values (1 and 2) under each left
    // hash, in the order h0/1, h0/2, h1/1, h1/2. A hash lookup alone therefore
    // yields candidates for both left rows, and only the key-equality check in
    // `lookup_join_hashmap` can pair each right row with its real match.
    // NOTE(review): the stored values look like 1-based row slots chained via
    // `next` (0 terminating the chain); confirm against JoinHashMapU64.
    let mut hashmap_left = HashTable::with_capacity(4);
    for hash in [left_hashes[0], left_hashes[1]] {
        for slot in [1, 2] {
            hashmap_left.insert_unique(hash, (hash, slot), |(h, _)| *h);
        }
    }
    let join_hash_map = JoinHashMapU64::new(hashmap_left, vec![2, 0]);

    // Both sides are keyed on column 0 (`a`).
    let key_expr: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
    let left_keys = key_expr.evaluate(&left)?.into_array(left.num_rows())?;
    let right_keys = key_expr.evaluate(&right)?.into_array(right.num_rows())?;

    // Probe hashes use the same random state as the build side.
    let mut probe_hashes = vec![0; right.num_rows()];
    create_hashes(&[Arc::clone(&right_keys)], &random_state, &mut probe_hashes)?;

    let (left_indices, right_indices, _) = lookup_join_hashmap(
        &join_hash_map,
        &[left_keys],
        &[right_keys],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
    )?;

    // Row 0 pairs only with row 0, and row 1 only with row 1.
    assert_eq!(UInt64Array::from(vec![0, 1]), left_indices);
    assert_eq!(UInt32Array::from(vec![0, 1]), right_indices);

    Ok(())
}
3373
#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let left = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );
    let right = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    // Hash the left key column `a`.
    let mut left_hash_buffer = vec![0; left.num_rows()];
    let left_hashes = create_hashes(
        &[Arc::clone(&left.columns()[0])],
        &random_state,
        &mut left_hash_buffer,
    )?;

    // Same collision setup as the u64 variant, but with u32 slot values:
    // both slots (1 and 2) are registered under each left hash, in the order
    // h0/1, h0/2, h1/1, h1/2, so only key equality can disambiguate matches.
    // NOTE(review): slot/`next` encoding assumed as in JoinHashMapU32; confirm.
    let mut hashmap_left = HashTable::with_capacity(4);
    for hash in [left_hashes[0], left_hashes[1]] {
        for slot in [1u32, 2u32] {
            hashmap_left.insert_unique(hash, (hash, slot), |(h, _)| *h);
        }
    }
    let next: Vec<u32> = vec![2, 0];
    let join_hash_map = JoinHashMapU32::new(hashmap_left, next);

    // Both sides are keyed on column 0 (`a`).
    let key_expr: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
    let left_keys = key_expr.evaluate(&left)?.into_array(left.num_rows())?;
    let right_keys = key_expr.evaluate(&right)?.into_array(right.num_rows())?;

    // Probe hashes use the same random state as the build side.
    let mut probe_hashes = vec![0; right.num_rows()];
    create_hashes(&[Arc::clone(&right_keys)], &random_state, &mut probe_hashes)?;

    let (left_indices, right_indices, _) = lookup_join_hashmap(
        &join_hash_map,
        &[left_keys],
        &[right_keys],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
    )?;

    // Row 0 pairs only with row 0, and row 1 only with row 1.
    assert_eq!(UInt64Array::from(vec![0, 1]), left_indices);
    assert_eq!(UInt32Array::from(vec![0, 1]), right_indices);

    Ok(())
}
3437
3438 #[tokio::test]
3439 async fn join_with_duplicated_column_names() -> Result<()> {
3440 let task_ctx = Arc::new(TaskContext::default());
3441 let left = build_table(
3442 ("a", &vec![1, 2, 3]),
3443 ("b", &vec![4, 5, 7]),
3444 ("c", &vec![7, 8, 9]),
3445 );
3446 let right = build_table(
3447 ("a", &vec![10, 20, 30]),
3448 ("b", &vec![1, 2, 7]),
3449 ("c", &vec![70, 80, 90]),
3450 );
3451 let on = vec![(
3452 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3454 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3455 )];
3456
3457 let join = join(
3458 left,
3459 right,
3460 on,
3461 &JoinType::Inner,
3462 NullEquality::NullEqualsNothing,
3463 )?;
3464
3465 let columns = columns(&join.schema());
3466 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3467
3468 let stream = join.execute(0, task_ctx)?;
3469 let batches = common::collect(stream).await?;
3470
3471 allow_duplicates! {
3472 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3473 +---+---+---+----+---+----+
3474 | a | b | c | a | b | c |
3475 +---+---+---+----+---+----+
3476 | 1 | 4 | 7 | 10 | 1 | 70 |
3477 | 2 | 5 | 8 | 20 | 2 | 80 |
3478 +---+---+---+----+---+----+
3479 "#);
3480 }
3481
3482 Ok(())
3483 }
3484
3485 fn prepare_join_filter() -> JoinFilter {
3486 let column_indices = vec![
3487 ColumnIndex {
3488 index: 2,
3489 side: JoinSide::Left,
3490 },
3491 ColumnIndex {
3492 index: 2,
3493 side: JoinSide::Right,
3494 },
3495 ];
3496 let intermediate_schema = Schema::new(vec![
3497 Field::new("c", DataType::Int32, true),
3498 Field::new("c", DataType::Int32, true),
3499 ]);
3500 let filter_expression = Arc::new(BinaryExpr::new(
3501 Arc::new(Column::new("c", 0)),
3502 Operator::Gt,
3503 Arc::new(Column::new("c", 1)),
3504 )) as Arc<dyn PhysicalExpr>;
3505
3506 JoinFilter::new(
3507 filter_expression,
3508 column_indices,
3509 Arc::new(intermediate_schema),
3510 )
3511 }
3512
3513 #[apply(batch_sizes)]
3514 #[tokio::test]
3515 async fn join_inner_with_filter(batch_size: usize) -> Result<()> {
3516 let task_ctx = prepare_task_ctx(batch_size);
3517 let left = build_table(
3518 ("a", &vec![0, 1, 2, 2]),
3519 ("b", &vec![4, 5, 7, 8]),
3520 ("c", &vec![7, 8, 9, 1]),
3521 );
3522 let right = build_table(
3523 ("a", &vec![10, 20, 30, 40]),
3524 ("b", &vec![2, 2, 3, 4]),
3525 ("c", &vec![7, 5, 6, 4]),
3526 );
3527 let on = vec![(
3528 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3529 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3530 )];
3531 let filter = prepare_join_filter();
3532
3533 let join = join_with_filter(
3534 left,
3535 right,
3536 on,
3537 filter,
3538 &JoinType::Inner,
3539 NullEquality::NullEqualsNothing,
3540 )?;
3541
3542 let columns = columns(&join.schema());
3543 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3544
3545 let stream = join.execute(0, task_ctx)?;
3546 let batches = common::collect(stream).await?;
3547
3548 allow_duplicates! {
3549 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3550 +---+---+---+----+---+---+
3551 | a | b | c | a | b | c |
3552 +---+---+---+----+---+---+
3553 | 2 | 7 | 9 | 10 | 2 | 7 |
3554 | 2 | 7 | 9 | 20 | 2 | 5 |
3555 +---+---+---+----+---+---+
3556 "#);
3557 }
3558
3559 Ok(())
3560 }
3561
3562 #[apply(batch_sizes)]
3563 #[tokio::test]
3564 async fn join_left_with_filter(batch_size: usize) -> Result<()> {
3565 let task_ctx = prepare_task_ctx(batch_size);
3566 let left = build_table(
3567 ("a", &vec![0, 1, 2, 2]),
3568 ("b", &vec![4, 5, 7, 8]),
3569 ("c", &vec![7, 8, 9, 1]),
3570 );
3571 let right = build_table(
3572 ("a", &vec![10, 20, 30, 40]),
3573 ("b", &vec![2, 2, 3, 4]),
3574 ("c", &vec![7, 5, 6, 4]),
3575 );
3576 let on = vec![(
3577 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3578 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3579 )];
3580 let filter = prepare_join_filter();
3581
3582 let join = join_with_filter(
3583 left,
3584 right,
3585 on,
3586 filter,
3587 &JoinType::Left,
3588 NullEquality::NullEqualsNothing,
3589 )?;
3590
3591 let columns = columns(&join.schema());
3592 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3593
3594 let stream = join.execute(0, task_ctx)?;
3595 let batches = common::collect(stream).await?;
3596
3597 allow_duplicates! {
3598 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3599 +---+---+---+----+---+---+
3600 | a | b | c | a | b | c |
3601 +---+---+---+----+---+---+
3602 | 0 | 4 | 7 | | | |
3603 | 1 | 5 | 8 | | | |
3604 | 2 | 7 | 9 | 10 | 2 | 7 |
3605 | 2 | 7 | 9 | 20 | 2 | 5 |
3606 | 2 | 8 | 1 | | | |
3607 +---+---+---+----+---+---+
3608 "#);
3609 }
3610
3611 Ok(())
3612 }
3613
3614 #[apply(batch_sizes)]
3615 #[tokio::test]
3616 async fn join_right_with_filter(batch_size: usize) -> Result<()> {
3617 let task_ctx = prepare_task_ctx(batch_size);
3618 let left = build_table(
3619 ("a", &vec![0, 1, 2, 2]),
3620 ("b", &vec![4, 5, 7, 8]),
3621 ("c", &vec![7, 8, 9, 1]),
3622 );
3623 let right = build_table(
3624 ("a", &vec![10, 20, 30, 40]),
3625 ("b", &vec![2, 2, 3, 4]),
3626 ("c", &vec![7, 5, 6, 4]),
3627 );
3628 let on = vec![(
3629 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3630 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3631 )];
3632 let filter = prepare_join_filter();
3633
3634 let join = join_with_filter(
3635 left,
3636 right,
3637 on,
3638 filter,
3639 &JoinType::Right,
3640 NullEquality::NullEqualsNothing,
3641 )?;
3642
3643 let columns = columns(&join.schema());
3644 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3645
3646 let stream = join.execute(0, task_ctx)?;
3647 let batches = common::collect(stream).await?;
3648
3649 allow_duplicates! {
3650 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3651 +---+---+---+----+---+---+
3652 | a | b | c | a | b | c |
3653 +---+---+---+----+---+---+
3654 | | | | 30 | 3 | 6 |
3655 | | | | 40 | 4 | 4 |
3656 | 2 | 7 | 9 | 10 | 2 | 7 |
3657 | 2 | 7 | 9 | 20 | 2 | 5 |
3658 +---+---+---+----+---+---+
3659 "#);
3660 }
3661
3662 Ok(())
3663 }
3664
3665 #[apply(batch_sizes)]
3666 #[tokio::test]
3667 async fn join_full_with_filter(batch_size: usize) -> Result<()> {
3668 let task_ctx = prepare_task_ctx(batch_size);
3669 let left = build_table(
3670 ("a", &vec![0, 1, 2, 2]),
3671 ("b", &vec![4, 5, 7, 8]),
3672 ("c", &vec![7, 8, 9, 1]),
3673 );
3674 let right = build_table(
3675 ("a", &vec![10, 20, 30, 40]),
3676 ("b", &vec![2, 2, 3, 4]),
3677 ("c", &vec![7, 5, 6, 4]),
3678 );
3679 let on = vec![(
3680 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3681 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3682 )];
3683 let filter = prepare_join_filter();
3684
3685 let join = join_with_filter(
3686 left,
3687 right,
3688 on,
3689 filter,
3690 &JoinType::Full,
3691 NullEquality::NullEqualsNothing,
3692 )?;
3693
3694 let columns = columns(&join.schema());
3695 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3696
3697 let stream = join.execute(0, task_ctx)?;
3698 let batches = common::collect(stream).await?;
3699
3700 let expected = [
3701 "+---+---+---+----+---+---+",
3702 "| a | b | c | a | b | c |",
3703 "+---+---+---+----+---+---+",
3704 "| | | | 30 | 3 | 6 |",
3705 "| | | | 40 | 4 | 4 |",
3706 "| 2 | 7 | 9 | 10 | 2 | 7 |",
3707 "| 2 | 7 | 9 | 20 | 2 | 5 |",
3708 "| 0 | 4 | 7 | | | |",
3709 "| 1 | 5 | 8 | | | |",
3710 "| 2 | 8 | 1 | | | |",
3711 "+---+---+---+----+---+---+",
3712 ];
3713 assert_batches_sorted_eq!(expected, &batches);
3714
3715 Ok(())
3733 }
3734
3735 #[tokio::test]
3737 async fn test_collect_left_multiple_partitions_join() -> Result<()> {
3738 let task_ctx = Arc::new(TaskContext::default());
3739 let left = build_table(
3740 ("a1", &vec![1, 2, 3]),
3741 ("b1", &vec![4, 5, 7]),
3742 ("c1", &vec![7, 8, 9]),
3743 );
3744 let right = build_table(
3745 ("a2", &vec![10, 20, 30]),
3746 ("b2", &vec![4, 5, 6]),
3747 ("c2", &vec![70, 80, 90]),
3748 );
3749 let on = vec![(
3750 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3751 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3752 )];
3753
3754 let expected_inner = vec![
3755 "+----+----+----+----+----+----+",
3756 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3757 "+----+----+----+----+----+----+",
3758 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3759 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3760 "+----+----+----+----+----+----+",
3761 ];
3762 let expected_left = vec![
3763 "+----+----+----+----+----+----+",
3764 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3765 "+----+----+----+----+----+----+",
3766 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3767 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3768 "| 3 | 7 | 9 | | | |",
3769 "+----+----+----+----+----+----+",
3770 ];
3771 let expected_right = vec![
3772 "+----+----+----+----+----+----+",
3773 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3774 "+----+----+----+----+----+----+",
3775 "| | | | 30 | 6 | 90 |",
3776 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3777 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3778 "+----+----+----+----+----+----+",
3779 ];
3780 let expected_full = vec![
3781 "+----+----+----+----+----+----+",
3782 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3783 "+----+----+----+----+----+----+",
3784 "| | | | 30 | 6 | 90 |",
3785 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3786 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3787 "| 3 | 7 | 9 | | | |",
3788 "+----+----+----+----+----+----+",
3789 ];
3790 let expected_left_semi = vec![
3791 "+----+----+----+",
3792 "| a1 | b1 | c1 |",
3793 "+----+----+----+",
3794 "| 1 | 4 | 7 |",
3795 "| 2 | 5 | 8 |",
3796 "+----+----+----+",
3797 ];
3798 let expected_left_anti = vec![
3799 "+----+----+----+",
3800 "| a1 | b1 | c1 |",
3801 "+----+----+----+",
3802 "| 3 | 7 | 9 |",
3803 "+----+----+----+",
3804 ];
3805 let expected_right_semi = vec![
3806 "+----+----+----+",
3807 "| a2 | b2 | c2 |",
3808 "+----+----+----+",
3809 "| 10 | 4 | 70 |",
3810 "| 20 | 5 | 80 |",
3811 "+----+----+----+",
3812 ];
3813 let expected_right_anti = vec![
3814 "+----+----+----+",
3815 "| a2 | b2 | c2 |",
3816 "+----+----+----+",
3817 "| 30 | 6 | 90 |",
3818 "+----+----+----+",
3819 ];
3820 let expected_left_mark = vec![
3821 "+----+----+----+-------+",
3822 "| a1 | b1 | c1 | mark |",
3823 "+----+----+----+-------+",
3824 "| 1 | 4 | 7 | true |",
3825 "| 2 | 5 | 8 | true |",
3826 "| 3 | 7 | 9 | false |",
3827 "+----+----+----+-------+",
3828 ];
3829 let expected_right_mark = vec![
3830 "+----+----+----+-------+",
3831 "| a2 | b2 | c2 | mark |",
3832 "+----+----+----+-------+",
3833 "| 10 | 4 | 70 | true |",
3834 "| 20 | 5 | 80 | true |",
3835 "| 30 | 6 | 90 | false |",
3836 "+----+----+----+-------+",
3837 ];
3838
3839 let test_cases = vec![
3840 (JoinType::Inner, expected_inner),
3841 (JoinType::Left, expected_left),
3842 (JoinType::Right, expected_right),
3843 (JoinType::Full, expected_full),
3844 (JoinType::LeftSemi, expected_left_semi),
3845 (JoinType::LeftAnti, expected_left_anti),
3846 (JoinType::RightSemi, expected_right_semi),
3847 (JoinType::RightAnti, expected_right_anti),
3848 (JoinType::LeftMark, expected_left_mark),
3849 (JoinType::RightMark, expected_right_mark),
3850 ];
3851
3852 for (join_type, expected) in test_cases {
3853 let (_, batches, metrics) = join_collect_with_partition_mode(
3854 Arc::clone(&left),
3855 Arc::clone(&right),
3856 on.clone(),
3857 &join_type,
3858 PartitionMode::CollectLeft,
3859 NullEquality::NullEqualsNothing,
3860 Arc::clone(&task_ctx),
3861 )
3862 .await?;
3863 assert_batches_sorted_eq!(expected, &batches);
3864 assert_join_metrics!(metrics, expected.len() - 4);
3865 }
3866
3867 Ok(())
3868 }
3869
3870 #[tokio::test]
3871 async fn join_date32() -> Result<()> {
3872 let schema = Arc::new(Schema::new(vec![
3873 Field::new("date", DataType::Date32, false),
3874 Field::new("n", DataType::Int32, false),
3875 ]));
3876
3877 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
3878 let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
3879 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
3880 let left =
3881 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
3882 .unwrap();
3883 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
3884 let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
3885 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
3886 let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
3887 let on = vec![(
3888 Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
3889 Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
3890 )];
3891
3892 let join = join(
3893 left,
3894 right,
3895 on,
3896 &JoinType::Inner,
3897 NullEquality::NullEqualsNothing,
3898 )?;
3899
3900 let task_ctx = Arc::new(TaskContext::default());
3901 let stream = join.execute(0, task_ctx)?;
3902 let batches = common::collect(stream).await?;
3903
3904 allow_duplicates! {
3905 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3906 +------------+---+------------+---+
3907 | date | n | date | n |
3908 +------------+---+------------+---+
3909 | 2022-04-26 | 2 | 2022-04-26 | 4 |
3910 | 2022-04-26 | 2 | 2022-04-26 | 5 |
3911 | 2022-04-27 | 3 | 2022-04-27 | 6 |
3912 +------------+---+------------+---+
3913 "#);
3914 }
3915
3916 Ok(())
3917 }
3918
3919 #[tokio::test]
3920 async fn join_with_error_right() {
3921 let left = build_table(
3922 ("a1", &vec![1, 2, 3]),
3923 ("b1", &vec![4, 5, 7]),
3924 ("c1", &vec![7, 8, 9]),
3925 );
3926
3927 let err = exec_err!("bad data error");
3930 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
3931
3932 let on = vec![(
3933 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3934 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
3935 )];
3936 let schema = right.schema();
3937 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
3938 let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
3939
3940 let join_types = vec![
3941 JoinType::Inner,
3942 JoinType::Left,
3943 JoinType::Right,
3944 JoinType::Full,
3945 JoinType::LeftSemi,
3946 JoinType::LeftAnti,
3947 JoinType::RightSemi,
3948 JoinType::RightAnti,
3949 ];
3950
3951 for join_type in join_types {
3952 let join = join(
3953 Arc::clone(&left),
3954 Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
3955 on.clone(),
3956 &join_type,
3957 NullEquality::NullEqualsNothing,
3958 )
3959 .unwrap();
3960 let task_ctx = Arc::new(TaskContext::default());
3961
3962 let stream = join.execute(0, task_ctx).unwrap();
3963
3964 let result_string = common::collect(stream).await.unwrap_err().to_string();
3966 assert!(
3967 result_string.contains("bad data error"),
3968 "actual: {result_string}"
3969 );
3970 }
3971 }
3972
3973 #[tokio::test]
3974 async fn join_split_batch() {
3975 let left = build_table(
3976 ("a1", &vec![1, 2, 3, 4]),
3977 ("b1", &vec![1, 1, 1, 1]),
3978 ("c1", &vec![0, 0, 0, 0]),
3979 );
3980 let right = build_table(
3981 ("a2", &vec![10, 20, 30, 40, 50]),
3982 ("b2", &vec![1, 1, 1, 1, 1]),
3983 ("c2", &vec![0, 0, 0, 0, 0]),
3984 );
3985 let on = vec![(
3986 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3987 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3988 )];
3989
3990 let join_types = vec![
3991 JoinType::Inner,
3992 JoinType::Left,
3993 JoinType::Right,
3994 JoinType::Full,
3995 JoinType::RightSemi,
3996 JoinType::RightAnti,
3997 JoinType::LeftSemi,
3998 JoinType::LeftAnti,
3999 ];
4000 let expected_resultset_records = 20;
4001 let common_result = [
4002 "+----+----+----+----+----+----+",
4003 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4004 "+----+----+----+----+----+----+",
4005 "| 1 | 1 | 0 | 10 | 1 | 0 |",
4006 "| 2 | 1 | 0 | 10 | 1 | 0 |",
4007 "| 3 | 1 | 0 | 10 | 1 | 0 |",
4008 "| 4 | 1 | 0 | 10 | 1 | 0 |",
4009 "| 1 | 1 | 0 | 20 | 1 | 0 |",
4010 "| 2 | 1 | 0 | 20 | 1 | 0 |",
4011 "| 3 | 1 | 0 | 20 | 1 | 0 |",
4012 "| 4 | 1 | 0 | 20 | 1 | 0 |",
4013 "| 1 | 1 | 0 | 30 | 1 | 0 |",
4014 "| 2 | 1 | 0 | 30 | 1 | 0 |",
4015 "| 3 | 1 | 0 | 30 | 1 | 0 |",
4016 "| 4 | 1 | 0 | 30 | 1 | 0 |",
4017 "| 1 | 1 | 0 | 40 | 1 | 0 |",
4018 "| 2 | 1 | 0 | 40 | 1 | 0 |",
4019 "| 3 | 1 | 0 | 40 | 1 | 0 |",
4020 "| 4 | 1 | 0 | 40 | 1 | 0 |",
4021 "| 1 | 1 | 0 | 50 | 1 | 0 |",
4022 "| 2 | 1 | 0 | 50 | 1 | 0 |",
4023 "| 3 | 1 | 0 | 50 | 1 | 0 |",
4024 "| 4 | 1 | 0 | 50 | 1 | 0 |",
4025 "+----+----+----+----+----+----+",
4026 ];
4027 let left_batch = [
4028 "+----+----+----+",
4029 "| a1 | b1 | c1 |",
4030 "+----+----+----+",
4031 "| 1 | 1 | 0 |",
4032 "| 2 | 1 | 0 |",
4033 "| 3 | 1 | 0 |",
4034 "| 4 | 1 | 0 |",
4035 "+----+----+----+",
4036 ];
4037 let right_batch = [
4038 "+----+----+----+",
4039 "| a2 | b2 | c2 |",
4040 "+----+----+----+",
4041 "| 10 | 1 | 0 |",
4042 "| 20 | 1 | 0 |",
4043 "| 30 | 1 | 0 |",
4044 "| 40 | 1 | 0 |",
4045 "| 50 | 1 | 0 |",
4046 "+----+----+----+",
4047 ];
4048 let right_empty = [
4049 "+----+----+----+",
4050 "| a2 | b2 | c2 |",
4051 "+----+----+----+",
4052 "+----+----+----+",
4053 ];
4054 let left_empty = [
4055 "+----+----+----+",
4056 "| a1 | b1 | c1 |",
4057 "+----+----+----+",
4058 "+----+----+----+",
4059 ];
4060
4061 for join_type in join_types {
4063 for batch_size in (1..21).rev() {
4064 let task_ctx = prepare_task_ctx(batch_size);
4065
4066 let join = join(
4067 Arc::clone(&left),
4068 Arc::clone(&right),
4069 on.clone(),
4070 &join_type,
4071 NullEquality::NullEqualsNothing,
4072 )
4073 .unwrap();
4074
4075 let stream = join.execute(0, task_ctx).unwrap();
4076 let batches = common::collect(stream).await.unwrap();
4077
4078 let expected_batch_count = match join_type {
4083 JoinType::Inner
4084 | JoinType::Right
4085 | JoinType::RightSemi
4086 | JoinType::RightAnti => {
4087 div_ceil(expected_resultset_records, batch_size)
4088 }
4089 _ => div_ceil(expected_resultset_records, batch_size) + 1,
4090 };
4091 assert_eq!(
4092 batches.len(),
4093 expected_batch_count,
4094 "expected {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}"
4095 );
4096
4097 let expected = match join_type {
4098 JoinType::RightSemi => right_batch.to_vec(),
4099 JoinType::RightAnti => right_empty.to_vec(),
4100 JoinType::LeftSemi => left_batch.to_vec(),
4101 JoinType::LeftAnti => left_empty.to_vec(),
4102 _ => common_result.to_vec(),
4103 };
4104 assert_batches_eq!(expected, &batches);
4105 }
4106 }
4107 }
4108
4109 #[tokio::test]
4110 async fn single_partition_join_overallocation() -> Result<()> {
4111 let left = build_table(
4112 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4113 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4114 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4115 );
4116 let right = build_table(
4117 ("a2", &vec![10, 11]),
4118 ("b2", &vec![12, 13]),
4119 ("c2", &vec![14, 15]),
4120 );
4121 let on = vec![(
4122 Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
4123 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4124 )];
4125
4126 let join_types = vec![
4127 JoinType::Inner,
4128 JoinType::Left,
4129 JoinType::Right,
4130 JoinType::Full,
4131 JoinType::LeftSemi,
4132 JoinType::LeftAnti,
4133 JoinType::RightSemi,
4134 JoinType::RightAnti,
4135 JoinType::LeftMark,
4136 JoinType::RightMark,
4137 ];
4138
4139 for join_type in join_types {
4140 let runtime = RuntimeEnvBuilder::new()
4141 .with_memory_limit(100, 1.0)
4142 .build_arc()?;
4143 let task_ctx = TaskContext::default().with_runtime(runtime);
4144 let task_ctx = Arc::new(task_ctx);
4145
4146 let join = join(
4147 Arc::clone(&left),
4148 Arc::clone(&right),
4149 on.clone(),
4150 &join_type,
4151 NullEquality::NullEqualsNothing,
4152 )?;
4153
4154 let stream = join.execute(0, task_ctx)?;
4155 let err = common::collect(stream).await.unwrap_err();
4156
4157 assert_contains!(
4159 err.to_string(),
4160 "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput"
4161 );
4162
4163 assert_contains!(
4164 err.to_string(),
4165 "Failed to allocate additional 120.0 B for HashJoinInput"
4166 );
4167 }
4168
4169 Ok(())
4170 }
4171
4172 #[tokio::test]
4173 async fn partitioned_join_overallocation() -> Result<()> {
4174 let left_batch = build_table_i32(
4177 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4178 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4179 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4180 );
4181 let left = TestMemoryExec::try_new_exec(
4182 &[vec![left_batch.clone()], vec![left_batch.clone()]],
4183 left_batch.schema(),
4184 None,
4185 )
4186 .unwrap();
4187 let right_batch = build_table_i32(
4188 ("a2", &vec![10, 11]),
4189 ("b2", &vec![12, 13]),
4190 ("c2", &vec![14, 15]),
4191 );
4192 let right = TestMemoryExec::try_new_exec(
4193 &[vec![right_batch.clone()], vec![right_batch.clone()]],
4194 right_batch.schema(),
4195 None,
4196 )
4197 .unwrap();
4198 let on = vec![(
4199 Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
4200 Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
4201 )];
4202
4203 let join_types = vec![
4204 JoinType::Inner,
4205 JoinType::Left,
4206 JoinType::Right,
4207 JoinType::Full,
4208 JoinType::LeftSemi,
4209 JoinType::LeftAnti,
4210 JoinType::RightSemi,
4211 JoinType::RightAnti,
4212 ];
4213
4214 for join_type in join_types {
4215 let runtime = RuntimeEnvBuilder::new()
4216 .with_memory_limit(100, 1.0)
4217 .build_arc()?;
4218 let session_config = SessionConfig::default().with_batch_size(50);
4219 let task_ctx = TaskContext::default()
4220 .with_session_config(session_config)
4221 .with_runtime(runtime);
4222 let task_ctx = Arc::new(task_ctx);
4223
4224 let join = HashJoinExec::try_new(
4225 Arc::clone(&left) as Arc<dyn ExecutionPlan>,
4226 Arc::clone(&right) as Arc<dyn ExecutionPlan>,
4227 on.clone(),
4228 None,
4229 &join_type,
4230 None,
4231 PartitionMode::Partitioned,
4232 NullEquality::NullEqualsNothing,
4233 )?;
4234
4235 let stream = join.execute(1, task_ctx)?;
4236 let err = common::collect(stream).await.unwrap_err();
4237
4238 assert_contains!(
4240 err.to_string(),
4241 "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput[1]"
4242
4243 );
4244
4245 assert_contains!(
4246 err.to_string(),
4247 "Failed to allocate additional 120.0 B for HashJoinInput[1]"
4248 );
4249 }
4250
4251 Ok(())
4252 }
4253
4254 fn build_table_struct(
4255 struct_name: &str,
4256 field_name_and_values: (&str, &Vec<Option<i32>>),
4257 nulls: Option<NullBuffer>,
4258 ) -> Arc<dyn ExecutionPlan> {
4259 let (field_name, values) = field_name_and_values;
4260 let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
4261 let schema = Schema::new(vec![Field::new(
4262 struct_name,
4263 DataType::Struct(inner_fields.clone().into()),
4264 nulls.is_some(),
4265 )]);
4266
4267 let batch = RecordBatch::try_new(
4268 Arc::new(schema),
4269 vec![Arc::new(StructArray::new(
4270 inner_fields.into(),
4271 vec![Arc::new(Int32Array::from(values.clone()))],
4272 nulls,
4273 ))],
4274 )
4275 .unwrap();
4276 let schema_ref = batch.schema();
4277 TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
4278 }
4279
4280 #[tokio::test]
4281 async fn join_on_struct() -> Result<()> {
4282 let task_ctx = Arc::new(TaskContext::default());
4283 let left =
4284 build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
4285 let right =
4286 build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
4287 let on = vec![(
4288 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
4289 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
4290 )];
4291
4292 let (columns, batches, metrics) = join_collect(
4293 left,
4294 right,
4295 on,
4296 &JoinType::Inner,
4297 NullEquality::NullEqualsNothing,
4298 task_ctx,
4299 )
4300 .await?;
4301
4302 assert_eq!(columns, vec!["n1", "n2"]);
4303
4304 allow_duplicates! {
4305 assert_snapshot!(batches_to_string(&batches), @r#"
4306 +--------+--------+
4307 | n1 | n2 |
4308 +--------+--------+
4309 | {a: } | {a: } |
4310 | {a: 1} | {a: 1} |
4311 | {a: 2} | {a: 2} |
4312 +--------+--------+
4313 "#);
4314 }
4315
4316 assert_join_metrics!(metrics, 3);
4317
4318 Ok(())
4319 }
4320
4321 #[tokio::test]
4322 async fn join_on_struct_with_nulls() -> Result<()> {
4323 let task_ctx = Arc::new(TaskContext::default());
4324 let left =
4325 build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
4326 let right =
4327 build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
4328 let on = vec![(
4329 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
4330 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
4331 )];
4332
4333 let (_, batches_null_eq, metrics) = join_collect(
4334 Arc::clone(&left),
4335 Arc::clone(&right),
4336 on.clone(),
4337 &JoinType::Inner,
4338 NullEquality::NullEqualsNull,
4339 Arc::clone(&task_ctx),
4340 )
4341 .await?;
4342
4343 allow_duplicates! {
4344 assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r#"
4345 +----+----+
4346 | n1 | n2 |
4347 +----+----+
4348 | | |
4349 +----+----+
4350 "#);
4351 }
4352
4353 assert_join_metrics!(metrics, 1);
4354
4355 let (_, batches_null_neq, metrics) = join_collect(
4356 left,
4357 right,
4358 on,
4359 &JoinType::Inner,
4360 NullEquality::NullEqualsNothing,
4361 task_ctx,
4362 )
4363 .await?;
4364
4365 assert_join_metrics!(metrics, 0);
4366
4367 let expected_null_neq =
4368 ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
4369 assert_batches_eq!(expected_null_neq, &batches_null_neq);
4370
4371 Ok(())
4372 }
4373
4374 fn columns(schema: &Schema) -> Vec<String> {
4376 schema.fields().iter().map(|f| f.name().clone()).collect()
4377 }
4378}