1use std::fmt;
19use std::mem::size_of;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::{Arc, OnceLock};
22use std::{any::Any, vec};
23
24use crate::execution_plan::{boundedness_from_children, EmissionType};
25use crate::filter_pushdown::{
26 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
27 FilterPushdownPropagation,
28};
29use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
30use crate::joins::hash_join::stream::{
31 BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
32};
33use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
34use crate::joins::utils::{
35 asymmetric_join_output_partitioning, reorder_output_after_swap, swap_join_projection,
36 update_hash, OnceAsync, OnceFut,
37};
38use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
39use crate::projection::{
40 try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
41 ProjectionExec,
42};
43use crate::spill::get_record_batch_memory_size;
44use crate::ExecutionPlanProperties;
45use crate::{
46 common::can_project,
47 joins::utils::{
48 build_join_schema, check_join_is_valid, estimate_join_statistics,
49 need_produce_result_in_final, symmetric_join_output_partitioning,
50 BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
51 },
52 metrics::{ExecutionPlanMetricsSet, MetricsSet},
53 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
54 PlanProperties, SendableRecordBatchStream, Statistics,
55};
56
57use arrow::array::{ArrayRef, BooleanBufferBuilder};
58use arrow::compute::concat_batches;
59use arrow::datatypes::SchemaRef;
60use arrow::record_batch::RecordBatch;
61use arrow::util::bit_util;
62use arrow_schema::DataType;
63use datafusion_common::config::ConfigOptions;
64use datafusion_common::utils::memory::estimate_memory_size;
65use datafusion_common::{
66 internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
67};
68use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
69use datafusion_execution::TaskContext;
70use datafusion_expr::Accumulator;
71use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
72use datafusion_physical_expr::equivalence::{
73 join_equivalence_properties, ProjectionMapping,
74};
75use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
76use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
77
78use ahash::RandomState;
79use datafusion_physical_expr_common::physical_expr::fmt_sql;
80use futures::TryStreamExt;
81use parking_lot::Mutex;
82
83const HASH_JOIN_SEED: RandomState =
85 RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
86
87pub(super) struct JoinLeftData {
89 pub(super) hash_map: Box<dyn JoinHashMapType>,
91 batch: RecordBatch,
93 values: Vec<ArrayRef>,
95 visited_indices_bitmap: SharedBitmapBuilder,
97 probe_threads_counter: AtomicUsize,
100 _reservation: MemoryReservation,
105 pub(super) bounds: Option<Vec<ColumnBounds>>,
107}
108
109impl JoinLeftData {
110 pub(super) fn new(
112 hash_map: Box<dyn JoinHashMapType>,
113 batch: RecordBatch,
114 values: Vec<ArrayRef>,
115 visited_indices_bitmap: SharedBitmapBuilder,
116 probe_threads_counter: AtomicUsize,
117 reservation: MemoryReservation,
118 bounds: Option<Vec<ColumnBounds>>,
119 ) -> Self {
120 Self {
121 hash_map,
122 batch,
123 values,
124 visited_indices_bitmap,
125 probe_threads_counter,
126 _reservation: reservation,
127 bounds,
128 }
129 }
130
131 pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
133 &*self.hash_map
134 }
135
136 pub(super) fn batch(&self) -> &RecordBatch {
138 &self.batch
139 }
140
141 pub(super) fn values(&self) -> &[ArrayRef] {
143 &self.values
144 }
145
146 pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
148 &self.visited_indices_bitmap
149 }
150
151 pub(super) fn report_probe_completed(&self) -> bool {
154 self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
155 }
156}
157
158#[allow(rustdoc::private_intra_doc_links)]
159pub struct HashJoinExec {
321 pub left: Arc<dyn ExecutionPlan>,
323 pub right: Arc<dyn ExecutionPlan>,
325 pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
327 pub filter: Option<JoinFilter>,
329 pub join_type: JoinType,
331 join_schema: SchemaRef,
334 left_fut: Arc<OnceAsync<JoinLeftData>>,
341 random_state: RandomState,
343 pub mode: PartitionMode,
345 metrics: ExecutionPlanMetricsSet,
347 pub projection: Option<Vec<usize>>,
349 column_indices: Vec<ColumnIndex>,
351 pub null_equality: NullEquality,
353 cache: PlanProperties,
355 dynamic_filter: Option<HashJoinExecDynamicFilter>,
359}
360
361#[derive(Clone)]
362struct HashJoinExecDynamicFilter {
363 filter: Arc<DynamicFilterPhysicalExpr>,
365 bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
368}
369
370impl fmt::Debug for HashJoinExec {
371 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
372 f.debug_struct("HashJoinExec")
373 .field("left", &self.left)
374 .field("right", &self.right)
375 .field("on", &self.on)
376 .field("filter", &self.filter)
377 .field("join_type", &self.join_type)
378 .field("join_schema", &self.join_schema)
379 .field("left_fut", &self.left_fut)
380 .field("random_state", &self.random_state)
381 .field("mode", &self.mode)
382 .field("metrics", &self.metrics)
383 .field("projection", &self.projection)
384 .field("column_indices", &self.column_indices)
385 .field("null_equality", &self.null_equality)
386 .field("cache", &self.cache)
387 .finish()
389 }
390}
391
392impl EmbeddedProjection for HashJoinExec {
393 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
394 self.with_projection(projection)
395 }
396}
397
398impl HashJoinExec {
399 #[allow(clippy::too_many_arguments)]
404 pub fn try_new(
405 left: Arc<dyn ExecutionPlan>,
406 right: Arc<dyn ExecutionPlan>,
407 on: JoinOn,
408 filter: Option<JoinFilter>,
409 join_type: &JoinType,
410 projection: Option<Vec<usize>>,
411 partition_mode: PartitionMode,
412 null_equality: NullEquality,
413 ) -> Result<Self> {
414 let left_schema = left.schema();
415 let right_schema = right.schema();
416 if on.is_empty() {
417 return plan_err!("On constraints in HashJoinExec should be non-empty");
418 }
419
420 check_join_is_valid(&left_schema, &right_schema, &on)?;
421
422 let (join_schema, column_indices) =
423 build_join_schema(&left_schema, &right_schema, join_type);
424
425 let random_state = HASH_JOIN_SEED;
426
427 let join_schema = Arc::new(join_schema);
428
429 can_project(&join_schema, projection.as_ref())?;
431
432 let cache = Self::compute_properties(
433 &left,
434 &right,
435 Arc::clone(&join_schema),
436 *join_type,
437 &on,
438 partition_mode,
439 projection.as_ref(),
440 )?;
441
442 Ok(HashJoinExec {
446 left,
447 right,
448 on,
449 filter,
450 join_type: *join_type,
451 join_schema,
452 left_fut: Default::default(),
453 random_state,
454 mode: partition_mode,
455 metrics: ExecutionPlanMetricsSet::new(),
456 projection,
457 column_indices,
458 null_equality,
459 cache,
460 dynamic_filter: None,
461 })
462 }
463
464 fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
465 let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
468 Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
470 }
471
472 pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
474 &self.left
475 }
476
477 pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
479 &self.right
480 }
481
482 pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
484 &self.on
485 }
486
487 pub fn filter(&self) -> Option<&JoinFilter> {
489 self.filter.as_ref()
490 }
491
492 pub fn join_type(&self) -> &JoinType {
494 &self.join_type
495 }
496
497 pub fn join_schema(&self) -> &SchemaRef {
500 &self.join_schema
501 }
502
503 pub fn partition_mode(&self) -> &PartitionMode {
505 &self.mode
506 }
507
508 pub fn null_equality(&self) -> NullEquality {
510 self.null_equality
511 }
512
513 fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
515 vec![
516 false,
517 matches!(
518 join_type,
519 JoinType::Inner
520 | JoinType::Right
521 | JoinType::RightAnti
522 | JoinType::RightSemi
523 | JoinType::RightMark
524 ),
525 ]
526 }
527
528 pub fn probe_side() -> JoinSide {
530 JoinSide::Right
532 }
533
534 pub fn contains_projection(&self) -> bool {
536 self.projection.is_some()
537 }
538
539 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
541 can_project(&self.schema(), projection.as_ref())?;
543 let projection = match projection {
544 Some(projection) => match &self.projection {
545 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
546 None => Some(projection),
547 },
548 None => None,
549 };
550 Self::try_new(
551 Arc::clone(&self.left),
552 Arc::clone(&self.right),
553 self.on.clone(),
554 self.filter.clone(),
555 &self.join_type,
556 projection,
557 self.mode,
558 self.null_equality,
559 )
560 }
561
562 fn compute_properties(
564 left: &Arc<dyn ExecutionPlan>,
565 right: &Arc<dyn ExecutionPlan>,
566 schema: SchemaRef,
567 join_type: JoinType,
568 on: JoinOnRef,
569 mode: PartitionMode,
570 projection: Option<&Vec<usize>>,
571 ) -> Result<PlanProperties> {
572 let mut eq_properties = join_equivalence_properties(
574 left.equivalence_properties().clone(),
575 right.equivalence_properties().clone(),
576 &join_type,
577 Arc::clone(&schema),
578 &Self::maintains_input_order(join_type),
579 Some(Self::probe_side()),
580 on,
581 )?;
582
583 let mut output_partitioning = match mode {
584 PartitionMode::CollectLeft => {
585 asymmetric_join_output_partitioning(left, right, &join_type)?
586 }
587 PartitionMode::Auto => Partitioning::UnknownPartitioning(
588 right.output_partitioning().partition_count(),
589 ),
590 PartitionMode::Partitioned => {
591 symmetric_join_output_partitioning(left, right, &join_type)?
592 }
593 };
594
595 let emission_type = if left.boundedness().is_unbounded() {
596 EmissionType::Final
597 } else if right.pipeline_behavior() == EmissionType::Incremental {
598 match join_type {
599 JoinType::Inner
602 | JoinType::LeftSemi
603 | JoinType::RightSemi
604 | JoinType::Right
605 | JoinType::RightAnti
606 | JoinType::RightMark => EmissionType::Incremental,
607 JoinType::Left
610 | JoinType::LeftAnti
611 | JoinType::LeftMark
612 | JoinType::Full => EmissionType::Both,
613 }
614 } else {
615 right.pipeline_behavior()
616 };
617
618 if let Some(projection) = projection {
620 let projection_mapping =
622 ProjectionMapping::from_indices(projection, &schema)?;
623 let out_schema = project_schema(&schema, Some(projection))?;
624 output_partitioning =
625 output_partitioning.project(&projection_mapping, &eq_properties);
626 eq_properties = eq_properties.project(&projection_mapping, out_schema);
627 }
628
629 Ok(PlanProperties::new(
630 eq_properties,
631 output_partitioning,
632 emission_type,
633 boundedness_from_children([left, right]),
634 ))
635 }
636
637 pub fn swap_inputs(
661 &self,
662 partition_mode: PartitionMode,
663 ) -> Result<Arc<dyn ExecutionPlan>> {
664 let left = self.left();
665 let right = self.right();
666 let new_join = HashJoinExec::try_new(
667 Arc::clone(right),
668 Arc::clone(left),
669 self.on()
670 .iter()
671 .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
672 .collect(),
673 self.filter().map(JoinFilter::swap),
674 &self.join_type().swap(),
675 swap_join_projection(
676 left.schema().fields().len(),
677 right.schema().fields().len(),
678 self.projection.as_ref(),
679 self.join_type(),
680 ),
681 partition_mode,
682 self.null_equality(),
683 )?;
684 if matches!(
686 self.join_type(),
687 JoinType::LeftSemi
688 | JoinType::RightSemi
689 | JoinType::LeftAnti
690 | JoinType::RightAnti
691 | JoinType::LeftMark
692 | JoinType::RightMark
693 ) || self.projection.is_some()
694 {
695 Ok(Arc::new(new_join))
696 } else {
697 reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
698 }
699 }
700}
701
702impl DisplayAs for HashJoinExec {
703 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
704 match t {
705 DisplayFormatType::Default | DisplayFormatType::Verbose => {
706 let display_filter = self.filter.as_ref().map_or_else(
707 || "".to_string(),
708 |f| format!(", filter={}", f.expression()),
709 );
710 let display_projections = if self.contains_projection() {
711 format!(
712 ", projection=[{}]",
713 self.projection
714 .as_ref()
715 .unwrap()
716 .iter()
717 .map(|index| format!(
718 "{}@{}",
719 self.join_schema.fields().get(*index).unwrap().name(),
720 index
721 ))
722 .collect::<Vec<_>>()
723 .join(", ")
724 )
725 } else {
726 "".to_string()
727 };
728 let display_null_equality =
729 if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
730 ", NullsEqual: true"
731 } else {
732 ""
733 };
734 let on = self
735 .on
736 .iter()
737 .map(|(c1, c2)| format!("({c1}, {c2})"))
738 .collect::<Vec<String>>()
739 .join(", ");
740 write!(
741 f,
742 "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
743 self.mode,
744 self.join_type,
745 on,
746 display_filter,
747 display_projections,
748 display_null_equality,
749 )
750 }
751 DisplayFormatType::TreeRender => {
752 let on = self
753 .on
754 .iter()
755 .map(|(c1, c2)| {
756 format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
757 })
758 .collect::<Vec<String>>()
759 .join(", ");
760
761 if *self.join_type() != JoinType::Inner {
762 writeln!(f, "join_type={:?}", self.join_type)?;
763 }
764
765 writeln!(f, "on={on}")?;
766
767 if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
768 writeln!(f, "NullsEqual: true")?;
769 }
770
771 if let Some(filter) = self.filter.as_ref() {
772 writeln!(f, "filter={filter}")?;
773 }
774
775 Ok(())
776 }
777 }
778 }
779}
780
781impl ExecutionPlan for HashJoinExec {
782 fn name(&self) -> &'static str {
783 "HashJoinExec"
784 }
785
786 fn as_any(&self) -> &dyn Any {
787 self
788 }
789
790 fn properties(&self) -> &PlanProperties {
791 &self.cache
792 }
793
794 fn required_input_distribution(&self) -> Vec<Distribution> {
795 match self.mode {
796 PartitionMode::CollectLeft => vec![
797 Distribution::SinglePartition,
798 Distribution::UnspecifiedDistribution,
799 ],
800 PartitionMode::Partitioned => {
801 let (left_expr, right_expr) = self
802 .on
803 .iter()
804 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
805 .unzip();
806 vec![
807 Distribution::HashPartitioned(left_expr),
808 Distribution::HashPartitioned(right_expr),
809 ]
810 }
811 PartitionMode::Auto => vec![
812 Distribution::UnspecifiedDistribution,
813 Distribution::UnspecifiedDistribution,
814 ],
815 }
816 }
817
818 fn maintains_input_order(&self) -> Vec<bool> {
835 Self::maintains_input_order(self.join_type)
836 }
837
838 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
839 vec![&self.left, &self.right]
840 }
841
842 fn with_new_children(
848 self: Arc<Self>,
849 children: Vec<Arc<dyn ExecutionPlan>>,
850 ) -> Result<Arc<dyn ExecutionPlan>> {
851 Ok(Arc::new(HashJoinExec {
852 left: Arc::clone(&children[0]),
853 right: Arc::clone(&children[1]),
854 on: self.on.clone(),
855 filter: self.filter.clone(),
856 join_type: self.join_type,
857 join_schema: Arc::clone(&self.join_schema),
858 left_fut: Arc::clone(&self.left_fut),
859 random_state: self.random_state.clone(),
860 mode: self.mode,
861 metrics: ExecutionPlanMetricsSet::new(),
862 projection: self.projection.clone(),
863 column_indices: self.column_indices.clone(),
864 null_equality: self.null_equality,
865 cache: Self::compute_properties(
866 &children[0],
867 &children[1],
868 Arc::clone(&self.join_schema),
869 self.join_type,
870 &self.on,
871 self.mode,
872 self.projection.as_ref(),
873 )?,
874 dynamic_filter: self.dynamic_filter.clone(),
876 }))
877 }
878
879 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
880 Ok(Arc::new(HashJoinExec {
881 left: Arc::clone(&self.left),
882 right: Arc::clone(&self.right),
883 on: self.on.clone(),
884 filter: self.filter.clone(),
885 join_type: self.join_type,
886 join_schema: Arc::clone(&self.join_schema),
887 left_fut: Arc::new(OnceAsync::default()),
889 random_state: self.random_state.clone(),
890 mode: self.mode,
891 metrics: ExecutionPlanMetricsSet::new(),
892 projection: self.projection.clone(),
893 column_indices: self.column_indices.clone(),
894 null_equality: self.null_equality,
895 cache: self.cache.clone(),
896 dynamic_filter: None,
898 }))
899 }
900
901 fn execute(
902 &self,
903 partition: usize,
904 context: Arc<TaskContext>,
905 ) -> Result<SendableRecordBatchStream> {
906 let on_left = self
907 .on
908 .iter()
909 .map(|on| Arc::clone(&on.0))
910 .collect::<Vec<_>>();
911 let left_partitions = self.left.output_partitioning().partition_count();
912 let right_partitions = self.right.output_partitioning().partition_count();
913
914 if self.mode == PartitionMode::Partitioned && left_partitions != right_partitions
915 {
916 return internal_err!(
917 "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
918 consider using RepartitionExec"
919 );
920 }
921
922 if self.mode == PartitionMode::CollectLeft && left_partitions != 1 {
923 return internal_err!(
924 "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
925 consider using CoalescePartitionsExec or the EnforceDistribution rule"
926 );
927 }
928
929 let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
930
931 let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
932 let left_fut = match self.mode {
933 PartitionMode::CollectLeft => self.left_fut.try_once(|| {
934 let left_stream = self.left.execute(0, Arc::clone(&context))?;
935
936 let reservation =
937 MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
938
939 Ok(collect_left_input(
940 self.random_state.clone(),
941 left_stream,
942 on_left.clone(),
943 join_metrics.clone(),
944 reservation,
945 need_produce_result_in_final(self.join_type),
946 self.right().output_partitioning().partition_count(),
947 enable_dynamic_filter_pushdown,
948 ))
949 })?,
950 PartitionMode::Partitioned => {
951 let left_stream = self.left.execute(partition, Arc::clone(&context))?;
952
953 let reservation =
954 MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
955 .register(context.memory_pool());
956
957 OnceFut::new(collect_left_input(
958 self.random_state.clone(),
959 left_stream,
960 on_left.clone(),
961 join_metrics.clone(),
962 reservation,
963 need_produce_result_in_final(self.join_type),
964 1,
965 enable_dynamic_filter_pushdown,
966 ))
967 }
968 PartitionMode::Auto => {
969 return plan_err!(
970 "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
971 PartitionMode::Auto
972 );
973 }
974 };
975
976 let batch_size = context.session_config().batch_size();
977
978 let bounds_accumulator = enable_dynamic_filter_pushdown
980 .then(|| {
981 self.dynamic_filter.as_ref().map(|df| {
982 let filter = Arc::clone(&df.filter);
983 let on_right = self
984 .on
985 .iter()
986 .map(|(_, right_expr)| Arc::clone(right_expr))
987 .collect::<Vec<_>>();
988 Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
989 Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
990 self.mode,
991 self.left.as_ref(),
992 self.right.as_ref(),
993 filter,
994 on_right,
995 ))
996 })))
997 })
998 })
999 .flatten()
1000 .flatten();
1001
1002 let right_stream = self.right.execute(partition, context)?;
1005
1006 let column_indices_after_projection = match &self.projection {
1008 Some(projection) => projection
1009 .iter()
1010 .map(|i| self.column_indices[*i].clone())
1011 .collect(),
1012 None => self.column_indices.clone(),
1013 };
1014
1015 let on_right = self
1016 .on
1017 .iter()
1018 .map(|(_, right_expr)| Arc::clone(right_expr))
1019 .collect::<Vec<_>>();
1020
1021 Ok(Box::pin(HashJoinStream::new(
1022 partition,
1023 self.schema(),
1024 on_right,
1025 self.filter.clone(),
1026 self.join_type,
1027 right_stream,
1028 self.random_state.clone(),
1029 join_metrics,
1030 column_indices_after_projection,
1031 self.null_equality,
1032 HashJoinStreamState::WaitBuildSide,
1033 BuildSide::Initial(BuildSideInitialState { left_fut }),
1034 batch_size,
1035 vec![],
1036 self.right.output_ordering().is_some(),
1037 bounds_accumulator,
1038 self.mode,
1039 )))
1040 }
1041
1042 fn metrics(&self) -> Option<MetricsSet> {
1043 Some(self.metrics.clone_inner())
1044 }
1045
1046 fn statistics(&self) -> Result<Statistics> {
1047 self.partition_statistics(None)
1048 }
1049
1050 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1051 if partition.is_some() {
1052 return Ok(Statistics::new_unknown(&self.schema()));
1053 }
1054 let stats = estimate_join_statistics(
1058 self.left.partition_statistics(None)?,
1059 self.right.partition_statistics(None)?,
1060 self.on.clone(),
1061 &self.join_type,
1062 &self.join_schema,
1063 )?;
1064 Ok(stats.project(self.projection.as_ref()))
1066 }
1067
1068 fn try_swapping_with_projection(
1072 &self,
1073 projection: &ProjectionExec,
1074 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1075 if self.contains_projection() {
1077 return Ok(None);
1078 }
1079
1080 if let Some(JoinData {
1081 projected_left_child,
1082 projected_right_child,
1083 join_filter,
1084 join_on,
1085 }) = try_pushdown_through_join(
1086 projection,
1087 self.left(),
1088 self.right(),
1089 self.on(),
1090 self.schema(),
1091 self.filter(),
1092 )? {
1093 Ok(Some(Arc::new(HashJoinExec::try_new(
1094 Arc::new(projected_left_child),
1095 Arc::new(projected_right_child),
1096 join_on,
1097 join_filter,
1098 self.join_type(),
1099 None,
1101 *self.partition_mode(),
1102 self.null_equality,
1103 )?)))
1104 } else {
1105 try_embed_projection(projection, self)
1106 }
1107 }
1108
1109 fn gather_filters_for_pushdown(
1110 &self,
1111 phase: FilterPushdownPhase,
1112 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1113 config: &ConfigOptions,
1114 ) -> Result<FilterDescription> {
1115 if self.join_type != JoinType::Inner {
1120 return Ok(FilterDescription::all_unsupported(
1121 &parent_filters,
1122 &self.children(),
1123 ));
1124 }
1125
1126 let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
1128 &parent_filters,
1129 self.left(),
1130 )?;
1131 let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
1132 &parent_filters,
1133 self.right(),
1134 )?;
1135
1136 if matches!(phase, FilterPushdownPhase::Post)
1138 && config.optimizer.enable_join_dynamic_filter_pushdown
1139 {
1140 let dynamic_filter = Self::create_dynamic_filter(&self.on);
1142 right_child = right_child.with_self_filter(dynamic_filter);
1143 }
1144
1145 Ok(FilterDescription::new()
1146 .with_child(left_child)
1147 .with_child(right_child))
1148 }
1149
1150 fn handle_child_pushdown_result(
1151 &self,
1152 _phase: FilterPushdownPhase,
1153 child_pushdown_result: ChildPushdownResult,
1154 _config: &ConfigOptions,
1155 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1156 if self.join_type != JoinType::Inner {
1161 return Ok(FilterPushdownPropagation::all_unsupported(
1165 child_pushdown_result,
1166 ));
1167 }
1168
1169 let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1170 assert_eq!(child_pushdown_result.self_filters.len(), 2); let right_child_self_filters = &child_pushdown_result.self_filters[1]; if let Some(filter) = right_child_self_filters.first() {
1174 let predicate = Arc::clone(&filter.predicate);
1177 if let Ok(dynamic_filter) =
1178 Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1179 {
1180 let new_node = Arc::new(HashJoinExec {
1182 left: Arc::clone(&self.left),
1183 right: Arc::clone(&self.right),
1184 on: self.on.clone(),
1185 filter: self.filter.clone(),
1186 join_type: self.join_type,
1187 join_schema: Arc::clone(&self.join_schema),
1188 left_fut: Arc::clone(&self.left_fut),
1189 random_state: self.random_state.clone(),
1190 mode: self.mode,
1191 metrics: ExecutionPlanMetricsSet::new(),
1192 projection: self.projection.clone(),
1193 column_indices: self.column_indices.clone(),
1194 null_equality: self.null_equality,
1195 cache: self.cache.clone(),
1196 dynamic_filter: Some(HashJoinExecDynamicFilter {
1197 filter: dynamic_filter,
1198 bounds_accumulator: OnceLock::new(),
1199 }),
1200 });
1201 result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
1202 }
1203 }
1204 Ok(result)
1205 }
1206}
1207
1208struct CollectLeftAccumulator {
1218 expr: Arc<dyn PhysicalExpr>,
1220 min: MinAccumulator,
1222 max: MaxAccumulator,
1224}
1225
1226impl CollectLeftAccumulator {
1227 fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
1236 fn dictionary_value_type(data_type: &DataType) -> DataType {
1238 match data_type {
1239 DataType::Dictionary(_, value_type) => {
1240 dictionary_value_type(value_type.as_ref())
1241 }
1242 _ => data_type.clone(),
1243 }
1244 }
1245
1246 let data_type = expr
1247 .data_type(schema)
1248 .map(|dt| dictionary_value_type(&dt))?;
1250 Ok(Self {
1251 expr,
1252 min: MinAccumulator::try_new(&data_type)?,
1253 max: MaxAccumulator::try_new(&data_type)?,
1254 })
1255 }
1256
1257 fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
1268 let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
1269 self.min.update_batch(std::slice::from_ref(&array))?;
1270 self.max.update_batch(std::slice::from_ref(&array))?;
1271 Ok(())
1272 }
1273
1274 fn evaluate(mut self) -> Result<ColumnBounds> {
1281 Ok(ColumnBounds::new(
1282 self.min.evaluate()?,
1283 self.max.evaluate()?,
1284 ))
1285 }
1286}
1287
1288struct BuildSideState {
1290 batches: Vec<RecordBatch>,
1291 num_rows: usize,
1292 metrics: BuildProbeJoinMetrics,
1293 reservation: MemoryReservation,
1294 bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
1295}
1296
1297impl BuildSideState {
1298 fn try_new(
1300 metrics: BuildProbeJoinMetrics,
1301 reservation: MemoryReservation,
1302 on_left: Vec<Arc<dyn PhysicalExpr>>,
1303 schema: &SchemaRef,
1304 should_compute_bounds: bool,
1305 ) -> Result<Self> {
1306 Ok(Self {
1307 batches: Vec::new(),
1308 num_rows: 0,
1309 metrics,
1310 reservation,
1311 bounds_accumulators: should_compute_bounds
1312 .then(|| {
1313 on_left
1314 .iter()
1315 .map(|expr| {
1316 CollectLeftAccumulator::try_new(Arc::clone(expr), schema)
1317 })
1318 .collect::<Result<Vec<_>>>()
1319 })
1320 .transpose()?,
1321 })
1322 }
1323}
1324
1325#[allow(clippy::too_many_arguments)]
1354async fn collect_left_input(
1355 random_state: RandomState,
1356 left_stream: SendableRecordBatchStream,
1357 on_left: Vec<PhysicalExprRef>,
1358 metrics: BuildProbeJoinMetrics,
1359 reservation: MemoryReservation,
1360 with_visited_indices_bitmap: bool,
1361 probe_threads_count: usize,
1362 should_compute_bounds: bool,
1363) -> Result<JoinLeftData> {
1364 let schema = left_stream.schema();
1365
1366 let initial = BuildSideState::try_new(
1370 metrics,
1371 reservation,
1372 on_left.clone(),
1373 &schema,
1374 should_compute_bounds,
1375 )?;
1376
1377 let state = left_stream
1378 .try_fold(initial, |mut state, batch| async move {
1379 if let Some(ref mut accumulators) = state.bounds_accumulators {
1381 for accumulator in accumulators {
1382 accumulator.update_batch(&batch)?;
1383 }
1384 }
1385
1386 let batch_size = get_record_batch_memory_size(&batch);
1388 state.reservation.try_grow(batch_size)?;
1390 state.metrics.build_mem_used.add(batch_size);
1392 state.metrics.build_input_batches.add(1);
1393 state.metrics.build_input_rows.add(batch.num_rows());
1394 state.num_rows += batch.num_rows();
1396 state.batches.push(batch);
1398 Ok(state)
1399 })
1400 .await?;
1401
1402 let BuildSideState {
1404 batches,
1405 num_rows,
1406 metrics,
1407 mut reservation,
1408 bounds_accumulators,
1409 } = state;
1410
1411 let fixed_size_u32 = size_of::<JoinHashMapU32>();
1414 let fixed_size_u64 = size_of::<JoinHashMapU64>();
1415
1416 let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
1419 let estimated_hashtable_size =
1420 estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
1421 reservation.try_grow(estimated_hashtable_size)?;
1422 metrics.build_mem_used.add(estimated_hashtable_size);
1423 Box::new(JoinHashMapU64::with_capacity(num_rows))
1424 } else {
1425 let estimated_hashtable_size =
1426 estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
1427 reservation.try_grow(estimated_hashtable_size)?;
1428 metrics.build_mem_used.add(estimated_hashtable_size);
1429 Box::new(JoinHashMapU32::with_capacity(num_rows))
1430 };
1431
1432 let mut hashes_buffer = Vec::new();
1433 let mut offset = 0;
1434
1435 let batches_iter = batches.iter().rev();
1437 for batch in batches_iter.clone() {
1438 hashes_buffer.clear();
1439 hashes_buffer.resize(batch.num_rows(), 0);
1440 update_hash(
1441 &on_left,
1442 batch,
1443 &mut *hashmap,
1444 offset,
1445 &random_state,
1446 &mut hashes_buffer,
1447 0,
1448 true,
1449 )?;
1450 offset += batch.num_rows();
1451 }
1452 let single_batch = concat_batches(&schema, batches_iter)?;
1454
1455 let visited_indices_bitmap = if with_visited_indices_bitmap {
1457 let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
1458 reservation.try_grow(bitmap_size)?;
1459 metrics.build_mem_used.add(bitmap_size);
1460
1461 let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
1462 bitmap_buffer.append_n(num_rows, false);
1463 bitmap_buffer
1464 } else {
1465 BooleanBufferBuilder::new(0)
1466 };
1467
1468 let left_values = on_left
1469 .iter()
1470 .map(|c| {
1471 c.evaluate(&single_batch)?
1472 .into_array(single_batch.num_rows())
1473 })
1474 .collect::<Result<Vec<_>>>()?;
1475
1476 let bounds = match bounds_accumulators {
1478 Some(accumulators) if num_rows > 0 => {
1479 let bounds = accumulators
1480 .into_iter()
1481 .map(CollectLeftAccumulator::evaluate)
1482 .collect::<Result<Vec<_>>>()?;
1483 Some(bounds)
1484 }
1485 _ => None,
1486 };
1487
1488 let data = JoinLeftData::new(
1489 hashmap,
1490 single_batch,
1491 left_values.clone(),
1492 Mutex::new(visited_indices_bitmap),
1493 AtomicUsize::new(probe_threads_count),
1494 reservation,
1495 bounds,
1496 );
1497
1498 Ok(data)
1499}
1500
1501#[cfg(test)]
1502mod tests {
1503 use super::*;
1504 use crate::coalesce_partitions::CoalescePartitionsExec;
1505 use crate::joins::hash_join::stream::lookup_join_hashmap;
1506 use crate::test::{assert_join_metrics, TestMemoryExec};
1507 use crate::{
1508 common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
1509 test::exec::MockExec,
1510 };
1511
1512 use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
1513 use arrow::buffer::NullBuffer;
1514 use arrow::datatypes::{DataType, Field};
1515 use arrow_schema::Schema;
1516 use datafusion_common::hash_utils::create_hashes;
1517 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
1518 use datafusion_common::{
1519 assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
1520 ScalarValue,
1521 };
1522 use datafusion_execution::config::SessionConfig;
1523 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1524 use datafusion_expr::Operator;
1525 use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1526 use datafusion_physical_expr::PhysicalExpr;
1527 use hashbrown::HashTable;
1528 use insta::{allow_duplicates, assert_snapshot};
1529 use rstest::*;
1530 use rstest_reuse::*;
1531
1532 fn div_ceil(a: usize, b: usize) -> usize {
1533 a.div_ceil(b)
1534 }
1535
1536 #[template]
1537 #[rstest]
1538 fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
1539
1540 fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
1541 let session_config = SessionConfig::default().with_batch_size(batch_size);
1542 Arc::new(TaskContext::default().with_session_config(session_config))
1543 }
1544
1545 fn build_table(
1546 a: (&str, &Vec<i32>),
1547 b: (&str, &Vec<i32>),
1548 c: (&str, &Vec<i32>),
1549 ) -> Arc<dyn ExecutionPlan> {
1550 let batch = build_table_i32(a, b, c);
1551 let schema = batch.schema();
1552 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
1553 }
1554
1555 fn join(
1556 left: Arc<dyn ExecutionPlan>,
1557 right: Arc<dyn ExecutionPlan>,
1558 on: JoinOn,
1559 join_type: &JoinType,
1560 null_equality: NullEquality,
1561 ) -> Result<HashJoinExec> {
1562 HashJoinExec::try_new(
1563 left,
1564 right,
1565 on,
1566 None,
1567 join_type,
1568 None,
1569 PartitionMode::CollectLeft,
1570 null_equality,
1571 )
1572 }
1573
1574 fn join_with_filter(
1575 left: Arc<dyn ExecutionPlan>,
1576 right: Arc<dyn ExecutionPlan>,
1577 on: JoinOn,
1578 filter: JoinFilter,
1579 join_type: &JoinType,
1580 null_equality: NullEquality,
1581 ) -> Result<HashJoinExec> {
1582 HashJoinExec::try_new(
1583 left,
1584 right,
1585 on,
1586 Some(filter),
1587 join_type,
1588 None,
1589 PartitionMode::CollectLeft,
1590 null_equality,
1591 )
1592 }
1593
1594 async fn join_collect(
1595 left: Arc<dyn ExecutionPlan>,
1596 right: Arc<dyn ExecutionPlan>,
1597 on: JoinOn,
1598 join_type: &JoinType,
1599 null_equality: NullEquality,
1600 context: Arc<TaskContext>,
1601 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1602 let join = join(left, right, on, join_type, null_equality)?;
1603 let columns_header = columns(&join.schema());
1604
1605 let stream = join.execute(0, context)?;
1606 let batches = common::collect(stream).await?;
1607 let metrics = join.metrics().unwrap();
1608
1609 Ok((columns_header, batches, metrics))
1610 }
1611
1612 async fn partitioned_join_collect(
1613 left: Arc<dyn ExecutionPlan>,
1614 right: Arc<dyn ExecutionPlan>,
1615 on: JoinOn,
1616 join_type: &JoinType,
1617 null_equality: NullEquality,
1618 context: Arc<TaskContext>,
1619 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1620 join_collect_with_partition_mode(
1621 left,
1622 right,
1623 on,
1624 join_type,
1625 PartitionMode::Partitioned,
1626 null_equality,
1627 context,
1628 )
1629 .await
1630 }
1631
1632 async fn join_collect_with_partition_mode(
1633 left: Arc<dyn ExecutionPlan>,
1634 right: Arc<dyn ExecutionPlan>,
1635 on: JoinOn,
1636 join_type: &JoinType,
1637 partition_mode: PartitionMode,
1638 null_equality: NullEquality,
1639 context: Arc<TaskContext>,
1640 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1641 let partition_count = 4;
1642
1643 let (left_expr, right_expr) = on
1644 .iter()
1645 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1646 .unzip();
1647
1648 let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1649 PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
1650 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1651 left,
1652 Partitioning::Hash(left_expr, partition_count),
1653 )?),
1654 PartitionMode::Auto => {
1655 return internal_err!("Unexpected PartitionMode::Auto in join tests")
1656 }
1657 };
1658
1659 let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1660 PartitionMode::CollectLeft => {
1661 let partition_column_name = right.schema().field(0).name().clone();
1662 let partition_expr = vec![Arc::new(Column::new_with_schema(
1663 &partition_column_name,
1664 &right.schema(),
1665 )?) as _];
1666 Arc::new(RepartitionExec::try_new(
1667 right,
1668 Partitioning::Hash(partition_expr, partition_count),
1669 )?) as _
1670 }
1671 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1672 right,
1673 Partitioning::Hash(right_expr, partition_count),
1674 )?),
1675 PartitionMode::Auto => {
1676 return internal_err!("Unexpected PartitionMode::Auto in join tests")
1677 }
1678 };
1679
1680 let join = HashJoinExec::try_new(
1681 left_repartitioned,
1682 right_repartitioned,
1683 on,
1684 None,
1685 join_type,
1686 None,
1687 partition_mode,
1688 null_equality,
1689 )?;
1690
1691 let columns = columns(&join.schema());
1692
1693 let mut batches = vec![];
1694 for i in 0..partition_count {
1695 let stream = join.execute(i, Arc::clone(&context))?;
1696 let more_batches = common::collect(stream).await?;
1697 batches.extend(
1698 more_batches
1699 .into_iter()
1700 .filter(|b| b.num_rows() > 0)
1701 .collect::<Vec<_>>(),
1702 );
1703 }
1704 let metrics = join.metrics().unwrap();
1705
1706 Ok((columns, batches, metrics))
1707 }
1708
1709 #[apply(batch_sizes)]
1710 #[tokio::test]
1711 async fn join_inner_one(batch_size: usize) -> Result<()> {
1712 let task_ctx = prepare_task_ctx(batch_size);
1713 let left = build_table(
1714 ("a1", &vec![1, 2, 3]),
1715 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1717 );
1718 let right = build_table(
1719 ("a2", &vec![10, 20, 30]),
1720 ("b1", &vec![4, 5, 6]),
1721 ("c2", &vec![70, 80, 90]),
1722 );
1723
1724 let on = vec![(
1725 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1726 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1727 )];
1728
1729 let (columns, batches, metrics) = join_collect(
1730 Arc::clone(&left),
1731 Arc::clone(&right),
1732 on.clone(),
1733 &JoinType::Inner,
1734 NullEquality::NullEqualsNothing,
1735 task_ctx,
1736 )
1737 .await?;
1738
1739 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1740
1741 allow_duplicates! {
1742 assert_snapshot!(batches_to_string(&batches), @r#"
1744 +----+----+----+----+----+----+
1745 | a1 | b1 | c1 | a2 | b1 | c2 |
1746 +----+----+----+----+----+----+
1747 | 1 | 4 | 7 | 10 | 4 | 70 |
1748 | 2 | 5 | 8 | 20 | 5 | 80 |
1749 | 3 | 5 | 9 | 20 | 5 | 80 |
1750 +----+----+----+----+----+----+
1751 "#);
1752 }
1753
1754 assert_join_metrics!(metrics, 3);
1755
1756 Ok(())
1757 }
1758
1759 #[apply(batch_sizes)]
1760 #[tokio::test]
1761 async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> {
1762 let task_ctx = prepare_task_ctx(batch_size);
1763 let left = build_table(
1764 ("a1", &vec![1, 2, 3]),
1765 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1767 );
1768 let right = build_table(
1769 ("a2", &vec![10, 20, 30]),
1770 ("b1", &vec![4, 5, 6]),
1771 ("c2", &vec![70, 80, 90]),
1772 );
1773 let on = vec![(
1774 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1775 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1776 )];
1777
1778 let (columns, batches, metrics) = partitioned_join_collect(
1779 Arc::clone(&left),
1780 Arc::clone(&right),
1781 on.clone(),
1782 &JoinType::Inner,
1783 NullEquality::NullEqualsNothing,
1784 task_ctx,
1785 )
1786 .await?;
1787
1788 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1789
1790 allow_duplicates! {
1791 assert_snapshot!(batches_to_sort_string(&batches), @r#"
1792 +----+----+----+----+----+----+
1793 | a1 | b1 | c1 | a2 | b1 | c2 |
1794 +----+----+----+----+----+----+
1795 | 1 | 4 | 7 | 10 | 4 | 70 |
1796 | 2 | 5 | 8 | 20 | 5 | 80 |
1797 | 3 | 5 | 9 | 20 | 5 | 80 |
1798 +----+----+----+----+----+----+
1799 "#);
1800 }
1801
1802 assert_join_metrics!(metrics, 3);
1803
1804 Ok(())
1805 }
1806
1807 #[tokio::test]
1808 async fn join_inner_one_no_shared_column_names() -> Result<()> {
1809 let task_ctx = Arc::new(TaskContext::default());
1810 let left = build_table(
1811 ("a1", &vec![1, 2, 3]),
1812 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1814 );
1815 let right = build_table(
1816 ("a2", &vec![10, 20, 30]),
1817 ("b2", &vec![4, 5, 6]),
1818 ("c2", &vec![70, 80, 90]),
1819 );
1820 let on = vec![(
1821 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1822 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1823 )];
1824
1825 let (columns, batches, metrics) = join_collect(
1826 left,
1827 right,
1828 on,
1829 &JoinType::Inner,
1830 NullEquality::NullEqualsNothing,
1831 task_ctx,
1832 )
1833 .await?;
1834
1835 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1836
1837 allow_duplicates! {
1839 assert_snapshot!(batches_to_string(&batches), @r#"
1840 +----+----+----+----+----+----+
1841 | a1 | b1 | c1 | a2 | b2 | c2 |
1842 +----+----+----+----+----+----+
1843 | 1 | 4 | 7 | 10 | 4 | 70 |
1844 | 2 | 5 | 8 | 20 | 5 | 80 |
1845 | 3 | 5 | 9 | 20 | 5 | 80 |
1846 +----+----+----+----+----+----+
1847 "#);
1848 }
1849
1850 assert_join_metrics!(metrics, 3);
1851
1852 Ok(())
1853 }
1854
1855 #[tokio::test]
1856 async fn join_inner_one_randomly_ordered() -> Result<()> {
1857 let task_ctx = Arc::new(TaskContext::default());
1858 let left = build_table(
1859 ("a1", &vec![0, 3, 2, 1]),
1860 ("b1", &vec![4, 5, 5, 4]),
1861 ("c1", &vec![6, 9, 8, 7]),
1862 );
1863 let right = build_table(
1864 ("a2", &vec![20, 30, 10]),
1865 ("b2", &vec![5, 6, 4]),
1866 ("c2", &vec![80, 90, 70]),
1867 );
1868 let on = vec![(
1869 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1870 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1871 )];
1872
1873 let (columns, batches, metrics) = join_collect(
1874 left,
1875 right,
1876 on,
1877 &JoinType::Inner,
1878 NullEquality::NullEqualsNothing,
1879 task_ctx,
1880 )
1881 .await?;
1882
1883 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1884
1885 allow_duplicates! {
1887 assert_snapshot!(batches_to_string(&batches), @r#"
1888 +----+----+----+----+----+----+
1889 | a1 | b1 | c1 | a2 | b2 | c2 |
1890 +----+----+----+----+----+----+
1891 | 3 | 5 | 9 | 20 | 5 | 80 |
1892 | 2 | 5 | 8 | 20 | 5 | 80 |
1893 | 0 | 4 | 6 | 10 | 4 | 70 |
1894 | 1 | 4 | 7 | 10 | 4 | 70 |
1895 +----+----+----+----+----+----+
1896 "#);
1897 }
1898
1899 assert_join_metrics!(metrics, 4);
1900
1901 Ok(())
1902 }
1903
1904 #[apply(batch_sizes)]
1905 #[tokio::test]
1906 async fn join_inner_two(batch_size: usize) -> Result<()> {
1907 let task_ctx = prepare_task_ctx(batch_size);
1908 let left = build_table(
1909 ("a1", &vec![1, 2, 2]),
1910 ("b2", &vec![1, 2, 2]),
1911 ("c1", &vec![7, 8, 9]),
1912 );
1913 let right = build_table(
1914 ("a1", &vec![1, 2, 3]),
1915 ("b2", &vec![1, 2, 2]),
1916 ("c2", &vec![70, 80, 90]),
1917 );
1918 let on = vec![
1919 (
1920 Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
1921 Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
1922 ),
1923 (
1924 Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
1925 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1926 ),
1927 ];
1928
1929 let (columns, batches, metrics) = join_collect(
1930 left,
1931 right,
1932 on,
1933 &JoinType::Inner,
1934 NullEquality::NullEqualsNothing,
1935 task_ctx,
1936 )
1937 .await?;
1938
1939 assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
1940
1941 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
1942 let mut expected_batch_count = div_ceil(3, batch_size);
1945 if batch_size == 1 {
1946 expected_batch_count += 1;
1947 }
1948 expected_batch_count
1949 } else {
1950 div_ceil(9, batch_size)
1953 };
1954
1955 assert_eq!(batches.len(), expected_batch_count);
1956
1957 allow_duplicates! {
1959 assert_snapshot!(batches_to_string(&batches), @r#"
1960 +----+----+----+----+----+----+
1961 | a1 | b2 | c1 | a1 | b2 | c2 |
1962 +----+----+----+----+----+----+
1963 | 1 | 1 | 7 | 1 | 1 | 70 |
1964 | 2 | 2 | 8 | 2 | 2 | 80 |
1965 | 2 | 2 | 9 | 2 | 2 | 80 |
1966 +----+----+----+----+----+----+
1967 "#);
1968 }
1969
1970 assert_join_metrics!(metrics, 3);
1971
1972 Ok(())
1973 }
1974
1975 #[apply(batch_sizes)]
1977 #[tokio::test]
1978 async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
1979 let task_ctx = prepare_task_ctx(batch_size);
1980 let batch1 = build_table_i32(
1981 ("a1", &vec![1, 2]),
1982 ("b2", &vec![1, 2]),
1983 ("c1", &vec![7, 8]),
1984 );
1985 let batch2 =
1986 build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
1987 let schema = batch1.schema();
1988 let left =
1989 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
1990 .unwrap();
1991 let left = Arc::new(CoalescePartitionsExec::new(left));
1992
1993 let right = build_table(
1994 ("a1", &vec![1, 2, 3]),
1995 ("b2", &vec![1, 2, 2]),
1996 ("c2", &vec![70, 80, 90]),
1997 );
1998 let on = vec![
1999 (
2000 Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
2001 Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
2002 ),
2003 (
2004 Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
2005 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2006 ),
2007 ];
2008
2009 let (columns, batches, metrics) = join_collect(
2010 left,
2011 right,
2012 on,
2013 &JoinType::Inner,
2014 NullEquality::NullEqualsNothing,
2015 task_ctx,
2016 )
2017 .await?;
2018
2019 assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
2020
2021 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2022 let mut expected_batch_count = div_ceil(3, batch_size);
2025 if batch_size == 1 {
2026 expected_batch_count += 1;
2027 }
2028 expected_batch_count
2029 } else {
2030 div_ceil(9, batch_size)
2033 };
2034
2035 assert_eq!(batches.len(), expected_batch_count);
2036
2037 allow_duplicates! {
2039 assert_snapshot!(batches_to_string(&batches), @r#"
2040 +----+----+----+----+----+----+
2041 | a1 | b2 | c1 | a1 | b2 | c2 |
2042 +----+----+----+----+----+----+
2043 | 1 | 1 | 7 | 1 | 1 | 70 |
2044 | 2 | 2 | 8 | 2 | 2 | 80 |
2045 | 2 | 2 | 9 | 2 | 2 | 80 |
2046 +----+----+----+----+----+----+
2047 "#);
2048 }
2049
2050 assert_join_metrics!(metrics, 3);
2051
2052 Ok(())
2053 }
2054
2055 #[tokio::test]
2056 async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
2057 let task_ctx = Arc::new(TaskContext::default());
2058 let batch1 = build_table_i32(
2059 ("a1", &vec![0, 3]),
2060 ("b1", &vec![4, 5]),
2061 ("c1", &vec![6, 9]),
2062 );
2063 let batch2 = build_table_i32(
2064 ("a1", &vec![2, 1]),
2065 ("b1", &vec![5, 4]),
2066 ("c1", &vec![8, 7]),
2067 );
2068 let schema = batch1.schema();
2069
2070 let left =
2071 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2072 .unwrap();
2073 let left = Arc::new(CoalescePartitionsExec::new(left));
2074 let right = build_table(
2075 ("a2", &vec![20, 30, 10]),
2076 ("b2", &vec![5, 6, 4]),
2077 ("c2", &vec![80, 90, 70]),
2078 );
2079 let on = vec![(
2080 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2081 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2082 )];
2083
2084 let (columns, batches, metrics) = join_collect(
2085 left,
2086 right,
2087 on,
2088 &JoinType::Inner,
2089 NullEquality::NullEqualsNothing,
2090 task_ctx,
2091 )
2092 .await?;
2093
2094 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2095
2096 allow_duplicates! {
2098 assert_snapshot!(batches_to_string(&batches), @r#"
2099 +----+----+----+----+----+----+
2100 | a1 | b1 | c1 | a2 | b2 | c2 |
2101 +----+----+----+----+----+----+
2102 | 3 | 5 | 9 | 20 | 5 | 80 |
2103 | 2 | 5 | 8 | 20 | 5 | 80 |
2104 | 0 | 4 | 6 | 10 | 4 | 70 |
2105 | 1 | 4 | 7 | 10 | 4 | 70 |
2106 +----+----+----+----+----+----+
2107 "#);
2108 }
2109
2110 assert_join_metrics!(metrics, 4);
2111
2112 Ok(())
2113 }
2114
2115 #[apply(batch_sizes)]
2117 #[tokio::test]
2118 async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
2119 let task_ctx = prepare_task_ctx(batch_size);
2120 let left = build_table(
2121 ("a1", &vec![1, 2, 3]),
2122 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
2124 );
2125
2126 let batch1 = build_table_i32(
2127 ("a2", &vec![10, 20]),
2128 ("b1", &vec![4, 6]),
2129 ("c2", &vec![70, 80]),
2130 );
2131 let batch2 =
2132 build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
2133 let schema = batch1.schema();
2134 let right =
2135 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2136 .unwrap();
2137
2138 let on = vec![(
2139 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2140 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2141 )];
2142
2143 let join = join(
2144 left,
2145 right,
2146 on,
2147 &JoinType::Inner,
2148 NullEquality::NullEqualsNothing,
2149 )?;
2150
2151 let columns = columns(&join.schema());
2152 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2153
2154 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2156 let batches = common::collect(stream).await?;
2157
2158 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2159 let mut expected_batch_count = div_ceil(1, batch_size);
2162 if batch_size == 1 {
2163 expected_batch_count += 1;
2164 }
2165 expected_batch_count
2166 } else {
2167 div_ceil(6, batch_size)
2170 };
2171 assert_eq!(batches.len(), expected_batch_count);
2172
2173 allow_duplicates! {
2175 assert_snapshot!(batches_to_string(&batches), @r#"
2176 +----+----+----+----+----+----+
2177 | a1 | b1 | c1 | a2 | b1 | c2 |
2178 +----+----+----+----+----+----+
2179 | 1 | 4 | 7 | 10 | 4 | 70 |
2180 +----+----+----+----+----+----+
2181 "#);
2182 }
2183
2184 let stream = join.execute(1, Arc::clone(&task_ctx))?;
2186 let batches = common::collect(stream).await?;
2187
2188 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2189 div_ceil(2, batch_size)
2191 } else {
2192 div_ceil(3, batch_size)
2195 };
2196 assert_eq!(batches.len(), expected_batch_count);
2197
2198 allow_duplicates! {
2200 assert_snapshot!(batches_to_string(&batches), @r#"
2201 +----+----+----+----+----+----+
2202 | a1 | b1 | c1 | a2 | b1 | c2 |
2203 +----+----+----+----+----+----+
2204 | 2 | 5 | 8 | 30 | 5 | 90 |
2205 | 3 | 5 | 9 | 30 | 5 | 90 |
2206 +----+----+----+----+----+----+
2207 "#);
2208 }
2209
2210 Ok(())
2211 }
2212
2213 fn build_table_two_batches(
2214 a: (&str, &Vec<i32>),
2215 b: (&str, &Vec<i32>),
2216 c: (&str, &Vec<i32>),
2217 ) -> Arc<dyn ExecutionPlan> {
2218 let batch = build_table_i32(a, b, c);
2219 let schema = batch.schema();
2220 TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2221 }
2222
2223 #[apply(batch_sizes)]
2224 #[tokio::test]
2225 async fn join_left_multi_batch(batch_size: usize) {
2226 let task_ctx = prepare_task_ctx(batch_size);
2227 let left = build_table(
2228 ("a1", &vec![1, 2, 3]),
2229 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2231 );
2232 let right = build_table_two_batches(
2233 ("a2", &vec![10, 20, 30]),
2234 ("b1", &vec![4, 5, 6]),
2235 ("c2", &vec![70, 80, 90]),
2236 );
2237 let on = vec![(
2238 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2239 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2240 )];
2241
2242 let join = join(
2243 left,
2244 right,
2245 on,
2246 &JoinType::Left,
2247 NullEquality::NullEqualsNothing,
2248 )
2249 .unwrap();
2250
2251 let columns = columns(&join.schema());
2252 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2253
2254 let stream = join.execute(0, task_ctx).unwrap();
2255 let batches = common::collect(stream).await.unwrap();
2256
2257 allow_duplicates! {
2258 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2259 +----+----+----+----+----+----+
2260 | a1 | b1 | c1 | a2 | b1 | c2 |
2261 +----+----+----+----+----+----+
2262 | 1 | 4 | 7 | 10 | 4 | 70 |
2263 | 1 | 4 | 7 | 10 | 4 | 70 |
2264 | 2 | 5 | 8 | 20 | 5 | 80 |
2265 | 2 | 5 | 8 | 20 | 5 | 80 |
2266 | 3 | 7 | 9 | | | |
2267 +----+----+----+----+----+----+
2268 "#);
2269 }
2270 }
2271
2272 #[apply(batch_sizes)]
2273 #[tokio::test]
2274 async fn join_full_multi_batch(batch_size: usize) {
2275 let task_ctx = prepare_task_ctx(batch_size);
2276 let left = build_table(
2277 ("a1", &vec![1, 2, 3]),
2278 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2280 );
2281 let right = build_table_two_batches(
2283 ("a2", &vec![10, 20, 30]),
2284 ("b2", &vec![4, 5, 6]),
2285 ("c2", &vec![70, 80, 90]),
2286 );
2287 let on = vec![(
2288 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2289 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2290 )];
2291
2292 let join = join(
2293 left,
2294 right,
2295 on,
2296 &JoinType::Full,
2297 NullEquality::NullEqualsNothing,
2298 )
2299 .unwrap();
2300
2301 let columns = columns(&join.schema());
2302 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2303
2304 let stream = join.execute(0, task_ctx).unwrap();
2305 let batches = common::collect(stream).await.unwrap();
2306
2307 allow_duplicates! {
2308 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2309 +----+----+----+----+----+----+
2310 | a1 | b1 | c1 | a2 | b2 | c2 |
2311 +----+----+----+----+----+----+
2312 | | | | 30 | 6 | 90 |
2313 | | | | 30 | 6 | 90 |
2314 | 1 | 4 | 7 | 10 | 4 | 70 |
2315 | 1 | 4 | 7 | 10 | 4 | 70 |
2316 | 2 | 5 | 8 | 20 | 5 | 80 |
2317 | 2 | 5 | 8 | 20 | 5 | 80 |
2318 | 3 | 7 | 9 | | | |
2319 +----+----+----+----+----+----+
2320 "#);
2321 }
2322 }
2323
2324 #[apply(batch_sizes)]
2325 #[tokio::test]
2326 async fn join_left_empty_right(batch_size: usize) {
2327 let task_ctx = prepare_task_ctx(batch_size);
2328 let left = build_table(
2329 ("a1", &vec![1, 2, 3]),
2330 ("b1", &vec![4, 5, 7]),
2331 ("c1", &vec![7, 8, 9]),
2332 );
2333 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2334 let on = vec![(
2335 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2336 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2337 )];
2338 let schema = right.schema();
2339 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2340 let join = join(
2341 left,
2342 right,
2343 on,
2344 &JoinType::Left,
2345 NullEquality::NullEqualsNothing,
2346 )
2347 .unwrap();
2348
2349 let columns = columns(&join.schema());
2350 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2351
2352 let stream = join.execute(0, task_ctx).unwrap();
2353 let batches = common::collect(stream).await.unwrap();
2354
2355 allow_duplicates! {
2356 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2357 +----+----+----+----+----+----+
2358 | a1 | b1 | c1 | a2 | b1 | c2 |
2359 +----+----+----+----+----+----+
2360 | 1 | 4 | 7 | | | |
2361 | 2 | 5 | 8 | | | |
2362 | 3 | 7 | 9 | | | |
2363 +----+----+----+----+----+----+
2364 "#);
2365 }
2366 }
2367
2368 #[apply(batch_sizes)]
2369 #[tokio::test]
2370 async fn join_full_empty_right(batch_size: usize) {
2371 let task_ctx = prepare_task_ctx(batch_size);
2372 let left = build_table(
2373 ("a1", &vec![1, 2, 3]),
2374 ("b1", &vec![4, 5, 7]),
2375 ("c1", &vec![7, 8, 9]),
2376 );
2377 let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
2378 let on = vec![(
2379 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2380 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2381 )];
2382 let schema = right.schema();
2383 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2384 let join = join(
2385 left,
2386 right,
2387 on,
2388 &JoinType::Full,
2389 NullEquality::NullEqualsNothing,
2390 )
2391 .unwrap();
2392
2393 let columns = columns(&join.schema());
2394 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2395
2396 let stream = join.execute(0, task_ctx).unwrap();
2397 let batches = common::collect(stream).await.unwrap();
2398
2399 allow_duplicates! {
2400 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2401 +----+----+----+----+----+----+
2402 | a1 | b1 | c1 | a2 | b2 | c2 |
2403 +----+----+----+----+----+----+
2404 | 1 | 4 | 7 | | | |
2405 | 2 | 5 | 8 | | | |
2406 | 3 | 7 | 9 | | | |
2407 +----+----+----+----+----+----+
2408 "#);
2409 }
2410 }
2411
2412 #[apply(batch_sizes)]
2413 #[tokio::test]
2414 async fn join_left_one(batch_size: usize) -> Result<()> {
2415 let task_ctx = prepare_task_ctx(batch_size);
2416 let left = build_table(
2417 ("a1", &vec![1, 2, 3]),
2418 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2420 );
2421 let right = build_table(
2422 ("a2", &vec![10, 20, 30]),
2423 ("b1", &vec![4, 5, 6]),
2424 ("c2", &vec![70, 80, 90]),
2425 );
2426 let on = vec![(
2427 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2428 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2429 )];
2430
2431 let (columns, batches, metrics) = join_collect(
2432 Arc::clone(&left),
2433 Arc::clone(&right),
2434 on.clone(),
2435 &JoinType::Left,
2436 NullEquality::NullEqualsNothing,
2437 task_ctx,
2438 )
2439 .await?;
2440
2441 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2442
2443 allow_duplicates! {
2444 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2445 +----+----+----+----+----+----+
2446 | a1 | b1 | c1 | a2 | b1 | c2 |
2447 +----+----+----+----+----+----+
2448 | 1 | 4 | 7 | 10 | 4 | 70 |
2449 | 2 | 5 | 8 | 20 | 5 | 80 |
2450 | 3 | 7 | 9 | | | |
2451 +----+----+----+----+----+----+
2452 "#);
2453 }
2454
2455 assert_join_metrics!(metrics, 3);
2456
2457 Ok(())
2458 }
2459
2460 #[apply(batch_sizes)]
2461 #[tokio::test]
2462 async fn partitioned_join_left_one(batch_size: usize) -> Result<()> {
2463 let task_ctx = prepare_task_ctx(batch_size);
2464 let left = build_table(
2465 ("a1", &vec![1, 2, 3]),
2466 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2468 );
2469 let right = build_table(
2470 ("a2", &vec![10, 20, 30]),
2471 ("b1", &vec![4, 5, 6]),
2472 ("c2", &vec![70, 80, 90]),
2473 );
2474 let on = vec![(
2475 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2476 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2477 )];
2478
2479 let (columns, batches, metrics) = partitioned_join_collect(
2480 Arc::clone(&left),
2481 Arc::clone(&right),
2482 on.clone(),
2483 &JoinType::Left,
2484 NullEquality::NullEqualsNothing,
2485 task_ctx,
2486 )
2487 .await?;
2488
2489 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2490
2491 allow_duplicates! {
2492 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2493 +----+----+----+----+----+----+
2494 | a1 | b1 | c1 | a2 | b1 | c2 |
2495 +----+----+----+----+----+----+
2496 | 1 | 4 | 7 | 10 | 4 | 70 |
2497 | 2 | 5 | 8 | 20 | 5 | 80 |
2498 | 3 | 7 | 9 | | | |
2499 +----+----+----+----+----+----+
2500 "#);
2501 }
2502
2503 assert_join_metrics!(metrics, 3);
2504
2505 Ok(())
2506 }
2507
2508 fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
2509 build_table(
2512 ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
2513 ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
2514 ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
2515 )
2516 }
2517
2518 fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
2519 build_table(
2522 ("a2", &vec![8, 12, 6, 2, 10, 4]),
2523 ("b2", &vec![8, 10, 6, 2, 10, 4]),
2524 ("c2", &vec![20, 40, 60, 80, 100, 120]),
2525 )
2526 }
2527
2528 #[apply(batch_sizes)]
2529 #[tokio::test]
2530 async fn join_left_semi(batch_size: usize) -> Result<()> {
2531 let task_ctx = prepare_task_ctx(batch_size);
2532 let left = build_semi_anti_left_table();
2533 let right = build_semi_anti_right_table();
2534 let on = vec![(
2536 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2537 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2538 )];
2539
2540 let join = join(
2541 left,
2542 right,
2543 on,
2544 &JoinType::LeftSemi,
2545 NullEquality::NullEqualsNothing,
2546 )?;
2547
2548 let columns = columns(&join.schema());
2549 assert_eq!(columns, vec!["a1", "b1", "c1"]);
2550
2551 let stream = join.execute(0, task_ctx)?;
2552 let batches = common::collect(stream).await?;
2553
2554 allow_duplicates! {
2556 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2557 +----+----+-----+
2558 | a1 | b1 | c1 |
2559 +----+----+-----+
2560 | 11 | 8 | 110 |
2561 | 13 | 10 | 130 |
2562 | 9 | 8 | 90 |
2563 +----+----+-----+
2564 "#);
2565 }
2566
2567 Ok(())
2568 }
2569
2570 #[apply(batch_sizes)]
2571 #[tokio::test]
2572 async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> {
2573 let task_ctx = prepare_task_ctx(batch_size);
2574 let left = build_semi_anti_left_table();
2575 let right = build_semi_anti_right_table();
2576
2577 let on = vec![(
2579 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2580 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2581 )];
2582
2583 let column_indices = vec![ColumnIndex {
2584 index: 0,
2585 side: JoinSide::Right,
2586 }];
2587 let intermediate_schema =
2588 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2589
2590 let filter_expression = Arc::new(BinaryExpr::new(
2591 Arc::new(Column::new("x", 0)),
2592 Operator::NotEq,
2593 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2594 )) as Arc<dyn PhysicalExpr>;
2595
2596 let filter = JoinFilter::new(
2597 filter_expression,
2598 column_indices.clone(),
2599 Arc::new(intermediate_schema.clone()),
2600 );
2601
2602 let join = join_with_filter(
2603 Arc::clone(&left),
2604 Arc::clone(&right),
2605 on.clone(),
2606 filter,
2607 &JoinType::LeftSemi,
2608 NullEquality::NullEqualsNothing,
2609 )?;
2610
2611 let columns_header = columns(&join.schema());
2612 assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
2613
2614 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2615 let batches = common::collect(stream).await?;
2616
2617 allow_duplicates! {
2618 assert_snapshot!(batches_to_sort_string(&batches), @r"
2619 +----+----+-----+
2620 | a1 | b1 | c1 |
2621 +----+----+-----+
2622 | 11 | 8 | 110 |
2623 | 13 | 10 | 130 |
2624 | 9 | 8 | 90 |
2625 +----+----+-----+
2626 ");
2627 }
2628
2629 let filter_expression = Arc::new(BinaryExpr::new(
2631 Arc::new(Column::new("x", 0)),
2632 Operator::Gt,
2633 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2634 )) as Arc<dyn PhysicalExpr>;
2635 let filter = JoinFilter::new(
2636 filter_expression,
2637 column_indices,
2638 Arc::new(intermediate_schema),
2639 );
2640
2641 let join = join_with_filter(
2642 left,
2643 right,
2644 on,
2645 filter,
2646 &JoinType::LeftSemi,
2647 NullEquality::NullEqualsNothing,
2648 )?;
2649
2650 let columns_header = columns(&join.schema());
2651 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2652
2653 let stream = join.execute(0, task_ctx)?;
2654 let batches = common::collect(stream).await?;
2655
2656 allow_duplicates! {
2657 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2658 +----+----+-----+
2659 | a1 | b1 | c1 |
2660 +----+----+-----+
2661 | 13 | 10 | 130 |
2662 +----+----+-----+
2663 "#);
2664 }
2665
2666 Ok(())
2667 }
2668
2669 #[apply(batch_sizes)]
2670 #[tokio::test]
2671 async fn join_right_semi(batch_size: usize) -> Result<()> {
2672 let task_ctx = prepare_task_ctx(batch_size);
2673 let left = build_semi_anti_left_table();
2674 let right = build_semi_anti_right_table();
2675
2676 let on = vec![(
2678 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2679 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2680 )];
2681
2682 let join = join(
2683 left,
2684 right,
2685 on,
2686 &JoinType::RightSemi,
2687 NullEquality::NullEqualsNothing,
2688 )?;
2689
2690 let columns = columns(&join.schema());
2691 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2692
2693 let stream = join.execute(0, task_ctx)?;
2694 let batches = common::collect(stream).await?;
2695
2696 allow_duplicates! {
2698 assert_snapshot!(batches_to_string(&batches), @r#"
2699 +----+----+-----+
2700 | a2 | b2 | c2 |
2701 +----+----+-----+
2702 | 8 | 8 | 20 |
2703 | 12 | 10 | 40 |
2704 | 10 | 10 | 100 |
2705 +----+----+-----+
2706 "#);
2707 }
2708
2709 Ok(())
2710 }
2711
2712 #[apply(batch_sizes)]
2713 #[tokio::test]
2714 async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> {
2715 let task_ctx = prepare_task_ctx(batch_size);
2716 let left = build_semi_anti_left_table();
2717 let right = build_semi_anti_right_table();
2718
2719 let on = vec![(
2721 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2722 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2723 )];
2724
2725 let column_indices = vec![ColumnIndex {
2726 index: 0,
2727 side: JoinSide::Left,
2728 }];
2729 let intermediate_schema =
2730 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2731
2732 let filter_expression = Arc::new(BinaryExpr::new(
2733 Arc::new(Column::new("x", 0)),
2734 Operator::NotEq,
2735 Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
2736 )) as Arc<dyn PhysicalExpr>;
2737
2738 let filter = JoinFilter::new(
2739 filter_expression,
2740 column_indices.clone(),
2741 Arc::new(intermediate_schema.clone()),
2742 );
2743
2744 let join = join_with_filter(
2745 Arc::clone(&left),
2746 Arc::clone(&right),
2747 on.clone(),
2748 filter,
2749 &JoinType::RightSemi,
2750 NullEquality::NullEqualsNothing,
2751 )?;
2752
2753 let columns = columns(&join.schema());
2754 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2755
2756 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2757 let batches = common::collect(stream).await?;
2758
2759 allow_duplicates! {
2761 assert_snapshot!(batches_to_string(&batches), @r#"
2762 +----+----+-----+
2763 | a2 | b2 | c2 |
2764 +----+----+-----+
2765 | 8 | 8 | 20 |
2766 | 12 | 10 | 40 |
2767 | 10 | 10 | 100 |
2768 +----+----+-----+
2769 "#);
2770 }
2771
2772 let filter_expression = Arc::new(BinaryExpr::new(
2774 Arc::new(Column::new("x", 0)),
2775 Operator::Gt,
2776 Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
2777 )) as Arc<dyn PhysicalExpr>;
2778
2779 let filter = JoinFilter::new(
2780 filter_expression,
2781 column_indices,
2782 Arc::new(intermediate_schema.clone()),
2783 );
2784
2785 let join = join_with_filter(
2786 left,
2787 right,
2788 on,
2789 filter,
2790 &JoinType::RightSemi,
2791 NullEquality::NullEqualsNothing,
2792 )?;
2793 let stream = join.execute(0, task_ctx)?;
2794 let batches = common::collect(stream).await?;
2795
2796 allow_duplicates! {
2798 assert_snapshot!(batches_to_string(&batches), @r#"
2799 +----+----+-----+
2800 | a2 | b2 | c2 |
2801 +----+----+-----+
2802 | 12 | 10 | 40 |
2803 | 10 | 10 | 100 |
2804 +----+----+-----+
2805 "#);
2806 }
2807
2808 Ok(())
2809 }
2810
2811 #[apply(batch_sizes)]
2812 #[tokio::test]
2813 async fn join_left_anti(batch_size: usize) -> Result<()> {
2814 let task_ctx = prepare_task_ctx(batch_size);
2815 let left = build_semi_anti_left_table();
2816 let right = build_semi_anti_right_table();
2817 let on = vec![(
2819 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2820 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2821 )];
2822
2823 let join = join(
2824 left,
2825 right,
2826 on,
2827 &JoinType::LeftAnti,
2828 NullEquality::NullEqualsNothing,
2829 )?;
2830
2831 let columns = columns(&join.schema());
2832 assert_eq!(columns, vec!["a1", "b1", "c1"]);
2833
2834 let stream = join.execute(0, task_ctx)?;
2835 let batches = common::collect(stream).await?;
2836
2837 allow_duplicates! {
2838 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2839 +----+----+----+
2840 | a1 | b1 | c1 |
2841 +----+----+----+
2842 | 1 | 1 | 10 |
2843 | 3 | 3 | 30 |
2844 | 5 | 5 | 50 |
2845 | 7 | 7 | 70 |
2846 +----+----+----+
2847 "#);
2848 }
2849 Ok(())
2850 }
2851
2852 #[apply(batch_sizes)]
2853 #[tokio::test]
2854 async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> {
2855 let task_ctx = prepare_task_ctx(batch_size);
2856 let left = build_semi_anti_left_table();
2857 let right = build_semi_anti_right_table();
2858 let on = vec![(
2860 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2861 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2862 )];
2863
2864 let column_indices = vec![ColumnIndex {
2865 index: 0,
2866 side: JoinSide::Right,
2867 }];
2868 let intermediate_schema =
2869 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2870 let filter_expression = Arc::new(BinaryExpr::new(
2871 Arc::new(Column::new("x", 0)),
2872 Operator::NotEq,
2873 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2874 )) as Arc<dyn PhysicalExpr>;
2875
2876 let filter = JoinFilter::new(
2877 filter_expression,
2878 column_indices.clone(),
2879 Arc::new(intermediate_schema.clone()),
2880 );
2881
2882 let join = join_with_filter(
2883 Arc::clone(&left),
2884 Arc::clone(&right),
2885 on.clone(),
2886 filter,
2887 &JoinType::LeftAnti,
2888 NullEquality::NullEqualsNothing,
2889 )?;
2890
2891 let columns_header = columns(&join.schema());
2892 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2893
2894 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2895 let batches = common::collect(stream).await?;
2896
2897 allow_duplicates! {
2898 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2899 +----+----+-----+
2900 | a1 | b1 | c1 |
2901 +----+----+-----+
2902 | 1 | 1 | 10 |
2903 | 11 | 8 | 110 |
2904 | 3 | 3 | 30 |
2905 | 5 | 5 | 50 |
2906 | 7 | 7 | 70 |
2907 | 9 | 8 | 90 |
2908 +----+----+-----+
2909 "#);
2910 }
2911
2912 let filter_expression = Arc::new(BinaryExpr::new(
2914 Arc::new(Column::new("x", 0)),
2915 Operator::NotEq,
2916 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2917 )) as Arc<dyn PhysicalExpr>;
2918
2919 let filter = JoinFilter::new(
2920 filter_expression,
2921 column_indices,
2922 Arc::new(intermediate_schema),
2923 );
2924
2925 let join = join_with_filter(
2926 left,
2927 right,
2928 on,
2929 filter,
2930 &JoinType::LeftAnti,
2931 NullEquality::NullEqualsNothing,
2932 )?;
2933
2934 let columns_header = columns(&join.schema());
2935 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2936
2937 let stream = join.execute(0, task_ctx)?;
2938 let batches = common::collect(stream).await?;
2939
2940 allow_duplicates! {
2941 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2942 +----+----+-----+
2943 | a1 | b1 | c1 |
2944 +----+----+-----+
2945 | 1 | 1 | 10 |
2946 | 11 | 8 | 110 |
2947 | 3 | 3 | 30 |
2948 | 5 | 5 | 50 |
2949 | 7 | 7 | 70 |
2950 | 9 | 8 | 90 |
2951 +----+----+-----+
2952 "#);
2953 }
2954
2955 Ok(())
2956 }
2957
2958 #[apply(batch_sizes)]
2959 #[tokio::test]
2960 async fn join_right_anti(batch_size: usize) -> Result<()> {
2961 let task_ctx = prepare_task_ctx(batch_size);
2962 let left = build_semi_anti_left_table();
2963 let right = build_semi_anti_right_table();
2964 let on = vec![(
2965 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2966 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2967 )];
2968
2969 let join = join(
2970 left,
2971 right,
2972 on,
2973 &JoinType::RightAnti,
2974 NullEquality::NullEqualsNothing,
2975 )?;
2976
2977 let columns = columns(&join.schema());
2978 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2979
2980 let stream = join.execute(0, task_ctx)?;
2981 let batches = common::collect(stream).await?;
2982
2983 allow_duplicates! {
2985 assert_snapshot!(batches_to_string(&batches), @r#"
2986 +----+----+-----+
2987 | a2 | b2 | c2 |
2988 +----+----+-----+
2989 | 6 | 6 | 60 |
2990 | 2 | 2 | 80 |
2991 | 4 | 4 | 120 |
2992 +----+----+-----+
2993 "#);
2994 }
2995 Ok(())
2996 }
2997
2998 #[apply(batch_sizes)]
2999 #[tokio::test]
3000 async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> {
3001 let task_ctx = prepare_task_ctx(batch_size);
3002 let left = build_semi_anti_left_table();
3003 let right = build_semi_anti_right_table();
3004 let on = vec![(
3006 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3007 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3008 )];
3009
3010 let column_indices = vec![ColumnIndex {
3011 index: 0,
3012 side: JoinSide::Left,
3013 }];
3014 let intermediate_schema =
3015 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3016
3017 let filter_expression = Arc::new(BinaryExpr::new(
3018 Arc::new(Column::new("x", 0)),
3019 Operator::NotEq,
3020 Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
3021 )) as Arc<dyn PhysicalExpr>;
3022
3023 let filter = JoinFilter::new(
3024 filter_expression,
3025 column_indices,
3026 Arc::new(intermediate_schema.clone()),
3027 );
3028
3029 let join = join_with_filter(
3030 Arc::clone(&left),
3031 Arc::clone(&right),
3032 on.clone(),
3033 filter,
3034 &JoinType::RightAnti,
3035 NullEquality::NullEqualsNothing,
3036 )?;
3037
3038 let columns_header = columns(&join.schema());
3039 assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3040
3041 let stream = join.execute(0, Arc::clone(&task_ctx))?;
3042 let batches = common::collect(stream).await?;
3043
3044 allow_duplicates! {
3046 assert_snapshot!(batches_to_string(&batches), @r#"
3047 +----+----+-----+
3048 | a2 | b2 | c2 |
3049 +----+----+-----+
3050 | 12 | 10 | 40 |
3051 | 6 | 6 | 60 |
3052 | 2 | 2 | 80 |
3053 | 10 | 10 | 100 |
3054 | 4 | 4 | 120 |
3055 +----+----+-----+
3056 "#);
3057 }
3058
3059 let column_indices = vec![ColumnIndex {
3061 index: 1,
3062 side: JoinSide::Right,
3063 }];
3064 let filter_expression = Arc::new(BinaryExpr::new(
3065 Arc::new(Column::new("x", 0)),
3066 Operator::NotEq,
3067 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3068 )) as Arc<dyn PhysicalExpr>;
3069
3070 let filter = JoinFilter::new(
3071 filter_expression,
3072 column_indices,
3073 Arc::new(intermediate_schema),
3074 );
3075
3076 let join = join_with_filter(
3077 left,
3078 right,
3079 on,
3080 filter,
3081 &JoinType::RightAnti,
3082 NullEquality::NullEqualsNothing,
3083 )?;
3084
3085 let columns_header = columns(&join.schema());
3086 assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3087
3088 let stream = join.execute(0, task_ctx)?;
3089 let batches = common::collect(stream).await?;
3090
3091 allow_duplicates! {
3093 assert_snapshot!(batches_to_string(&batches), @r#"
3094 +----+----+-----+
3095 | a2 | b2 | c2 |
3096 +----+----+-----+
3097 | 8 | 8 | 20 |
3098 | 6 | 6 | 60 |
3099 | 2 | 2 | 80 |
3100 | 4 | 4 | 120 |
3101 +----+----+-----+
3102 "#);
3103 }
3104
3105 Ok(())
3106 }
3107
3108 #[apply(batch_sizes)]
3109 #[tokio::test]
3110 async fn join_right_one(batch_size: usize) -> Result<()> {
3111 let task_ctx = prepare_task_ctx(batch_size);
3112 let left = build_table(
3113 ("a1", &vec![1, 2, 3]),
3114 ("b1", &vec![4, 5, 7]),
3115 ("c1", &vec![7, 8, 9]),
3116 );
3117 let right = build_table(
3118 ("a2", &vec![10, 20, 30]),
3119 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3121 );
3122 let on = vec![(
3123 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3124 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3125 )];
3126
3127 let (columns, batches, metrics) = join_collect(
3128 left,
3129 right,
3130 on,
3131 &JoinType::Right,
3132 NullEquality::NullEqualsNothing,
3133 task_ctx,
3134 )
3135 .await?;
3136
3137 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3138
3139 allow_duplicates! {
3140 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3141 +----+----+----+----+----+----+
3142 | a1 | b1 | c1 | a2 | b1 | c2 |
3143 +----+----+----+----+----+----+
3144 | | | | 30 | 6 | 90 |
3145 | 1 | 4 | 7 | 10 | 4 | 70 |
3146 | 2 | 5 | 8 | 20 | 5 | 80 |
3147 +----+----+----+----+----+----+
3148 "#);
3149 }
3150
3151 assert_join_metrics!(metrics, 3);
3152
3153 Ok(())
3154 }
3155
3156 #[apply(batch_sizes)]
3157 #[tokio::test]
3158 async fn partitioned_join_right_one(batch_size: usize) -> Result<()> {
3159 let task_ctx = prepare_task_ctx(batch_size);
3160 let left = build_table(
3161 ("a1", &vec![1, 2, 3]),
3162 ("b1", &vec![4, 5, 7]),
3163 ("c1", &vec![7, 8, 9]),
3164 );
3165 let right = build_table(
3166 ("a2", &vec![10, 20, 30]),
3167 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3169 );
3170 let on = vec![(
3171 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3172 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3173 )];
3174
3175 let (columns, batches, metrics) = partitioned_join_collect(
3176 left,
3177 right,
3178 on,
3179 &JoinType::Right,
3180 NullEquality::NullEqualsNothing,
3181 task_ctx,
3182 )
3183 .await?;
3184
3185 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3186
3187 allow_duplicates! {
3188 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3189 +----+----+----+----+----+----+
3190 | a1 | b1 | c1 | a2 | b1 | c2 |
3191 +----+----+----+----+----+----+
3192 | | | | 30 | 6 | 90 |
3193 | 1 | 4 | 7 | 10 | 4 | 70 |
3194 | 2 | 5 | 8 | 20 | 5 | 80 |
3195 +----+----+----+----+----+----+
3196 "#);
3197 }
3198
3199 assert_join_metrics!(metrics, 3);
3200
3201 Ok(())
3202 }
3203
3204 #[apply(batch_sizes)]
3205 #[tokio::test]
3206 async fn join_full_one(batch_size: usize) -> Result<()> {
3207 let task_ctx = prepare_task_ctx(batch_size);
3208 let left = build_table(
3209 ("a1", &vec![1, 2, 3]),
3210 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3212 );
3213 let right = build_table(
3214 ("a2", &vec![10, 20, 30]),
3215 ("b2", &vec![4, 5, 6]),
3216 ("c2", &vec![70, 80, 90]),
3217 );
3218 let on = vec![(
3219 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3220 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3221 )];
3222
3223 let join = join(
3224 left,
3225 right,
3226 on,
3227 &JoinType::Full,
3228 NullEquality::NullEqualsNothing,
3229 )?;
3230
3231 let columns = columns(&join.schema());
3232 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3233
3234 let stream = join.execute(0, task_ctx)?;
3235 let batches = common::collect(stream).await?;
3236
3237 allow_duplicates! {
3238 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3239 +----+----+----+----+----+----+
3240 | a1 | b1 | c1 | a2 | b2 | c2 |
3241 +----+----+----+----+----+----+
3242 | | | | 30 | 6 | 90 |
3243 | 1 | 4 | 7 | 10 | 4 | 70 |
3244 | 2 | 5 | 8 | 20 | 5 | 80 |
3245 | 3 | 7 | 9 | | | |
3246 +----+----+----+----+----+----+
3247 "#);
3248 }
3249
3250 Ok(())
3251 }
3252
3253 #[apply(batch_sizes)]
3254 #[tokio::test]
3255 async fn join_left_mark(batch_size: usize) -> Result<()> {
3256 let task_ctx = prepare_task_ctx(batch_size);
3257 let left = build_table(
3258 ("a1", &vec![1, 2, 3]),
3259 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3261 );
3262 let right = build_table(
3263 ("a2", &vec![10, 20, 30]),
3264 ("b1", &vec![4, 5, 6]),
3265 ("c2", &vec![70, 80, 90]),
3266 );
3267 let on = vec![(
3268 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3269 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3270 )];
3271
3272 let (columns, batches, metrics) = join_collect(
3273 Arc::clone(&left),
3274 Arc::clone(&right),
3275 on.clone(),
3276 &JoinType::LeftMark,
3277 NullEquality::NullEqualsNothing,
3278 task_ctx,
3279 )
3280 .await?;
3281
3282 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3283
3284 allow_duplicates! {
3285 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3286 +----+----+----+-------+
3287 | a1 | b1 | c1 | mark |
3288 +----+----+----+-------+
3289 | 1 | 4 | 7 | true |
3290 | 2 | 5 | 8 | true |
3291 | 3 | 7 | 9 | false |
3292 +----+----+----+-------+
3293 "#);
3294 }
3295
3296 assert_join_metrics!(metrics, 3);
3297
3298 Ok(())
3299 }
3300
3301 #[apply(batch_sizes)]
3302 #[tokio::test]
3303 async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> {
3304 let task_ctx = prepare_task_ctx(batch_size);
3305 let left = build_table(
3306 ("a1", &vec![1, 2, 3]),
3307 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3309 );
3310 let right = build_table(
3311 ("a2", &vec![10, 20, 30, 40]),
3312 ("b1", &vec![4, 4, 5, 6]),
3313 ("c2", &vec![60, 70, 80, 90]),
3314 );
3315 let on = vec![(
3316 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3317 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3318 )];
3319
3320 let (columns, batches, metrics) = partitioned_join_collect(
3321 Arc::clone(&left),
3322 Arc::clone(&right),
3323 on.clone(),
3324 &JoinType::LeftMark,
3325 NullEquality::NullEqualsNothing,
3326 task_ctx,
3327 )
3328 .await?;
3329
3330 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3331
3332 allow_duplicates! {
3333 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3334 +----+----+----+-------+
3335 | a1 | b1 | c1 | mark |
3336 +----+----+----+-------+
3337 | 1 | 4 | 7 | true |
3338 | 2 | 5 | 8 | true |
3339 | 3 | 7 | 9 | false |
3340 +----+----+----+-------+
3341 "#);
3342 }
3343
3344 assert_join_metrics!(metrics, 3);
3345
3346 Ok(())
3347 }
3348
3349 #[apply(batch_sizes)]
3350 #[tokio::test]
3351 async fn join_right_mark(batch_size: usize) -> Result<()> {
3352 let task_ctx = prepare_task_ctx(batch_size);
3353 let left = build_table(
3354 ("a1", &vec![1, 2, 3]),
3355 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3357 );
3358 let right = build_table(
3359 ("a2", &vec![10, 20, 30]),
3360 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3362 );
3363 let on = vec![(
3364 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3365 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3366 )];
3367
3368 let (columns, batches, metrics) = join_collect(
3369 Arc::clone(&left),
3370 Arc::clone(&right),
3371 on.clone(),
3372 &JoinType::RightMark,
3373 NullEquality::NullEqualsNothing,
3374 task_ctx,
3375 )
3376 .await?;
3377
3378 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3379
3380 let expected = [
3381 "+----+----+----+-------+",
3382 "| a2 | b1 | c2 | mark |",
3383 "+----+----+----+-------+",
3384 "| 10 | 4 | 70 | true |",
3385 "| 20 | 5 | 80 | true |",
3386 "| 30 | 6 | 90 | false |",
3387 "+----+----+----+-------+",
3388 ];
3389 assert_batches_sorted_eq!(expected, &batches);
3390
3391 assert_join_metrics!(metrics, 3);
3392
3393 Ok(())
3394 }
3395
3396 #[apply(batch_sizes)]
3397 #[tokio::test]
3398 async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> {
3399 let task_ctx = prepare_task_ctx(batch_size);
3400 let left = build_table(
3401 ("a1", &vec![1, 2, 3]),
3402 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3404 );
3405 let right = build_table(
3406 ("a2", &vec![10, 20, 30, 40]),
3407 ("b1", &vec![4, 4, 5, 6]), ("c2", &vec![60, 70, 80, 90]),
3409 );
3410 let on = vec![(
3411 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3412 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3413 )];
3414
3415 let (columns, batches, metrics) = partitioned_join_collect(
3416 Arc::clone(&left),
3417 Arc::clone(&right),
3418 on.clone(),
3419 &JoinType::RightMark,
3420 NullEquality::NullEqualsNothing,
3421 task_ctx,
3422 )
3423 .await?;
3424
3425 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3426
3427 let expected = [
3428 "+----+----+----+-------+",
3429 "| a2 | b1 | c2 | mark |",
3430 "+----+----+----+-------+",
3431 "| 10 | 4 | 60 | true |",
3432 "| 20 | 4 | 70 | true |",
3433 "| 30 | 5 | 80 | true |",
3434 "| 40 | 6 | 90 | false |",
3435 "+----+----+----+-------+",
3436 ];
3437 assert_batches_sorted_eq!(expected, &batches);
3438
3439 assert_join_metrics!(metrics, 4);
3440
3441 Ok(())
3442 }
3443
/// Probes a hand-built `JoinHashMapU64` in which colliding candidates must
/// be resolved by key equality.
///
/// Each build-side hash is stored twice, as `(hash, 1)` and `(hash, 2)`, and
/// the `next` chain is `[2, 0]`, so a probe can reach both build rows
/// whatever its key. `lookup_join_hashmap` must still pair each probe row
/// only with the build row whose key is equal, giving build indices `[0, 1]`
/// and probe indices `[0, 1]`.
///
/// NOTE(review): the stored values look like 1-based row indices, with 0
/// ending a chain. Confirm against `JoinHashMapU64`. The inserts keep the
/// original order (hash of row 0 first, then value 1 before 2), because with
/// duplicate hashes the table's probe order may depend on insertion order.
#[test]
fn join_with_hash_collisions_64() -> Result<()> {
    let build_batch = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );
    let probe_batch = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let build_hash_buf = &mut vec![0; build_batch.num_rows()];
    let build_hashes =
        create_hashes([&build_batch.columns()[0]], &random_state, build_hash_buf)?;

    let mut table = HashTable::with_capacity(4);
    for &hash in build_hashes.iter() {
        for value in [1, 2] {
            table.insert_unique(hash, (hash, value), |(h, _)| *h);
        }
    }
    let chain_next = vec![2, 0];
    let hash_map = JoinHashMapU64::new(table, chain_next);

    let key_expr: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
    let build_keys = key_expr
        .evaluate(&build_batch)?
        .into_array(build_batch.num_rows())?;
    let probe_keys = key_expr
        .evaluate(&probe_batch)?
        .into_array(probe_batch.num_rows())?;

    let mut probe_hashes = vec![0; probe_batch.num_rows()];
    create_hashes([&probe_keys], &random_state, &mut probe_hashes)?;

    let (build_indices, probe_indices, _) = lookup_join_hashmap(
        &hash_map,
        &[build_keys],
        &[probe_keys],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
    )?;

    let expected_build: UInt64Array = vec![0, 1].into();
    let expected_probe: UInt32Array = vec![0, 1].into();
    assert_eq!(expected_build, build_indices);
    assert_eq!(expected_probe, probe_indices);

    Ok(())
}
3506
/// `u32`-indexed twin of `join_with_hash_collisions_64`.
///
/// It probes a `JoinHashMapU32` in which each build hash is stored twice, as
/// `(hash, 1u32)` and `(hash, 2u32)`, with the chain `next = [2, 0]`. Key
/// equality must reduce the candidates to one build row per probe row.
///
/// NOTE(review): the stored values look like 1-based row indices, with 0
/// ending a chain. Confirm against `JoinHashMapU32`. Insertion order matches
/// the original, since duplicate-hash probe order may depend on it.
#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
    let build_batch = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );
    let probe_batch = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let build_hash_buf = &mut vec![0; build_batch.num_rows()];
    let build_hashes =
        create_hashes([&build_batch.columns()[0]], &random_state, build_hash_buf)?;

    let mut table = HashTable::with_capacity(4);
    for &hash in build_hashes.iter() {
        for value in [1u32, 2u32] {
            table.insert_unique(hash, (hash, value), |(h, _)| *h);
        }
    }
    let chain_next: Vec<u32> = vec![2, 0];
    let hash_map = JoinHashMapU32::new(table, chain_next);

    let key_expr: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
    let build_keys = key_expr
        .evaluate(&build_batch)?
        .into_array(build_batch.num_rows())?;
    let probe_keys = key_expr
        .evaluate(&probe_batch)?
        .into_array(probe_batch.num_rows())?;

    let mut probe_hashes = vec![0; probe_batch.num_rows()];
    create_hashes([&probe_keys], &random_state, &mut probe_hashes)?;

    let (build_indices, probe_indices, _) = lookup_join_hashmap(
        &hash_map,
        &[build_keys],
        &[probe_keys],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
    )?;

    let expected_build: UInt64Array = vec![0, 1].into();
    let expected_probe: UInt32Array = vec![0, 1].into();
    assert_eq!(expected_build, build_indices);
    assert_eq!(expected_probe, probe_indices);

    Ok(())
}
3562
3563 #[tokio::test]
3564 async fn join_with_duplicated_column_names() -> Result<()> {
3565 let task_ctx = Arc::new(TaskContext::default());
3566 let left = build_table(
3567 ("a", &vec![1, 2, 3]),
3568 ("b", &vec![4, 5, 7]),
3569 ("c", &vec![7, 8, 9]),
3570 );
3571 let right = build_table(
3572 ("a", &vec![10, 20, 30]),
3573 ("b", &vec![1, 2, 7]),
3574 ("c", &vec![70, 80, 90]),
3575 );
3576 let on = vec![(
3577 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3579 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3580 )];
3581
3582 let join = join(
3583 left,
3584 right,
3585 on,
3586 &JoinType::Inner,
3587 NullEquality::NullEqualsNothing,
3588 )?;
3589
3590 let columns = columns(&join.schema());
3591 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3592
3593 let stream = join.execute(0, task_ctx)?;
3594 let batches = common::collect(stream).await?;
3595
3596 allow_duplicates! {
3597 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3598 +---+---+---+----+---+----+
3599 | a | b | c | a | b | c |
3600 +---+---+---+----+---+----+
3601 | 1 | 4 | 7 | 10 | 1 | 70 |
3602 | 2 | 5 | 8 | 20 | 2 | 80 |
3603 +---+---+---+----+---+----+
3604 "#);
3605 }
3606
3607 Ok(())
3608 }
3609
3610 fn prepare_join_filter() -> JoinFilter {
3611 let column_indices = vec![
3612 ColumnIndex {
3613 index: 2,
3614 side: JoinSide::Left,
3615 },
3616 ColumnIndex {
3617 index: 2,
3618 side: JoinSide::Right,
3619 },
3620 ];
3621 let intermediate_schema = Schema::new(vec![
3622 Field::new("c", DataType::Int32, true),
3623 Field::new("c", DataType::Int32, true),
3624 ]);
3625 let filter_expression = Arc::new(BinaryExpr::new(
3626 Arc::new(Column::new("c", 0)),
3627 Operator::Gt,
3628 Arc::new(Column::new("c", 1)),
3629 )) as Arc<dyn PhysicalExpr>;
3630
3631 JoinFilter::new(
3632 filter_expression,
3633 column_indices,
3634 Arc::new(intermediate_schema),
3635 )
3636 }
3637
3638 #[apply(batch_sizes)]
3639 #[tokio::test]
3640 async fn join_inner_with_filter(batch_size: usize) -> Result<()> {
3641 let task_ctx = prepare_task_ctx(batch_size);
3642 let left = build_table(
3643 ("a", &vec![0, 1, 2, 2]),
3644 ("b", &vec![4, 5, 7, 8]),
3645 ("c", &vec![7, 8, 9, 1]),
3646 );
3647 let right = build_table(
3648 ("a", &vec![10, 20, 30, 40]),
3649 ("b", &vec![2, 2, 3, 4]),
3650 ("c", &vec![7, 5, 6, 4]),
3651 );
3652 let on = vec![(
3653 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3654 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3655 )];
3656 let filter = prepare_join_filter();
3657
3658 let join = join_with_filter(
3659 left,
3660 right,
3661 on,
3662 filter,
3663 &JoinType::Inner,
3664 NullEquality::NullEqualsNothing,
3665 )?;
3666
3667 let columns = columns(&join.schema());
3668 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3669
3670 let stream = join.execute(0, task_ctx)?;
3671 let batches = common::collect(stream).await?;
3672
3673 allow_duplicates! {
3674 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3675 +---+---+---+----+---+---+
3676 | a | b | c | a | b | c |
3677 +---+---+---+----+---+---+
3678 | 2 | 7 | 9 | 10 | 2 | 7 |
3679 | 2 | 7 | 9 | 20 | 2 | 5 |
3680 +---+---+---+----+---+---+
3681 "#);
3682 }
3683
3684 Ok(())
3685 }
3686
3687 #[apply(batch_sizes)]
3688 #[tokio::test]
3689 async fn join_left_with_filter(batch_size: usize) -> Result<()> {
3690 let task_ctx = prepare_task_ctx(batch_size);
3691 let left = build_table(
3692 ("a", &vec![0, 1, 2, 2]),
3693 ("b", &vec![4, 5, 7, 8]),
3694 ("c", &vec![7, 8, 9, 1]),
3695 );
3696 let right = build_table(
3697 ("a", &vec![10, 20, 30, 40]),
3698 ("b", &vec![2, 2, 3, 4]),
3699 ("c", &vec![7, 5, 6, 4]),
3700 );
3701 let on = vec![(
3702 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3703 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3704 )];
3705 let filter = prepare_join_filter();
3706
3707 let join = join_with_filter(
3708 left,
3709 right,
3710 on,
3711 filter,
3712 &JoinType::Left,
3713 NullEquality::NullEqualsNothing,
3714 )?;
3715
3716 let columns = columns(&join.schema());
3717 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3718
3719 let stream = join.execute(0, task_ctx)?;
3720 let batches = common::collect(stream).await?;
3721
3722 allow_duplicates! {
3723 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3724 +---+---+---+----+---+---+
3725 | a | b | c | a | b | c |
3726 +---+---+---+----+---+---+
3727 | 0 | 4 | 7 | | | |
3728 | 1 | 5 | 8 | | | |
3729 | 2 | 7 | 9 | 10 | 2 | 7 |
3730 | 2 | 7 | 9 | 20 | 2 | 5 |
3731 | 2 | 8 | 1 | | | |
3732 +---+---+---+----+---+---+
3733 "#);
3734 }
3735
3736 Ok(())
3737 }
3738
3739 #[apply(batch_sizes)]
3740 #[tokio::test]
3741 async fn join_right_with_filter(batch_size: usize) -> Result<()> {
3742 let task_ctx = prepare_task_ctx(batch_size);
3743 let left = build_table(
3744 ("a", &vec![0, 1, 2, 2]),
3745 ("b", &vec![4, 5, 7, 8]),
3746 ("c", &vec![7, 8, 9, 1]),
3747 );
3748 let right = build_table(
3749 ("a", &vec![10, 20, 30, 40]),
3750 ("b", &vec![2, 2, 3, 4]),
3751 ("c", &vec![7, 5, 6, 4]),
3752 );
3753 let on = vec![(
3754 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3755 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3756 )];
3757 let filter = prepare_join_filter();
3758
3759 let join = join_with_filter(
3760 left,
3761 right,
3762 on,
3763 filter,
3764 &JoinType::Right,
3765 NullEquality::NullEqualsNothing,
3766 )?;
3767
3768 let columns = columns(&join.schema());
3769 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3770
3771 let stream = join.execute(0, task_ctx)?;
3772 let batches = common::collect(stream).await?;
3773
3774 allow_duplicates! {
3775 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3776 +---+---+---+----+---+---+
3777 | a | b | c | a | b | c |
3778 +---+---+---+----+---+---+
3779 | | | | 30 | 3 | 6 |
3780 | | | | 40 | 4 | 4 |
3781 | 2 | 7 | 9 | 10 | 2 | 7 |
3782 | 2 | 7 | 9 | 20 | 2 | 5 |
3783 +---+---+---+----+---+---+
3784 "#);
3785 }
3786
3787 Ok(())
3788 }
3789
3790 #[apply(batch_sizes)]
3791 #[tokio::test]
3792 async fn join_full_with_filter(batch_size: usize) -> Result<()> {
3793 let task_ctx = prepare_task_ctx(batch_size);
3794 let left = build_table(
3795 ("a", &vec![0, 1, 2, 2]),
3796 ("b", &vec![4, 5, 7, 8]),
3797 ("c", &vec![7, 8, 9, 1]),
3798 );
3799 let right = build_table(
3800 ("a", &vec![10, 20, 30, 40]),
3801 ("b", &vec![2, 2, 3, 4]),
3802 ("c", &vec![7, 5, 6, 4]),
3803 );
3804 let on = vec![(
3805 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3806 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3807 )];
3808 let filter = prepare_join_filter();
3809
3810 let join = join_with_filter(
3811 left,
3812 right,
3813 on,
3814 filter,
3815 &JoinType::Full,
3816 NullEquality::NullEqualsNothing,
3817 )?;
3818
3819 let columns = columns(&join.schema());
3820 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3821
3822 let stream = join.execute(0, task_ctx)?;
3823 let batches = common::collect(stream).await?;
3824
3825 let expected = [
3826 "+---+---+---+----+---+---+",
3827 "| a | b | c | a | b | c |",
3828 "+---+---+---+----+---+---+",
3829 "| | | | 30 | 3 | 6 |",
3830 "| | | | 40 | 4 | 4 |",
3831 "| 2 | 7 | 9 | 10 | 2 | 7 |",
3832 "| 2 | 7 | 9 | 20 | 2 | 5 |",
3833 "| 0 | 4 | 7 | | | |",
3834 "| 1 | 5 | 8 | | | |",
3835 "| 2 | 8 | 1 | | | |",
3836 "+---+---+---+----+---+---+",
3837 ];
3838 assert_batches_sorted_eq!(expected, &batches);
3839
3840 Ok(())
3858 }
3859
3860 #[tokio::test]
3862 async fn test_collect_left_multiple_partitions_join() -> Result<()> {
3863 let task_ctx = Arc::new(TaskContext::default());
3864 let left = build_table(
3865 ("a1", &vec![1, 2, 3]),
3866 ("b1", &vec![4, 5, 7]),
3867 ("c1", &vec![7, 8, 9]),
3868 );
3869 let right = build_table(
3870 ("a2", &vec![10, 20, 30]),
3871 ("b2", &vec![4, 5, 6]),
3872 ("c2", &vec![70, 80, 90]),
3873 );
3874 let on = vec![(
3875 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3876 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3877 )];
3878
3879 let expected_inner = vec![
3880 "+----+----+----+----+----+----+",
3881 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3882 "+----+----+----+----+----+----+",
3883 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3884 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3885 "+----+----+----+----+----+----+",
3886 ];
3887 let expected_left = vec![
3888 "+----+----+----+----+----+----+",
3889 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3890 "+----+----+----+----+----+----+",
3891 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3892 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3893 "| 3 | 7 | 9 | | | |",
3894 "+----+----+----+----+----+----+",
3895 ];
3896 let expected_right = vec![
3897 "+----+----+----+----+----+----+",
3898 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3899 "+----+----+----+----+----+----+",
3900 "| | | | 30 | 6 | 90 |",
3901 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3902 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3903 "+----+----+----+----+----+----+",
3904 ];
3905 let expected_full = vec![
3906 "+----+----+----+----+----+----+",
3907 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3908 "+----+----+----+----+----+----+",
3909 "| | | | 30 | 6 | 90 |",
3910 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3911 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3912 "| 3 | 7 | 9 | | | |",
3913 "+----+----+----+----+----+----+",
3914 ];
3915 let expected_left_semi = vec![
3916 "+----+----+----+",
3917 "| a1 | b1 | c1 |",
3918 "+----+----+----+",
3919 "| 1 | 4 | 7 |",
3920 "| 2 | 5 | 8 |",
3921 "+----+----+----+",
3922 ];
3923 let expected_left_anti = vec![
3924 "+----+----+----+",
3925 "| a1 | b1 | c1 |",
3926 "+----+----+----+",
3927 "| 3 | 7 | 9 |",
3928 "+----+----+----+",
3929 ];
3930 let expected_right_semi = vec![
3931 "+----+----+----+",
3932 "| a2 | b2 | c2 |",
3933 "+----+----+----+",
3934 "| 10 | 4 | 70 |",
3935 "| 20 | 5 | 80 |",
3936 "+----+----+----+",
3937 ];
3938 let expected_right_anti = vec![
3939 "+----+----+----+",
3940 "| a2 | b2 | c2 |",
3941 "+----+----+----+",
3942 "| 30 | 6 | 90 |",
3943 "+----+----+----+",
3944 ];
3945 let expected_left_mark = vec![
3946 "+----+----+----+-------+",
3947 "| a1 | b1 | c1 | mark |",
3948 "+----+----+----+-------+",
3949 "| 1 | 4 | 7 | true |",
3950 "| 2 | 5 | 8 | true |",
3951 "| 3 | 7 | 9 | false |",
3952 "+----+----+----+-------+",
3953 ];
3954 let expected_right_mark = vec![
3955 "+----+----+----+-------+",
3956 "| a2 | b2 | c2 | mark |",
3957 "+----+----+----+-------+",
3958 "| 10 | 4 | 70 | true |",
3959 "| 20 | 5 | 80 | true |",
3960 "| 30 | 6 | 90 | false |",
3961 "+----+----+----+-------+",
3962 ];
3963
3964 let test_cases = vec![
3965 (JoinType::Inner, expected_inner),
3966 (JoinType::Left, expected_left),
3967 (JoinType::Right, expected_right),
3968 (JoinType::Full, expected_full),
3969 (JoinType::LeftSemi, expected_left_semi),
3970 (JoinType::LeftAnti, expected_left_anti),
3971 (JoinType::RightSemi, expected_right_semi),
3972 (JoinType::RightAnti, expected_right_anti),
3973 (JoinType::LeftMark, expected_left_mark),
3974 (JoinType::RightMark, expected_right_mark),
3975 ];
3976
3977 for (join_type, expected) in test_cases {
3978 let (_, batches, metrics) = join_collect_with_partition_mode(
3979 Arc::clone(&left),
3980 Arc::clone(&right),
3981 on.clone(),
3982 &join_type,
3983 PartitionMode::CollectLeft,
3984 NullEquality::NullEqualsNothing,
3985 Arc::clone(&task_ctx),
3986 )
3987 .await?;
3988 assert_batches_sorted_eq!(expected, &batches);
3989 assert_join_metrics!(metrics, expected.len() - 4);
3990 }
3991
3992 Ok(())
3993 }
3994
3995 #[tokio::test]
3996 async fn join_date32() -> Result<()> {
3997 let schema = Arc::new(Schema::new(vec![
3998 Field::new("date", DataType::Date32, false),
3999 Field::new("n", DataType::Int32, false),
4000 ]));
4001
4002 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
4003 let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
4004 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4005 let left =
4006 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
4007 .unwrap();
4008 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
4009 let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
4010 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4011 let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
4012 let on = vec![(
4013 Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
4014 Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
4015 )];
4016
4017 let join = join(
4018 left,
4019 right,
4020 on,
4021 &JoinType::Inner,
4022 NullEquality::NullEqualsNothing,
4023 )?;
4024
4025 let task_ctx = Arc::new(TaskContext::default());
4026 let stream = join.execute(0, task_ctx)?;
4027 let batches = common::collect(stream).await?;
4028
4029 allow_duplicates! {
4030 assert_snapshot!(batches_to_sort_string(&batches), @r#"
4031 +------------+---+------------+---+
4032 | date | n | date | n |
4033 +------------+---+------------+---+
4034 | 2022-04-26 | 2 | 2022-04-26 | 4 |
4035 | 2022-04-26 | 2 | 2022-04-26 | 5 |
4036 | 2022-04-27 | 3 | 2022-04-27 | 6 |
4037 +------------+---+------------+---+
4038 "#);
4039 }
4040
4041 Ok(())
4042 }
4043
4044 #[tokio::test]
4045 async fn join_with_error_right() {
4046 let left = build_table(
4047 ("a1", &vec![1, 2, 3]),
4048 ("b1", &vec![4, 5, 7]),
4049 ("c1", &vec![7, 8, 9]),
4050 );
4051
4052 let err = exec_err!("bad data error");
4055 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4056
4057 let on = vec![(
4058 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4059 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
4060 )];
4061 let schema = right.schema();
4062 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4063 let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
4064
4065 let join_types = vec![
4066 JoinType::Inner,
4067 JoinType::Left,
4068 JoinType::Right,
4069 JoinType::Full,
4070 JoinType::LeftSemi,
4071 JoinType::LeftAnti,
4072 JoinType::RightSemi,
4073 JoinType::RightAnti,
4074 ];
4075
4076 for join_type in join_types {
4077 let join = join(
4078 Arc::clone(&left),
4079 Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
4080 on.clone(),
4081 &join_type,
4082 NullEquality::NullEqualsNothing,
4083 )
4084 .unwrap();
4085 let task_ctx = Arc::new(TaskContext::default());
4086
4087 let stream = join.execute(0, task_ctx).unwrap();
4088
4089 let result_string = common::collect(stream).await.unwrap_err().to_string();
4091 assert!(
4092 result_string.contains("bad data error"),
4093 "actual: {result_string}"
4094 );
4095 }
4096 }
4097
4098 #[tokio::test]
4099 async fn join_split_batch() {
4100 let left = build_table(
4101 ("a1", &vec![1, 2, 3, 4]),
4102 ("b1", &vec![1, 1, 1, 1]),
4103 ("c1", &vec![0, 0, 0, 0]),
4104 );
4105 let right = build_table(
4106 ("a2", &vec![10, 20, 30, 40, 50]),
4107 ("b2", &vec![1, 1, 1, 1, 1]),
4108 ("c2", &vec![0, 0, 0, 0, 0]),
4109 );
4110 let on = vec![(
4111 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4112 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4113 )];
4114
4115 let join_types = vec![
4116 JoinType::Inner,
4117 JoinType::Left,
4118 JoinType::Right,
4119 JoinType::Full,
4120 JoinType::RightSemi,
4121 JoinType::RightAnti,
4122 JoinType::LeftSemi,
4123 JoinType::LeftAnti,
4124 ];
4125 let expected_resultset_records = 20;
4126 let common_result = [
4127 "+----+----+----+----+----+----+",
4128 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4129 "+----+----+----+----+----+----+",
4130 "| 1 | 1 | 0 | 10 | 1 | 0 |",
4131 "| 2 | 1 | 0 | 10 | 1 | 0 |",
4132 "| 3 | 1 | 0 | 10 | 1 | 0 |",
4133 "| 4 | 1 | 0 | 10 | 1 | 0 |",
4134 "| 1 | 1 | 0 | 20 | 1 | 0 |",
4135 "| 2 | 1 | 0 | 20 | 1 | 0 |",
4136 "| 3 | 1 | 0 | 20 | 1 | 0 |",
4137 "| 4 | 1 | 0 | 20 | 1 | 0 |",
4138 "| 1 | 1 | 0 | 30 | 1 | 0 |",
4139 "| 2 | 1 | 0 | 30 | 1 | 0 |",
4140 "| 3 | 1 | 0 | 30 | 1 | 0 |",
4141 "| 4 | 1 | 0 | 30 | 1 | 0 |",
4142 "| 1 | 1 | 0 | 40 | 1 | 0 |",
4143 "| 2 | 1 | 0 | 40 | 1 | 0 |",
4144 "| 3 | 1 | 0 | 40 | 1 | 0 |",
4145 "| 4 | 1 | 0 | 40 | 1 | 0 |",
4146 "| 1 | 1 | 0 | 50 | 1 | 0 |",
4147 "| 2 | 1 | 0 | 50 | 1 | 0 |",
4148 "| 3 | 1 | 0 | 50 | 1 | 0 |",
4149 "| 4 | 1 | 0 | 50 | 1 | 0 |",
4150 "+----+----+----+----+----+----+",
4151 ];
4152 let left_batch = [
4153 "+----+----+----+",
4154 "| a1 | b1 | c1 |",
4155 "+----+----+----+",
4156 "| 1 | 1 | 0 |",
4157 "| 2 | 1 | 0 |",
4158 "| 3 | 1 | 0 |",
4159 "| 4 | 1 | 0 |",
4160 "+----+----+----+",
4161 ];
4162 let right_batch = [
4163 "+----+----+----+",
4164 "| a2 | b2 | c2 |",
4165 "+----+----+----+",
4166 "| 10 | 1 | 0 |",
4167 "| 20 | 1 | 0 |",
4168 "| 30 | 1 | 0 |",
4169 "| 40 | 1 | 0 |",
4170 "| 50 | 1 | 0 |",
4171 "+----+----+----+",
4172 ];
4173 let right_empty = [
4174 "+----+----+----+",
4175 "| a2 | b2 | c2 |",
4176 "+----+----+----+",
4177 "+----+----+----+",
4178 ];
4179 let left_empty = [
4180 "+----+----+----+",
4181 "| a1 | b1 | c1 |",
4182 "+----+----+----+",
4183 "+----+----+----+",
4184 ];
4185
4186 for join_type in join_types {
4188 for batch_size in (1..21).rev() {
4189 let task_ctx = prepare_task_ctx(batch_size);
4190
4191 let join = join(
4192 Arc::clone(&left),
4193 Arc::clone(&right),
4194 on.clone(),
4195 &join_type,
4196 NullEquality::NullEqualsNothing,
4197 )
4198 .unwrap();
4199
4200 let stream = join.execute(0, task_ctx).unwrap();
4201 let batches = common::collect(stream).await.unwrap();
4202
4203 let expected_batch_count = match join_type {
4208 JoinType::Inner
4209 | JoinType::Right
4210 | JoinType::RightSemi
4211 | JoinType::RightAnti => {
4212 div_ceil(expected_resultset_records, batch_size)
4213 }
4214 _ => div_ceil(expected_resultset_records, batch_size) + 1,
4215 };
4216 assert_eq!(
4217 batches.len(),
4218 expected_batch_count,
4219 "expected {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}"
4220 );
4221
4222 let expected = match join_type {
4223 JoinType::RightSemi => right_batch.to_vec(),
4224 JoinType::RightAnti => right_empty.to_vec(),
4225 JoinType::LeftSemi => left_batch.to_vec(),
4226 JoinType::LeftAnti => left_empty.to_vec(),
4227 _ => common_result.to_vec(),
4228 };
4229 assert_batches_eq!(expected, &batches);
4230 }
4231 }
4232 }
4233
4234 #[tokio::test]
4235 async fn single_partition_join_overallocation() -> Result<()> {
4236 let left = build_table(
4237 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4238 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4239 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4240 );
4241 let right = build_table(
4242 ("a2", &vec![10, 11]),
4243 ("b2", &vec![12, 13]),
4244 ("c2", &vec![14, 15]),
4245 );
4246 let on = vec![(
4247 Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
4248 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4249 )];
4250
4251 let join_types = vec![
4252 JoinType::Inner,
4253 JoinType::Left,
4254 JoinType::Right,
4255 JoinType::Full,
4256 JoinType::LeftSemi,
4257 JoinType::LeftAnti,
4258 JoinType::RightSemi,
4259 JoinType::RightAnti,
4260 JoinType::LeftMark,
4261 JoinType::RightMark,
4262 ];
4263
4264 for join_type in join_types {
4265 let runtime = RuntimeEnvBuilder::new()
4266 .with_memory_limit(100, 1.0)
4267 .build_arc()?;
4268 let task_ctx = TaskContext::default().with_runtime(runtime);
4269 let task_ctx = Arc::new(task_ctx);
4270
4271 let join = join(
4272 Arc::clone(&left),
4273 Arc::clone(&right),
4274 on.clone(),
4275 &join_type,
4276 NullEquality::NullEqualsNothing,
4277 )?;
4278
4279 let stream = join.execute(0, task_ctx)?;
4280 let err = common::collect(stream).await.unwrap_err();
4281
4282 assert_contains!(
4284 err.to_string(),
4285 "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput"
4286 );
4287
4288 assert_contains!(
4289 err.to_string(),
4290 "Failed to allocate additional 120.0 B for HashJoinInput"
4291 );
4292 }
4293
4294 Ok(())
4295 }
4296
4297 #[tokio::test]
4298 async fn partitioned_join_overallocation() -> Result<()> {
4299 let left_batch = build_table_i32(
4302 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4303 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4304 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4305 );
4306 let left = TestMemoryExec::try_new_exec(
4307 &[vec![left_batch.clone()], vec![left_batch.clone()]],
4308 left_batch.schema(),
4309 None,
4310 )
4311 .unwrap();
4312 let right_batch = build_table_i32(
4313 ("a2", &vec![10, 11]),
4314 ("b2", &vec![12, 13]),
4315 ("c2", &vec![14, 15]),
4316 );
4317 let right = TestMemoryExec::try_new_exec(
4318 &[vec![right_batch.clone()], vec![right_batch.clone()]],
4319 right_batch.schema(),
4320 None,
4321 )
4322 .unwrap();
4323 let on = vec![(
4324 Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
4325 Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
4326 )];
4327
4328 let join_types = vec![
4329 JoinType::Inner,
4330 JoinType::Left,
4331 JoinType::Right,
4332 JoinType::Full,
4333 JoinType::LeftSemi,
4334 JoinType::LeftAnti,
4335 JoinType::RightSemi,
4336 JoinType::RightAnti,
4337 ];
4338
4339 for join_type in join_types {
4340 let runtime = RuntimeEnvBuilder::new()
4341 .with_memory_limit(100, 1.0)
4342 .build_arc()?;
4343 let session_config = SessionConfig::default().with_batch_size(50);
4344 let task_ctx = TaskContext::default()
4345 .with_session_config(session_config)
4346 .with_runtime(runtime);
4347 let task_ctx = Arc::new(task_ctx);
4348
4349 let join = HashJoinExec::try_new(
4350 Arc::clone(&left) as Arc<dyn ExecutionPlan>,
4351 Arc::clone(&right) as Arc<dyn ExecutionPlan>,
4352 on.clone(),
4353 None,
4354 &join_type,
4355 None,
4356 PartitionMode::Partitioned,
4357 NullEquality::NullEqualsNothing,
4358 )?;
4359
4360 let stream = join.execute(1, task_ctx)?;
4361 let err = common::collect(stream).await.unwrap_err();
4362
4363 assert_contains!(
4365 err.to_string(),
4366 "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]"
4367
4368 );
4369
4370 assert_contains!(
4371 err.to_string(),
4372 "Failed to allocate additional 120.0 B for HashJoinInput[1]"
4373 );
4374 }
4375
4376 Ok(())
4377 }
4378
4379 fn build_table_struct(
4380 struct_name: &str,
4381 field_name_and_values: (&str, &Vec<Option<i32>>),
4382 nulls: Option<NullBuffer>,
4383 ) -> Arc<dyn ExecutionPlan> {
4384 let (field_name, values) = field_name_and_values;
4385 let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
4386 let schema = Schema::new(vec![Field::new(
4387 struct_name,
4388 DataType::Struct(inner_fields.clone().into()),
4389 nulls.is_some(),
4390 )]);
4391
4392 let batch = RecordBatch::try_new(
4393 Arc::new(schema),
4394 vec![Arc::new(StructArray::new(
4395 inner_fields.into(),
4396 vec![Arc::new(Int32Array::from(values.clone()))],
4397 nulls,
4398 ))],
4399 )
4400 .unwrap();
4401 let schema_ref = batch.schema();
4402 TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
4403 }
4404
4405 #[tokio::test]
4406 async fn join_on_struct() -> Result<()> {
4407 let task_ctx = Arc::new(TaskContext::default());
4408 let left =
4409 build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
4410 let right =
4411 build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
4412 let on = vec![(
4413 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
4414 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
4415 )];
4416
4417 let (columns, batches, metrics) = join_collect(
4418 left,
4419 right,
4420 on,
4421 &JoinType::Inner,
4422 NullEquality::NullEqualsNothing,
4423 task_ctx,
4424 )
4425 .await?;
4426
4427 assert_eq!(columns, vec!["n1", "n2"]);
4428
4429 allow_duplicates! {
4430 assert_snapshot!(batches_to_string(&batches), @r#"
4431 +--------+--------+
4432 | n1 | n2 |
4433 +--------+--------+
4434 | {a: } | {a: } |
4435 | {a: 1} | {a: 1} |
4436 | {a: 2} | {a: 2} |
4437 +--------+--------+
4438 "#);
4439 }
4440
4441 assert_join_metrics!(metrics, 3);
4442
4443 Ok(())
4444 }
4445
4446 #[tokio::test]
4447 async fn join_on_struct_with_nulls() -> Result<()> {
4448 let task_ctx = Arc::new(TaskContext::default());
4449 let left =
4450 build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
4451 let right =
4452 build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
4453 let on = vec![(
4454 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
4455 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
4456 )];
4457
4458 let (_, batches_null_eq, metrics) = join_collect(
4459 Arc::clone(&left),
4460 Arc::clone(&right),
4461 on.clone(),
4462 &JoinType::Inner,
4463 NullEquality::NullEqualsNull,
4464 Arc::clone(&task_ctx),
4465 )
4466 .await?;
4467
4468 allow_duplicates! {
4469 assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r#"
4470 +----+----+
4471 | n1 | n2 |
4472 +----+----+
4473 | | |
4474 +----+----+
4475 "#);
4476 }
4477
4478 assert_join_metrics!(metrics, 1);
4479
4480 let (_, batches_null_neq, metrics) = join_collect(
4481 left,
4482 right,
4483 on,
4484 &JoinType::Inner,
4485 NullEquality::NullEqualsNothing,
4486 task_ctx,
4487 )
4488 .await?;
4489
4490 assert_join_metrics!(metrics, 0);
4491
4492 let expected_null_neq =
4493 ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
4494 assert_batches_eq!(expected_null_neq, &batches_null_neq);
4495
4496 Ok(())
4497 }
4498
4499 fn columns(schema: &Schema) -> Vec<String> {
4501 schema.fields().iter().map(|f| f.name().clone()).collect()
4502 }
4503}