1use std::fmt;
19use std::mem::size_of;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::{Arc, OnceLock};
22use std::{any::Any, vec};
23
24use crate::ExecutionPlanProperties;
25use crate::execution_plan::{EmissionType, boundedness_from_children};
26use crate::filter_pushdown::{
27 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
28 FilterPushdownPropagation,
29};
30use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
31use crate::joins::hash_join::shared_bounds::{
32 ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
33};
34use crate::joins::hash_join::stream::{
35 BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
36};
37use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
38use crate::joins::utils::{
39 OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap,
40 swap_join_projection, update_hash,
41};
42use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
43use crate::projection::{
44 EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
45 try_pushdown_through_join,
46};
47use crate::repartition::REPARTITION_RANDOM_STATE;
48use crate::spill::get_record_batch_memory_size;
49use crate::{
50 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
51 PlanProperties, SendableRecordBatchStream, Statistics,
52 common::can_project,
53 joins::utils::{
54 BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
55 build_join_schema, check_join_is_valid, estimate_join_statistics,
56 need_produce_result_in_final, symmetric_join_output_partitioning,
57 },
58 metrics::{ExecutionPlanMetricsSet, MetricsSet},
59};
60
61use arrow::array::{ArrayRef, BooleanBufferBuilder};
62use arrow::compute::concat_batches;
63use arrow::datatypes::SchemaRef;
64use arrow::record_batch::RecordBatch;
65use arrow::util::bit_util;
66use arrow_schema::DataType;
67use datafusion_common::config::ConfigOptions;
68use datafusion_common::utils::memory::estimate_memory_size;
69use datafusion_common::{
70 JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, plan_err,
71 project_schema,
72};
73use datafusion_execution::TaskContext;
74use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
75use datafusion_expr::Accumulator;
76use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
77use datafusion_physical_expr::equivalence::{
78 ProjectionMapping, join_equivalence_properties,
79};
80use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
81use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
82
83use ahash::RandomState;
84use datafusion_physical_expr_common::physical_expr::fmt_sql;
85use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
86use futures::TryStreamExt;
87use parking_lot::Mutex;
88
89use super::partitioned_hash_eval::SeededRandomState;
90
91pub(crate) const HASH_JOIN_SEED: SeededRandomState =
93 SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
94
/// Build-side ("left") data of a hash join.
///
/// Produced by `collect_left_input` and shared with the probe streams.
pub(super) struct JoinLeftData {
    /// Hash table built over the build-side join keys (populated by `update_hash`).
    pub(super) hash_map: Arc<dyn JoinHashMapType>,
    /// All build-side batches concatenated into one batch.
    ///
    /// Batches are concatenated in reverse arrival order, the same order in
    /// which they were inserted into `hash_map`.
    batch: RecordBatch,
    /// Build-side join key arrays (the `on_left` expressions evaluated against `batch`).
    values: Vec<ArrayRef>,
    /// Bitmap with one bit per build-side row.
    ///
    /// Empty unless the join type requires `need_produce_result_in_final`.
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Number of probe partitions that have not finished yet; see
    /// `report_probe_completed`.
    probe_threads_counter: AtomicUsize,
    /// Memory reservation covering the buffered batches, hash table and
    /// bitmap; held for as long as this data is alive.
    _reservation: MemoryReservation,
    /// Per-key-column min/max bounds.
    ///
    /// Present only when dynamic filters were computed and the build side is non-empty.
    pub(super) bounds: Option<PartitionBounds>,
    /// Strategy for pushing build-side key membership to the probe side.
    pub(super) membership: PushdownStrategy,
}
122
123impl JoinLeftData {
124 pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
126 &*self.hash_map
127 }
128
129 pub(super) fn batch(&self) -> &RecordBatch {
131 &self.batch
132 }
133
134 pub(super) fn values(&self) -> &[ArrayRef] {
136 &self.values
137 }
138
139 pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
141 &self.visited_indices_bitmap
142 }
143
144 pub(super) fn membership(&self) -> &PushdownStrategy {
146 &self.membership
147 }
148
149 pub(super) fn report_probe_completed(&self) -> bool {
152 self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
153 }
154}
155
/// Physical operator that joins two inputs using a hash table.
///
/// The left input is the build side: it is collected into a [`JoinLeftData`]
/// (hash table plus one concatenated batch) by `collect_left_input`. The right
/// input is the probe side and is streamed through a `HashJoinStream`.
///
/// In [`PartitionMode::CollectLeft`] the left input must have exactly one
/// partition; the build side is computed once and shared by every output
/// partition. In [`PartitionMode::Partitioned`] each output partition builds
/// its own table from the matching left partition, and both inputs must have
/// the same partition count. [`PartitionMode::Auto`] cannot be executed.
#[expect(rustdoc::private_intra_doc_links)]
pub struct HashJoinExec {
    /// Build-side input.
    pub left: Arc<dyn ExecutionPlan>,
    /// Probe-side input.
    pub right: Arc<dyn ExecutionPlan>,
    /// Equi-join key pairs `(left expression, right expression)`.
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional additional join filter, forwarded to the probe stream.
    pub filter: Option<JoinFilter>,
    /// Type of the join.
    pub join_type: JoinType,
    /// Output schema of the join before `projection` is applied.
    join_schema: SchemaRef,
    /// Shared build-side future.
    ///
    /// Used in `CollectLeft` mode so the build side is computed only once
    /// across output partitions.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Hash seeds used when hashing join keys on both sides.
    random_state: SeededRandomState,
    /// Partitioning mode of the join.
    pub mode: PartitionMode,
    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
    /// Optional output column indices into `join_schema`.
    pub projection: Option<Vec<usize>>,
    /// Source (side and column) of each `join_schema` column, as returned by
    /// `build_join_schema`.
    column_indices: Vec<ColumnIndex>,
    /// Whether NULL join keys are treated as equal.
    pub null_equality: NullEquality,
    /// Cached plan properties computed by `compute_properties`.
    cache: PlanProperties,
    /// Dynamic filter pushed to the right child.
    ///
    /// Set by `handle_child_pushdown_result`.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
}
358
/// Dynamic-filter state attached to a [`HashJoinExec`] after filter pushdown.
///
/// Note: the derived `Clone` also clones the contents of `build_accumulator`
/// if it has already been initialized (`OnceLock::clone` copies the value).
#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// Filter expression pushed to the probe side.
    ///
    /// Handed to the `SharedBuildAccumulator` created in `execute`.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Accumulator shared by all partitions.
    ///
    /// Lazily initialized on first use in `execute`.
    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
367
368impl fmt::Debug for HashJoinExec {
369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370 f.debug_struct("HashJoinExec")
371 .field("left", &self.left)
372 .field("right", &self.right)
373 .field("on", &self.on)
374 .field("filter", &self.filter)
375 .field("join_type", &self.join_type)
376 .field("join_schema", &self.join_schema)
377 .field("left_fut", &self.left_fut)
378 .field("random_state", &self.random_state)
379 .field("mode", &self.mode)
380 .field("metrics", &self.metrics)
381 .field("projection", &self.projection)
382 .field("column_indices", &self.column_indices)
383 .field("null_equality", &self.null_equality)
384 .field("cache", &self.cache)
385 .finish()
387 }
388}
389
390impl EmbeddedProjection for HashJoinExec {
391 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
392 self.with_projection(projection)
393 }
394}
395
396impl HashJoinExec {
397 #[expect(clippy::too_many_arguments)]
402 pub fn try_new(
403 left: Arc<dyn ExecutionPlan>,
404 right: Arc<dyn ExecutionPlan>,
405 on: JoinOn,
406 filter: Option<JoinFilter>,
407 join_type: &JoinType,
408 projection: Option<Vec<usize>>,
409 partition_mode: PartitionMode,
410 null_equality: NullEquality,
411 ) -> Result<Self> {
412 let left_schema = left.schema();
413 let right_schema = right.schema();
414 if on.is_empty() {
415 return plan_err!("On constraints in HashJoinExec should be non-empty");
416 }
417
418 check_join_is_valid(&left_schema, &right_schema, &on)?;
419
420 let (join_schema, column_indices) =
421 build_join_schema(&left_schema, &right_schema, join_type);
422
423 let random_state = HASH_JOIN_SEED;
424
425 let join_schema = Arc::new(join_schema);
426
427 can_project(&join_schema, projection.as_ref())?;
429
430 let cache = Self::compute_properties(
431 &left,
432 &right,
433 &join_schema,
434 *join_type,
435 &on,
436 partition_mode,
437 projection.as_ref(),
438 )?;
439
440 Ok(HashJoinExec {
444 left,
445 right,
446 on,
447 filter,
448 join_type: *join_type,
449 join_schema,
450 left_fut: Default::default(),
451 random_state,
452 mode: partition_mode,
453 metrics: ExecutionPlanMetricsSet::new(),
454 projection,
455 column_indices,
456 null_equality,
457 cache,
458 dynamic_filter: None,
459 })
460 }
461
462 fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
463 let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
466 Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
468 }
469
470 pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
472 &self.left
473 }
474
475 pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
477 &self.right
478 }
479
480 pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
482 &self.on
483 }
484
485 pub fn filter(&self) -> Option<&JoinFilter> {
487 self.filter.as_ref()
488 }
489
490 pub fn join_type(&self) -> &JoinType {
492 &self.join_type
493 }
494
495 pub fn join_schema(&self) -> &SchemaRef {
498 &self.join_schema
499 }
500
501 pub fn partition_mode(&self) -> &PartitionMode {
503 &self.mode
504 }
505
506 pub fn null_equality(&self) -> NullEquality {
508 self.null_equality
509 }
510
511 #[doc(hidden)]
516 pub fn dynamic_filter_for_test(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
517 self.dynamic_filter
518 .as_ref()
519 .map(|df| Arc::clone(&df.filter))
520 }
521
522 fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
524 vec![
525 false,
526 matches!(
527 join_type,
528 JoinType::Inner
529 | JoinType::Right
530 | JoinType::RightAnti
531 | JoinType::RightSemi
532 | JoinType::RightMark
533 ),
534 ]
535 }
536
537 pub fn probe_side() -> JoinSide {
539 JoinSide::Right
541 }
542
543 pub fn contains_projection(&self) -> bool {
545 self.projection.is_some()
546 }
547
548 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
550 can_project(&self.schema(), projection.as_ref())?;
552 let projection = match projection {
553 Some(projection) => match &self.projection {
554 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
555 None => Some(projection),
556 },
557 None => None,
558 };
559 Self::try_new(
560 Arc::clone(&self.left),
561 Arc::clone(&self.right),
562 self.on.clone(),
563 self.filter.clone(),
564 &self.join_type,
565 projection,
566 self.mode,
567 self.null_equality,
568 )
569 }
570
571 fn compute_properties(
573 left: &Arc<dyn ExecutionPlan>,
574 right: &Arc<dyn ExecutionPlan>,
575 schema: &SchemaRef,
576 join_type: JoinType,
577 on: JoinOnRef,
578 mode: PartitionMode,
579 projection: Option<&Vec<usize>>,
580 ) -> Result<PlanProperties> {
581 let mut eq_properties = join_equivalence_properties(
583 left.equivalence_properties().clone(),
584 right.equivalence_properties().clone(),
585 &join_type,
586 Arc::clone(schema),
587 &Self::maintains_input_order(join_type),
588 Some(Self::probe_side()),
589 on,
590 )?;
591
592 let mut output_partitioning = match mode {
593 PartitionMode::CollectLeft => {
594 asymmetric_join_output_partitioning(left, right, &join_type)?
595 }
596 PartitionMode::Auto => Partitioning::UnknownPartitioning(
597 right.output_partitioning().partition_count(),
598 ),
599 PartitionMode::Partitioned => {
600 symmetric_join_output_partitioning(left, right, &join_type)?
601 }
602 };
603
604 let emission_type = if left.boundedness().is_unbounded() {
605 EmissionType::Final
606 } else if right.pipeline_behavior() == EmissionType::Incremental {
607 match join_type {
608 JoinType::Inner
611 | JoinType::LeftSemi
612 | JoinType::RightSemi
613 | JoinType::Right
614 | JoinType::RightAnti
615 | JoinType::RightMark => EmissionType::Incremental,
616 JoinType::Left
619 | JoinType::LeftAnti
620 | JoinType::LeftMark
621 | JoinType::Full => EmissionType::Both,
622 }
623 } else {
624 right.pipeline_behavior()
625 };
626
627 if let Some(projection) = projection {
629 let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
631 let out_schema = project_schema(schema, Some(projection))?;
632 output_partitioning =
633 output_partitioning.project(&projection_mapping, &eq_properties);
634 eq_properties = eq_properties.project(&projection_mapping, out_schema);
635 }
636
637 Ok(PlanProperties::new(
638 eq_properties,
639 output_partitioning,
640 emission_type,
641 boundedness_from_children([left, right]),
642 ))
643 }
644
645 pub fn swap_inputs(
669 &self,
670 partition_mode: PartitionMode,
671 ) -> Result<Arc<dyn ExecutionPlan>> {
672 let left = self.left();
673 let right = self.right();
674 let new_join = HashJoinExec::try_new(
675 Arc::clone(right),
676 Arc::clone(left),
677 self.on()
678 .iter()
679 .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
680 .collect(),
681 self.filter().map(JoinFilter::swap),
682 &self.join_type().swap(),
683 swap_join_projection(
684 left.schema().fields().len(),
685 right.schema().fields().len(),
686 self.projection.as_ref(),
687 self.join_type(),
688 ),
689 partition_mode,
690 self.null_equality(),
691 )?;
692 if matches!(
694 self.join_type(),
695 JoinType::LeftSemi
696 | JoinType::RightSemi
697 | JoinType::LeftAnti
698 | JoinType::RightAnti
699 | JoinType::LeftMark
700 | JoinType::RightMark
701 ) || self.projection.is_some()
702 {
703 Ok(Arc::new(new_join))
704 } else {
705 reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
706 }
707 }
708}
709
710impl DisplayAs for HashJoinExec {
711 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
712 match t {
713 DisplayFormatType::Default | DisplayFormatType::Verbose => {
714 let display_filter = self.filter.as_ref().map_or_else(
715 || "".to_string(),
716 |f| format!(", filter={}", f.expression()),
717 );
718 let display_projections = if self.contains_projection() {
719 format!(
720 ", projection=[{}]",
721 self.projection
722 .as_ref()
723 .unwrap()
724 .iter()
725 .map(|index| format!(
726 "{}@{}",
727 self.join_schema.fields().get(*index).unwrap().name(),
728 index
729 ))
730 .collect::<Vec<_>>()
731 .join(", ")
732 )
733 } else {
734 "".to_string()
735 };
736 let display_null_equality =
737 if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
738 ", NullsEqual: true"
739 } else {
740 ""
741 };
742 let on = self
743 .on
744 .iter()
745 .map(|(c1, c2)| format!("({c1}, {c2})"))
746 .collect::<Vec<String>>()
747 .join(", ");
748 write!(
749 f,
750 "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
751 self.mode,
752 self.join_type,
753 on,
754 display_filter,
755 display_projections,
756 display_null_equality,
757 )
758 }
759 DisplayFormatType::TreeRender => {
760 let on = self
761 .on
762 .iter()
763 .map(|(c1, c2)| {
764 format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
765 })
766 .collect::<Vec<String>>()
767 .join(", ");
768
769 if *self.join_type() != JoinType::Inner {
770 writeln!(f, "join_type={:?}", self.join_type)?;
771 }
772
773 writeln!(f, "on={on}")?;
774
775 if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
776 writeln!(f, "NullsEqual: true")?;
777 }
778
779 if let Some(filter) = self.filter.as_ref() {
780 writeln!(f, "filter={filter}")?;
781 }
782
783 Ok(())
784 }
785 }
786 }
787}
788
789impl ExecutionPlan for HashJoinExec {
790 fn name(&self) -> &'static str {
791 "HashJoinExec"
792 }
793
794 fn as_any(&self) -> &dyn Any {
795 self
796 }
797
798 fn properties(&self) -> &PlanProperties {
799 &self.cache
800 }
801
802 fn required_input_distribution(&self) -> Vec<Distribution> {
803 match self.mode {
804 PartitionMode::CollectLeft => vec![
805 Distribution::SinglePartition,
806 Distribution::UnspecifiedDistribution,
807 ],
808 PartitionMode::Partitioned => {
809 let (left_expr, right_expr) = self
810 .on
811 .iter()
812 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
813 .unzip();
814 vec![
815 Distribution::HashPartitioned(left_expr),
816 Distribution::HashPartitioned(right_expr),
817 ]
818 }
819 PartitionMode::Auto => vec![
820 Distribution::UnspecifiedDistribution,
821 Distribution::UnspecifiedDistribution,
822 ],
823 }
824 }
825
826 fn maintains_input_order(&self) -> Vec<bool> {
843 Self::maintains_input_order(self.join_type)
844 }
845
846 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
847 vec![&self.left, &self.right]
848 }
849
850 fn with_new_children(
856 self: Arc<Self>,
857 children: Vec<Arc<dyn ExecutionPlan>>,
858 ) -> Result<Arc<dyn ExecutionPlan>> {
859 Ok(Arc::new(HashJoinExec {
860 left: Arc::clone(&children[0]),
861 right: Arc::clone(&children[1]),
862 on: self.on.clone(),
863 filter: self.filter.clone(),
864 join_type: self.join_type,
865 join_schema: Arc::clone(&self.join_schema),
866 left_fut: Arc::clone(&self.left_fut),
867 random_state: self.random_state.clone(),
868 mode: self.mode,
869 metrics: ExecutionPlanMetricsSet::new(),
870 projection: self.projection.clone(),
871 column_indices: self.column_indices.clone(),
872 null_equality: self.null_equality,
873 cache: Self::compute_properties(
874 &children[0],
875 &children[1],
876 &self.join_schema,
877 self.join_type,
878 &self.on,
879 self.mode,
880 self.projection.as_ref(),
881 )?,
882 dynamic_filter: self.dynamic_filter.clone(),
884 }))
885 }
886
887 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
888 Ok(Arc::new(HashJoinExec {
889 left: Arc::clone(&self.left),
890 right: Arc::clone(&self.right),
891 on: self.on.clone(),
892 filter: self.filter.clone(),
893 join_type: self.join_type,
894 join_schema: Arc::clone(&self.join_schema),
895 left_fut: Arc::new(OnceAsync::default()),
897 random_state: self.random_state.clone(),
898 mode: self.mode,
899 metrics: ExecutionPlanMetricsSet::new(),
900 projection: self.projection.clone(),
901 column_indices: self.column_indices.clone(),
902 null_equality: self.null_equality,
903 cache: self.cache.clone(),
904 dynamic_filter: None,
906 }))
907 }
908
909 fn execute(
910 &self,
911 partition: usize,
912 context: Arc<TaskContext>,
913 ) -> Result<SendableRecordBatchStream> {
914 let on_left = self
915 .on
916 .iter()
917 .map(|on| Arc::clone(&on.0))
918 .collect::<Vec<_>>();
919 let left_partitions = self.left.output_partitioning().partition_count();
920 let right_partitions = self.right.output_partitioning().partition_count();
921
922 assert_or_internal_err!(
923 self.mode != PartitionMode::Partitioned
924 || left_partitions == right_partitions,
925 "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
926 consider using RepartitionExec"
927 );
928
929 assert_or_internal_err!(
930 self.mode != PartitionMode::CollectLeft || left_partitions == 1,
931 "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
932 consider using CoalescePartitionsExec or the EnforceDistribution rule"
933 );
934
935 let enable_dynamic_filter_pushdown = context
941 .session_config()
942 .options()
943 .optimizer
944 .enable_join_dynamic_filter_pushdown
945 && self
946 .dynamic_filter
947 .as_ref()
948 .map(|df| df.filter.is_used())
949 .unwrap_or(false);
950
951 let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
952 let left_fut = match self.mode {
953 PartitionMode::CollectLeft => self.left_fut.try_once(|| {
954 let left_stream = self.left.execute(0, Arc::clone(&context))?;
955
956 let reservation =
957 MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
958
959 Ok(collect_left_input(
960 self.random_state.random_state().clone(),
961 left_stream,
962 on_left.clone(),
963 join_metrics.clone(),
964 reservation,
965 need_produce_result_in_final(self.join_type),
966 self.right().output_partitioning().partition_count(),
967 enable_dynamic_filter_pushdown,
968 context
969 .session_config()
970 .options()
971 .optimizer
972 .hash_join_inlist_pushdown_max_size,
973 context
974 .session_config()
975 .options()
976 .optimizer
977 .hash_join_inlist_pushdown_max_distinct_values,
978 ))
979 })?,
980 PartitionMode::Partitioned => {
981 let left_stream = self.left.execute(partition, Arc::clone(&context))?;
982
983 let reservation =
984 MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
985 .register(context.memory_pool());
986
987 OnceFut::new(collect_left_input(
988 self.random_state.random_state().clone(),
989 left_stream,
990 on_left.clone(),
991 join_metrics.clone(),
992 reservation,
993 need_produce_result_in_final(self.join_type),
994 1,
995 enable_dynamic_filter_pushdown,
996 context
997 .session_config()
998 .options()
999 .optimizer
1000 .hash_join_inlist_pushdown_max_size,
1001 context
1002 .session_config()
1003 .options()
1004 .optimizer
1005 .hash_join_inlist_pushdown_max_distinct_values,
1006 ))
1007 }
1008 PartitionMode::Auto => {
1009 return plan_err!(
1010 "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
1011 PartitionMode::Auto
1012 );
1013 }
1014 };
1015
1016 let batch_size = context.session_config().batch_size();
1017
1018 let repartition_random_state = REPARTITION_RANDOM_STATE;
1021 let build_accumulator = enable_dynamic_filter_pushdown
1022 .then(|| {
1023 self.dynamic_filter.as_ref().map(|df| {
1024 let filter = Arc::clone(&df.filter);
1025 let on_right = self
1026 .on
1027 .iter()
1028 .map(|(_, right_expr)| Arc::clone(right_expr))
1029 .collect::<Vec<_>>();
1030 Some(Arc::clone(df.build_accumulator.get_or_init(|| {
1031 Arc::new(SharedBuildAccumulator::new_from_partition_mode(
1032 self.mode,
1033 self.left.as_ref(),
1034 self.right.as_ref(),
1035 filter,
1036 on_right,
1037 repartition_random_state,
1038 ))
1039 })))
1040 })
1041 })
1042 .flatten()
1043 .flatten();
1044
1045 let right_stream = self.right.execute(partition, context)?;
1048
1049 let column_indices_after_projection = match &self.projection {
1051 Some(projection) => projection
1052 .iter()
1053 .map(|i| self.column_indices[*i].clone())
1054 .collect(),
1055 None => self.column_indices.clone(),
1056 };
1057
1058 let on_right = self
1059 .on
1060 .iter()
1061 .map(|(_, right_expr)| Arc::clone(right_expr))
1062 .collect::<Vec<_>>();
1063
1064 Ok(Box::pin(HashJoinStream::new(
1065 partition,
1066 self.schema(),
1067 on_right,
1068 self.filter.clone(),
1069 self.join_type,
1070 right_stream,
1071 self.random_state.random_state().clone(),
1072 join_metrics,
1073 column_indices_after_projection,
1074 self.null_equality,
1075 HashJoinStreamState::WaitBuildSide,
1076 BuildSide::Initial(BuildSideInitialState { left_fut }),
1077 batch_size,
1078 vec![],
1079 self.right.output_ordering().is_some(),
1080 build_accumulator,
1081 self.mode,
1082 )))
1083 }
1084
1085 fn metrics(&self) -> Option<MetricsSet> {
1086 Some(self.metrics.clone_inner())
1087 }
1088
1089 fn statistics(&self) -> Result<Statistics> {
1090 self.partition_statistics(None)
1091 }
1092
1093 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1094 if partition.is_some() {
1095 return Ok(Statistics::new_unknown(&self.schema()));
1096 }
1097 let stats = estimate_join_statistics(
1101 self.left.partition_statistics(None)?,
1102 self.right.partition_statistics(None)?,
1103 &self.on,
1104 &self.join_type,
1105 &self.join_schema,
1106 )?;
1107 Ok(stats.project(self.projection.as_ref()))
1109 }
1110
1111 fn try_swapping_with_projection(
1115 &self,
1116 projection: &ProjectionExec,
1117 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1118 if self.contains_projection() {
1120 return Ok(None);
1121 }
1122
1123 let schema = self.schema();
1124 if let Some(JoinData {
1125 projected_left_child,
1126 projected_right_child,
1127 join_filter,
1128 join_on,
1129 }) = try_pushdown_through_join(
1130 projection,
1131 self.left(),
1132 self.right(),
1133 self.on(),
1134 &schema,
1135 self.filter(),
1136 )? {
1137 Ok(Some(Arc::new(HashJoinExec::try_new(
1138 Arc::new(projected_left_child),
1139 Arc::new(projected_right_child),
1140 join_on,
1141 join_filter,
1142 self.join_type(),
1143 None,
1145 *self.partition_mode(),
1146 self.null_equality,
1147 )?)))
1148 } else {
1149 try_embed_projection(projection, self)
1150 }
1151 }
1152
1153 fn gather_filters_for_pushdown(
1154 &self,
1155 phase: FilterPushdownPhase,
1156 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1157 config: &ConfigOptions,
1158 ) -> Result<FilterDescription> {
1159 if self.join_type != JoinType::Inner {
1164 return Ok(FilterDescription::all_unsupported(
1165 &parent_filters,
1166 &self.children(),
1167 ));
1168 }
1169
1170 let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
1172 &parent_filters,
1173 self.left(),
1174 )?;
1175 let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
1176 &parent_filters,
1177 self.right(),
1178 )?;
1179
1180 if matches!(phase, FilterPushdownPhase::Post)
1182 && config.optimizer.enable_join_dynamic_filter_pushdown
1183 {
1184 let dynamic_filter = Self::create_dynamic_filter(&self.on);
1186 right_child = right_child.with_self_filter(dynamic_filter);
1187 }
1188
1189 Ok(FilterDescription::new()
1190 .with_child(left_child)
1191 .with_child(right_child))
1192 }
1193
1194 fn handle_child_pushdown_result(
1195 &self,
1196 _phase: FilterPushdownPhase,
1197 child_pushdown_result: ChildPushdownResult,
1198 _config: &ConfigOptions,
1199 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1200 if self.join_type != JoinType::Inner {
1205 return Ok(FilterPushdownPropagation::all_unsupported(
1209 child_pushdown_result,
1210 ));
1211 }
1212
1213 let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1214 assert_eq!(child_pushdown_result.self_filters.len(), 2); let right_child_self_filters = &child_pushdown_result.self_filters[1]; if let Some(filter) = right_child_self_filters.first() {
1218 let predicate = Arc::clone(&filter.predicate);
1221 if let Ok(dynamic_filter) =
1222 Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1223 {
1224 let new_node = Arc::new(HashJoinExec {
1226 left: Arc::clone(&self.left),
1227 right: Arc::clone(&self.right),
1228 on: self.on.clone(),
1229 filter: self.filter.clone(),
1230 join_type: self.join_type,
1231 join_schema: Arc::clone(&self.join_schema),
1232 left_fut: Arc::clone(&self.left_fut),
1233 random_state: self.random_state.clone(),
1234 mode: self.mode,
1235 metrics: ExecutionPlanMetricsSet::new(),
1236 projection: self.projection.clone(),
1237 column_indices: self.column_indices.clone(),
1238 null_equality: self.null_equality,
1239 cache: self.cache.clone(),
1240 dynamic_filter: Some(HashJoinExecDynamicFilter {
1241 filter: dynamic_filter,
1242 build_accumulator: OnceLock::new(),
1243 }),
1244 });
1245 result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
1246 }
1247 }
1248 Ok(result)
1249 }
1250}
1251
/// Tracks the minimum and maximum value of one build-side join key
/// expression across all build-side batches.
struct CollectLeftAccumulator {
    /// Join key expression evaluated against each build-side batch.
    expr: Arc<dyn PhysicalExpr>,
    /// Running minimum of the evaluated values.
    min: MinAccumulator,
    /// Running maximum of the evaluated values.
    max: MaxAccumulator,
}
1269
1270impl CollectLeftAccumulator {
1271 fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
1280 fn dictionary_value_type(data_type: &DataType) -> DataType {
1282 match data_type {
1283 DataType::Dictionary(_, value_type) => {
1284 dictionary_value_type(value_type.as_ref())
1285 }
1286 _ => data_type.clone(),
1287 }
1288 }
1289
1290 let data_type = expr
1291 .data_type(schema)
1292 .map(|dt| dictionary_value_type(&dt))?;
1294 Ok(Self {
1295 expr,
1296 min: MinAccumulator::try_new(&data_type)?,
1297 max: MaxAccumulator::try_new(&data_type)?,
1298 })
1299 }
1300
1301 fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
1312 let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
1313 self.min.update_batch(std::slice::from_ref(&array))?;
1314 self.max.update_batch(std::slice::from_ref(&array))?;
1315 Ok(())
1316 }
1317
1318 fn evaluate(mut self) -> Result<ColumnBounds> {
1325 Ok(ColumnBounds::new(
1326 self.min.evaluate()?,
1327 self.max.evaluate()?,
1328 ))
1329 }
1330}
1331
/// State accumulated while `collect_left_input` consumes the build-side stream.
struct BuildSideState {
    /// Build-side batches, in arrival order.
    batches: Vec<RecordBatch>,
    /// Total number of rows across `batches`.
    num_rows: usize,
    /// Build/probe metrics, updated as batches arrive.
    metrics: BuildProbeJoinMetrics,
    /// Memory reservation, grown by the size of each buffered batch.
    reservation: MemoryReservation,
    /// Per-key min/max accumulators.
    ///
    /// `Some` only when dynamic filters are being computed.
    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
}
1340
1341impl BuildSideState {
1342 fn try_new(
1344 metrics: BuildProbeJoinMetrics,
1345 reservation: MemoryReservation,
1346 on_left: Vec<Arc<dyn PhysicalExpr>>,
1347 schema: &SchemaRef,
1348 should_compute_dynamic_filters: bool,
1349 ) -> Result<Self> {
1350 Ok(Self {
1351 batches: Vec::new(),
1352 num_rows: 0,
1353 metrics,
1354 reservation,
1355 bounds_accumulators: should_compute_dynamic_filters
1356 .then(|| {
1357 on_left
1358 .into_iter()
1359 .map(|expr| CollectLeftAccumulator::try_new(expr, schema))
1360 .collect::<Result<Vec<_>>>()
1361 })
1362 .transpose()?,
1363 })
1364 }
1365}
1366
1367#[expect(clippy::too_many_arguments)]
1396async fn collect_left_input(
1397 random_state: RandomState,
1398 left_stream: SendableRecordBatchStream,
1399 on_left: Vec<PhysicalExprRef>,
1400 metrics: BuildProbeJoinMetrics,
1401 reservation: MemoryReservation,
1402 with_visited_indices_bitmap: bool,
1403 probe_threads_count: usize,
1404 should_compute_dynamic_filters: bool,
1405 max_inlist_size: usize,
1406 max_inlist_distinct_values: usize,
1407) -> Result<JoinLeftData> {
1408 let schema = left_stream.schema();
1409
1410 let initial = BuildSideState::try_new(
1414 metrics,
1415 reservation,
1416 on_left.clone(),
1417 &schema,
1418 should_compute_dynamic_filters,
1419 )?;
1420
1421 let state = left_stream
1422 .try_fold(initial, |mut state, batch| async move {
1423 if let Some(ref mut accumulators) = state.bounds_accumulators {
1425 for accumulator in accumulators {
1426 accumulator.update_batch(&batch)?;
1427 }
1428 }
1429
1430 let batch_size = get_record_batch_memory_size(&batch);
1432 state.reservation.try_grow(batch_size)?;
1434 state.metrics.build_mem_used.add(batch_size);
1436 state.metrics.build_input_batches.add(1);
1437 state.metrics.build_input_rows.add(batch.num_rows());
1438 state.num_rows += batch.num_rows();
1440 state.batches.push(batch);
1442 Ok(state)
1443 })
1444 .await?;
1445
1446 let BuildSideState {
1448 batches,
1449 num_rows,
1450 metrics,
1451 mut reservation,
1452 bounds_accumulators,
1453 } = state;
1454
1455 let fixed_size_u32 = size_of::<JoinHashMapU32>();
1458 let fixed_size_u64 = size_of::<JoinHashMapU64>();
1459
1460 let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
1464 let estimated_hashtable_size =
1465 estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
1466 reservation.try_grow(estimated_hashtable_size)?;
1467 metrics.build_mem_used.add(estimated_hashtable_size);
1468 Box::new(JoinHashMapU64::with_capacity(num_rows))
1469 } else {
1470 let estimated_hashtable_size =
1471 estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
1472 reservation.try_grow(estimated_hashtable_size)?;
1473 metrics.build_mem_used.add(estimated_hashtable_size);
1474 Box::new(JoinHashMapU32::with_capacity(num_rows))
1475 };
1476
1477 let mut hashes_buffer = Vec::new();
1478 let mut offset = 0;
1479
1480 let batches_iter = batches.iter().rev();
1482 for batch in batches_iter.clone() {
1483 hashes_buffer.clear();
1484 hashes_buffer.resize(batch.num_rows(), 0);
1485 update_hash(
1486 &on_left,
1487 batch,
1488 &mut *hashmap,
1489 offset,
1490 &random_state,
1491 &mut hashes_buffer,
1492 0,
1493 true,
1494 )?;
1495 offset += batch.num_rows();
1496 }
1497 let batch = concat_batches(&schema, batches_iter)?;
1499
1500 let visited_indices_bitmap = if with_visited_indices_bitmap {
1502 let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
1503 reservation.try_grow(bitmap_size)?;
1504 metrics.build_mem_used.add(bitmap_size);
1505
1506 let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
1507 bitmap_buffer.append_n(num_rows, false);
1508 bitmap_buffer
1509 } else {
1510 BooleanBufferBuilder::new(0)
1511 };
1512
1513 let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
1514
1515 let bounds = match bounds_accumulators {
1517 Some(accumulators) if num_rows > 0 => {
1518 let bounds = accumulators
1519 .into_iter()
1520 .map(CollectLeftAccumulator::evaluate)
1521 .collect::<Result<Vec<_>>>()?;
1522 Some(PartitionBounds::new(bounds))
1523 }
1524 _ => None,
1525 };
1526
1527 let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
1529
1530 let membership = if num_rows == 0 {
1531 PushdownStrategy::Empty
1532 } else {
1533 let estimated_size = left_values
1537 .iter()
1538 .map(|arr| arr.get_array_memory_size())
1539 .sum::<usize>();
1540 if left_values.is_empty()
1541 || left_values[0].is_empty()
1542 || estimated_size > max_inlist_size
1543 || hash_map.len() > max_inlist_distinct_values
1544 {
1545 PushdownStrategy::HashTable(Arc::clone(&hash_map))
1546 } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
1547 PushdownStrategy::InList(in_list_values)
1548 } else {
1549 PushdownStrategy::HashTable(Arc::clone(&hash_map))
1550 }
1551 };
1552
1553 let data = JoinLeftData {
1554 hash_map,
1555 batch,
1556 values: left_values,
1557 visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
1558 probe_threads_counter: AtomicUsize::new(probe_threads_count),
1559 _reservation: reservation,
1560 bounds,
1561 membership,
1562 };
1563
1564 Ok(data)
1565}
1566
1567#[cfg(test)]
1568mod tests {
1569 use super::*;
1570 use crate::coalesce_partitions::CoalescePartitionsExec;
1571 use crate::joins::hash_join::stream::lookup_join_hashmap;
1572 use crate::test::{TestMemoryExec, assert_join_metrics};
1573 use crate::{
1574 common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
1575 test::exec::MockExec,
1576 };
1577
1578 use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
1579 use arrow::buffer::NullBuffer;
1580 use arrow::datatypes::{DataType, Field};
1581 use arrow_schema::Schema;
1582 use datafusion_common::hash_utils::create_hashes;
1583 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
1584 use datafusion_common::{
1585 ScalarValue, assert_batches_eq, assert_batches_sorted_eq, assert_contains,
1586 exec_err, internal_err,
1587 };
1588 use datafusion_execution::config::SessionConfig;
1589 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1590 use datafusion_expr::Operator;
1591 use datafusion_physical_expr::PhysicalExpr;
1592 use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1593 use hashbrown::HashTable;
1594 use insta::{allow_duplicates, assert_snapshot};
1595 use rstest::*;
1596 use rstest_reuse::*;
1597
1598 fn div_ceil(a: usize, b: usize) -> usize {
1599 a.div_ceil(b)
1600 }
1601
1602 #[template]
1603 #[rstest]
1604 fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
1605
1606 fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
1607 let session_config = SessionConfig::default().with_batch_size(batch_size);
1608 Arc::new(TaskContext::default().with_session_config(session_config))
1609 }
1610
1611 fn build_table(
1612 a: (&str, &Vec<i32>),
1613 b: (&str, &Vec<i32>),
1614 c: (&str, &Vec<i32>),
1615 ) -> Arc<dyn ExecutionPlan> {
1616 let batch = build_table_i32(a, b, c);
1617 let schema = batch.schema();
1618 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
1619 }
1620
1621 fn join(
1622 left: Arc<dyn ExecutionPlan>,
1623 right: Arc<dyn ExecutionPlan>,
1624 on: JoinOn,
1625 join_type: &JoinType,
1626 null_equality: NullEquality,
1627 ) -> Result<HashJoinExec> {
1628 HashJoinExec::try_new(
1629 left,
1630 right,
1631 on,
1632 None,
1633 join_type,
1634 None,
1635 PartitionMode::CollectLeft,
1636 null_equality,
1637 )
1638 }
1639
1640 fn join_with_filter(
1641 left: Arc<dyn ExecutionPlan>,
1642 right: Arc<dyn ExecutionPlan>,
1643 on: JoinOn,
1644 filter: JoinFilter,
1645 join_type: &JoinType,
1646 null_equality: NullEquality,
1647 ) -> Result<HashJoinExec> {
1648 HashJoinExec::try_new(
1649 left,
1650 right,
1651 on,
1652 Some(filter),
1653 join_type,
1654 None,
1655 PartitionMode::CollectLeft,
1656 null_equality,
1657 )
1658 }
1659
1660 async fn join_collect(
1661 left: Arc<dyn ExecutionPlan>,
1662 right: Arc<dyn ExecutionPlan>,
1663 on: JoinOn,
1664 join_type: &JoinType,
1665 null_equality: NullEquality,
1666 context: Arc<TaskContext>,
1667 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1668 let join = join(left, right, on, join_type, null_equality)?;
1669 let columns_header = columns(&join.schema());
1670
1671 let stream = join.execute(0, context)?;
1672 let batches = common::collect(stream).await?;
1673 let metrics = join.metrics().unwrap();
1674
1675 Ok((columns_header, batches, metrics))
1676 }
1677
1678 async fn partitioned_join_collect(
1679 left: Arc<dyn ExecutionPlan>,
1680 right: Arc<dyn ExecutionPlan>,
1681 on: JoinOn,
1682 join_type: &JoinType,
1683 null_equality: NullEquality,
1684 context: Arc<TaskContext>,
1685 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1686 join_collect_with_partition_mode(
1687 left,
1688 right,
1689 on,
1690 join_type,
1691 PartitionMode::Partitioned,
1692 null_equality,
1693 context,
1694 )
1695 .await
1696 }
1697
1698 async fn join_collect_with_partition_mode(
1699 left: Arc<dyn ExecutionPlan>,
1700 right: Arc<dyn ExecutionPlan>,
1701 on: JoinOn,
1702 join_type: &JoinType,
1703 partition_mode: PartitionMode,
1704 null_equality: NullEquality,
1705 context: Arc<TaskContext>,
1706 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1707 let partition_count = 4;
1708
1709 let (left_expr, right_expr) = on
1710 .iter()
1711 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1712 .unzip();
1713
1714 let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1715 PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
1716 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1717 left,
1718 Partitioning::Hash(left_expr, partition_count),
1719 )?),
1720 PartitionMode::Auto => {
1721 return internal_err!("Unexpected PartitionMode::Auto in join tests");
1722 }
1723 };
1724
1725 let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1726 PartitionMode::CollectLeft => {
1727 let partition_column_name = right.schema().field(0).name().clone();
1728 let partition_expr = vec![Arc::new(Column::new_with_schema(
1729 &partition_column_name,
1730 &right.schema(),
1731 )?) as _];
1732 Arc::new(RepartitionExec::try_new(
1733 right,
1734 Partitioning::Hash(partition_expr, partition_count),
1735 )?) as _
1736 }
1737 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1738 right,
1739 Partitioning::Hash(right_expr, partition_count),
1740 )?),
1741 PartitionMode::Auto => {
1742 return internal_err!("Unexpected PartitionMode::Auto in join tests");
1743 }
1744 };
1745
1746 let join = HashJoinExec::try_new(
1747 left_repartitioned,
1748 right_repartitioned,
1749 on,
1750 None,
1751 join_type,
1752 None,
1753 partition_mode,
1754 null_equality,
1755 )?;
1756
1757 let columns = columns(&join.schema());
1758
1759 let mut batches = vec![];
1760 for i in 0..partition_count {
1761 let stream = join.execute(i, Arc::clone(&context))?;
1762 let more_batches = common::collect(stream).await?;
1763 batches.extend(
1764 more_batches
1765 .into_iter()
1766 .filter(|b| b.num_rows() > 0)
1767 .collect::<Vec<_>>(),
1768 );
1769 }
1770 let metrics = join.metrics().unwrap();
1771
1772 Ok((columns, batches, metrics))
1773 }
1774
1775 #[apply(batch_sizes)]
1776 #[tokio::test]
1777 async fn join_inner_one(batch_size: usize) -> Result<()> {
1778 let task_ctx = prepare_task_ctx(batch_size);
1779 let left = build_table(
1780 ("a1", &vec![1, 2, 3]),
1781 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1783 );
1784 let right = build_table(
1785 ("a2", &vec![10, 20, 30]),
1786 ("b1", &vec![4, 5, 6]),
1787 ("c2", &vec![70, 80, 90]),
1788 );
1789
1790 let on = vec![(
1791 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1792 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1793 )];
1794
1795 let (columns, batches, metrics) = join_collect(
1796 Arc::clone(&left),
1797 Arc::clone(&right),
1798 on.clone(),
1799 &JoinType::Inner,
1800 NullEquality::NullEqualsNothing,
1801 task_ctx,
1802 )
1803 .await?;
1804
1805 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1806
1807 allow_duplicates! {
1808 assert_snapshot!(batches_to_string(&batches), @r"
1810 +----+----+----+----+----+----+
1811 | a1 | b1 | c1 | a2 | b1 | c2 |
1812 +----+----+----+----+----+----+
1813 | 1 | 4 | 7 | 10 | 4 | 70 |
1814 | 2 | 5 | 8 | 20 | 5 | 80 |
1815 | 3 | 5 | 9 | 20 | 5 | 80 |
1816 +----+----+----+----+----+----+
1817 ");
1818 }
1819
1820 assert_join_metrics!(metrics, 3);
1821
1822 Ok(())
1823 }
1824
1825 #[apply(batch_sizes)]
1826 #[tokio::test]
1827 async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> {
1828 let task_ctx = prepare_task_ctx(batch_size);
1829 let left = build_table(
1830 ("a1", &vec![1, 2, 3]),
1831 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1833 );
1834 let right = build_table(
1835 ("a2", &vec![10, 20, 30]),
1836 ("b1", &vec![4, 5, 6]),
1837 ("c2", &vec![70, 80, 90]),
1838 );
1839 let on = vec![(
1840 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1841 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
1842 )];
1843
1844 let (columns, batches, metrics) = partitioned_join_collect(
1845 Arc::clone(&left),
1846 Arc::clone(&right),
1847 on.clone(),
1848 &JoinType::Inner,
1849 NullEquality::NullEqualsNothing,
1850 task_ctx,
1851 )
1852 .await?;
1853
1854 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
1855
1856 allow_duplicates! {
1857 assert_snapshot!(batches_to_sort_string(&batches), @r"
1858 +----+----+----+----+----+----+
1859 | a1 | b1 | c1 | a2 | b1 | c2 |
1860 +----+----+----+----+----+----+
1861 | 1 | 4 | 7 | 10 | 4 | 70 |
1862 | 2 | 5 | 8 | 20 | 5 | 80 |
1863 | 3 | 5 | 9 | 20 | 5 | 80 |
1864 +----+----+----+----+----+----+
1865 ");
1866 }
1867
1868 assert_join_metrics!(metrics, 3);
1869
1870 Ok(())
1871 }
1872
1873 #[tokio::test]
1874 async fn join_inner_one_no_shared_column_names() -> Result<()> {
1875 let task_ctx = Arc::new(TaskContext::default());
1876 let left = build_table(
1877 ("a1", &vec![1, 2, 3]),
1878 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
1880 );
1881 let right = build_table(
1882 ("a2", &vec![10, 20, 30]),
1883 ("b2", &vec![4, 5, 6]),
1884 ("c2", &vec![70, 80, 90]),
1885 );
1886 let on = vec![(
1887 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1888 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1889 )];
1890
1891 let (columns, batches, metrics) = join_collect(
1892 left,
1893 right,
1894 on,
1895 &JoinType::Inner,
1896 NullEquality::NullEqualsNothing,
1897 task_ctx,
1898 )
1899 .await?;
1900
1901 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1902
1903 allow_duplicates! {
1905 assert_snapshot!(batches_to_string(&batches), @r"
1906 +----+----+----+----+----+----+
1907 | a1 | b1 | c1 | a2 | b2 | c2 |
1908 +----+----+----+----+----+----+
1909 | 1 | 4 | 7 | 10 | 4 | 70 |
1910 | 2 | 5 | 8 | 20 | 5 | 80 |
1911 | 3 | 5 | 9 | 20 | 5 | 80 |
1912 +----+----+----+----+----+----+
1913 ");
1914 }
1915
1916 assert_join_metrics!(metrics, 3);
1917
1918 Ok(())
1919 }
1920
1921 #[tokio::test]
1922 async fn join_inner_one_randomly_ordered() -> Result<()> {
1923 let task_ctx = Arc::new(TaskContext::default());
1924 let left = build_table(
1925 ("a1", &vec![0, 3, 2, 1]),
1926 ("b1", &vec![4, 5, 5, 4]),
1927 ("c1", &vec![6, 9, 8, 7]),
1928 );
1929 let right = build_table(
1930 ("a2", &vec![20, 30, 10]),
1931 ("b2", &vec![5, 6, 4]),
1932 ("c2", &vec![80, 90, 70]),
1933 );
1934 let on = vec![(
1935 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
1936 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1937 )];
1938
1939 let (columns, batches, metrics) = join_collect(
1940 left,
1941 right,
1942 on,
1943 &JoinType::Inner,
1944 NullEquality::NullEqualsNothing,
1945 task_ctx,
1946 )
1947 .await?;
1948
1949 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
1950
1951 allow_duplicates! {
1953 assert_snapshot!(batches_to_string(&batches), @r"
1954 +----+----+----+----+----+----+
1955 | a1 | b1 | c1 | a2 | b2 | c2 |
1956 +----+----+----+----+----+----+
1957 | 3 | 5 | 9 | 20 | 5 | 80 |
1958 | 2 | 5 | 8 | 20 | 5 | 80 |
1959 | 0 | 4 | 6 | 10 | 4 | 70 |
1960 | 1 | 4 | 7 | 10 | 4 | 70 |
1961 +----+----+----+----+----+----+
1962 ");
1963 }
1964
1965 assert_join_metrics!(metrics, 4);
1966
1967 Ok(())
1968 }
1969
1970 #[apply(batch_sizes)]
1971 #[tokio::test]
1972 async fn join_inner_two(batch_size: usize) -> Result<()> {
1973 let task_ctx = prepare_task_ctx(batch_size);
1974 let left = build_table(
1975 ("a1", &vec![1, 2, 2]),
1976 ("b2", &vec![1, 2, 2]),
1977 ("c1", &vec![7, 8, 9]),
1978 );
1979 let right = build_table(
1980 ("a1", &vec![1, 2, 3]),
1981 ("b2", &vec![1, 2, 2]),
1982 ("c2", &vec![70, 80, 90]),
1983 );
1984 let on = vec![
1985 (
1986 Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
1987 Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
1988 ),
1989 (
1990 Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
1991 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
1992 ),
1993 ];
1994
1995 let (columns, batches, metrics) = join_collect(
1996 left,
1997 right,
1998 on,
1999 &JoinType::Inner,
2000 NullEquality::NullEqualsNothing,
2001 task_ctx,
2002 )
2003 .await?;
2004
2005 assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
2006
2007 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2008 let mut expected_batch_count = div_ceil(3, batch_size);
2011 if batch_size == 1 {
2012 expected_batch_count += 1;
2013 }
2014 expected_batch_count
2015 } else {
2016 div_ceil(9, batch_size)
2019 };
2020
2021 assert!(
2023 batches.len() <= expected_batch_count,
2024 "expected at most {expected_batch_count} batches, got {}",
2025 batches.len()
2026 );
2027
2028 allow_duplicates! {
2030 assert_snapshot!(batches_to_string(&batches), @r"
2031 +----+----+----+----+----+----+
2032 | a1 | b2 | c1 | a1 | b2 | c2 |
2033 +----+----+----+----+----+----+
2034 | 1 | 1 | 7 | 1 | 1 | 70 |
2035 | 2 | 2 | 8 | 2 | 2 | 80 |
2036 | 2 | 2 | 9 | 2 | 2 | 80 |
2037 +----+----+----+----+----+----+
2038 ");
2039 }
2040
2041 assert_join_metrics!(metrics, 3);
2042
2043 Ok(())
2044 }
2045
2046 #[apply(batch_sizes)]
2048 #[tokio::test]
2049 async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
2050 let task_ctx = prepare_task_ctx(batch_size);
2051 let batch1 = build_table_i32(
2052 ("a1", &vec![1, 2]),
2053 ("b2", &vec![1, 2]),
2054 ("c1", &vec![7, 8]),
2055 );
2056 let batch2 =
2057 build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
2058 let schema = batch1.schema();
2059 let left =
2060 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2061 .unwrap();
2062 let left = Arc::new(CoalescePartitionsExec::new(left));
2063
2064 let right = build_table(
2065 ("a1", &vec![1, 2, 3]),
2066 ("b2", &vec![1, 2, 2]),
2067 ("c2", &vec![70, 80, 90]),
2068 );
2069 let on = vec![
2070 (
2071 Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
2072 Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
2073 ),
2074 (
2075 Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
2076 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2077 ),
2078 ];
2079
2080 let (columns, batches, metrics) = join_collect(
2081 left,
2082 right,
2083 on,
2084 &JoinType::Inner,
2085 NullEquality::NullEqualsNothing,
2086 task_ctx,
2087 )
2088 .await?;
2089
2090 assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
2091
2092 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2093 let mut expected_batch_count = div_ceil(3, batch_size);
2096 if batch_size == 1 {
2097 expected_batch_count += 1;
2098 }
2099 expected_batch_count
2100 } else {
2101 div_ceil(9, batch_size)
2104 };
2105
2106 assert!(
2108 batches.len() <= expected_batch_count,
2109 "expected at most {expected_batch_count} batches, got {}",
2110 batches.len()
2111 );
2112
2113 allow_duplicates! {
2115 assert_snapshot!(batches_to_string(&batches), @r"
2116 +----+----+----+----+----+----+
2117 | a1 | b2 | c1 | a1 | b2 | c2 |
2118 +----+----+----+----+----+----+
2119 | 1 | 1 | 7 | 1 | 1 | 70 |
2120 | 2 | 2 | 8 | 2 | 2 | 80 |
2121 | 2 | 2 | 9 | 2 | 2 | 80 |
2122 +----+----+----+----+----+----+
2123 ");
2124 }
2125
2126 assert_join_metrics!(metrics, 3);
2127
2128 Ok(())
2129 }
2130
2131 #[tokio::test]
2132 async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
2133 let task_ctx = Arc::new(TaskContext::default());
2134 let batch1 = build_table_i32(
2135 ("a1", &vec![0, 3]),
2136 ("b1", &vec![4, 5]),
2137 ("c1", &vec![6, 9]),
2138 );
2139 let batch2 = build_table_i32(
2140 ("a1", &vec![2, 1]),
2141 ("b1", &vec![5, 4]),
2142 ("c1", &vec![8, 7]),
2143 );
2144 let schema = batch1.schema();
2145
2146 let left =
2147 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2148 .unwrap();
2149 let left = Arc::new(CoalescePartitionsExec::new(left));
2150 let right = build_table(
2151 ("a2", &vec![20, 30, 10]),
2152 ("b2", &vec![5, 6, 4]),
2153 ("c2", &vec![80, 90, 70]),
2154 );
2155 let on = vec![(
2156 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2157 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2158 )];
2159
2160 let (columns, batches, metrics) = join_collect(
2161 left,
2162 right,
2163 on,
2164 &JoinType::Inner,
2165 NullEquality::NullEqualsNothing,
2166 task_ctx,
2167 )
2168 .await?;
2169
2170 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2171
2172 allow_duplicates! {
2174 assert_snapshot!(batches_to_string(&batches), @r"
2175 +----+----+----+----+----+----+
2176 | a1 | b1 | c1 | a2 | b2 | c2 |
2177 +----+----+----+----+----+----+
2178 | 3 | 5 | 9 | 20 | 5 | 80 |
2179 | 2 | 5 | 8 | 20 | 5 | 80 |
2180 | 0 | 4 | 6 | 10 | 4 | 70 |
2181 | 1 | 4 | 7 | 10 | 4 | 70 |
2182 +----+----+----+----+----+----+
2183 ");
2184 }
2185
2186 assert_join_metrics!(metrics, 4);
2187
2188 Ok(())
2189 }
2190
2191 #[apply(batch_sizes)]
2193 #[tokio::test]
2194 async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
2195 let task_ctx = prepare_task_ctx(batch_size);
2196 let left = build_table(
2197 ("a1", &vec![1, 2, 3]),
2198 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
2200 );
2201
2202 let batch1 = build_table_i32(
2203 ("a2", &vec![10, 20]),
2204 ("b1", &vec![4, 6]),
2205 ("c2", &vec![70, 80]),
2206 );
2207 let batch2 =
2208 build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
2209 let schema = batch1.schema();
2210 let right =
2211 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2212 .unwrap();
2213
2214 let on = vec![(
2215 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2216 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2217 )];
2218
2219 let join = join(
2220 left,
2221 right,
2222 on,
2223 &JoinType::Inner,
2224 NullEquality::NullEqualsNothing,
2225 )?;
2226
2227 let columns = columns(&join.schema());
2228 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2229
2230 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2232 let batches = common::collect(stream).await?;
2233
2234 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2235 let mut expected_batch_count = div_ceil(1, batch_size);
2238 if batch_size == 1 {
2239 expected_batch_count += 1;
2240 }
2241 expected_batch_count
2242 } else {
2243 div_ceil(6, batch_size)
2246 };
2247 assert!(
2249 batches.len() <= expected_batch_count,
2250 "expected at most {expected_batch_count} batches, got {}",
2251 batches.len()
2252 );
2253
2254 allow_duplicates! {
2256 assert_snapshot!(batches_to_string(&batches), @r"
2257 +----+----+----+----+----+----+
2258 | a1 | b1 | c1 | a2 | b1 | c2 |
2259 +----+----+----+----+----+----+
2260 | 1 | 4 | 7 | 10 | 4 | 70 |
2261 +----+----+----+----+----+----+
2262 ");
2263 }
2264
2265 let stream = join.execute(1, Arc::clone(&task_ctx))?;
2267 let batches = common::collect(stream).await?;
2268
2269 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2270 div_ceil(2, batch_size)
2272 } else {
2273 div_ceil(3, batch_size)
2276 };
2277 assert!(
2279 batches.len() <= expected_batch_count,
2280 "expected at most {expected_batch_count} batches, got {}",
2281 batches.len()
2282 );
2283
2284 allow_duplicates! {
2286 assert_snapshot!(batches_to_string(&batches), @r"
2287 +----+----+----+----+----+----+
2288 | a1 | b1 | c1 | a2 | b1 | c2 |
2289 +----+----+----+----+----+----+
2290 | 2 | 5 | 8 | 30 | 5 | 90 |
2291 | 3 | 5 | 9 | 30 | 5 | 90 |
2292 +----+----+----+----+----+----+
2293 ");
2294 }
2295
2296 Ok(())
2297 }
2298
2299 fn build_table_two_batches(
2300 a: (&str, &Vec<i32>),
2301 b: (&str, &Vec<i32>),
2302 c: (&str, &Vec<i32>),
2303 ) -> Arc<dyn ExecutionPlan> {
2304 let batch = build_table_i32(a, b, c);
2305 let schema = batch.schema();
2306 TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2307 }
2308
2309 #[apply(batch_sizes)]
2310 #[tokio::test]
2311 async fn join_left_multi_batch(batch_size: usize) {
2312 let task_ctx = prepare_task_ctx(batch_size);
2313 let left = build_table(
2314 ("a1", &vec![1, 2, 3]),
2315 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2317 );
2318 let right = build_table_two_batches(
2319 ("a2", &vec![10, 20, 30]),
2320 ("b1", &vec![4, 5, 6]),
2321 ("c2", &vec![70, 80, 90]),
2322 );
2323 let on = vec![(
2324 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2325 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2326 )];
2327
2328 let join = join(
2329 left,
2330 right,
2331 on,
2332 &JoinType::Left,
2333 NullEquality::NullEqualsNothing,
2334 )
2335 .unwrap();
2336
2337 let columns = columns(&join.schema());
2338 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2339
2340 let stream = join.execute(0, task_ctx).unwrap();
2341 let batches = common::collect(stream).await.unwrap();
2342
2343 allow_duplicates! {
2344 assert_snapshot!(batches_to_sort_string(&batches), @r"
2345 +----+----+----+----+----+----+
2346 | a1 | b1 | c1 | a2 | b1 | c2 |
2347 +----+----+----+----+----+----+
2348 | 1 | 4 | 7 | 10 | 4 | 70 |
2349 | 1 | 4 | 7 | 10 | 4 | 70 |
2350 | 2 | 5 | 8 | 20 | 5 | 80 |
2351 | 2 | 5 | 8 | 20 | 5 | 80 |
2352 | 3 | 7 | 9 | | | |
2353 +----+----+----+----+----+----+
2354 ");
2355 }
2356 }
2357
2358 #[apply(batch_sizes)]
2359 #[tokio::test]
2360 async fn join_full_multi_batch(batch_size: usize) {
2361 let task_ctx = prepare_task_ctx(batch_size);
2362 let left = build_table(
2363 ("a1", &vec![1, 2, 3]),
2364 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2366 );
2367 let right = build_table_two_batches(
2369 ("a2", &vec![10, 20, 30]),
2370 ("b2", &vec![4, 5, 6]),
2371 ("c2", &vec![70, 80, 90]),
2372 );
2373 let on = vec![(
2374 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2375 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2376 )];
2377
2378 let join = join(
2379 left,
2380 right,
2381 on,
2382 &JoinType::Full,
2383 NullEquality::NullEqualsNothing,
2384 )
2385 .unwrap();
2386
2387 let columns = columns(&join.schema());
2388 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2389
2390 let stream = join.execute(0, task_ctx).unwrap();
2391 let batches = common::collect(stream).await.unwrap();
2392
2393 allow_duplicates! {
2394 assert_snapshot!(batches_to_sort_string(&batches), @r"
2395 +----+----+----+----+----+----+
2396 | a1 | b1 | c1 | a2 | b2 | c2 |
2397 +----+----+----+----+----+----+
2398 | | | | 30 | 6 | 90 |
2399 | | | | 30 | 6 | 90 |
2400 | 1 | 4 | 7 | 10 | 4 | 70 |
2401 | 1 | 4 | 7 | 10 | 4 | 70 |
2402 | 2 | 5 | 8 | 20 | 5 | 80 |
2403 | 2 | 5 | 8 | 20 | 5 | 80 |
2404 | 3 | 7 | 9 | | | |
2405 +----+----+----+----+----+----+
2406 ");
2407 }
2408 }
2409
2410 #[apply(batch_sizes)]
2411 #[tokio::test]
2412 async fn join_left_empty_right(batch_size: usize) {
2413 let task_ctx = prepare_task_ctx(batch_size);
2414 let left = build_table(
2415 ("a1", &vec![1, 2, 3]),
2416 ("b1", &vec![4, 5, 7]),
2417 ("c1", &vec![7, 8, 9]),
2418 );
2419 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2420 let on = vec![(
2421 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2422 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2423 )];
2424 let schema = right.schema();
2425 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2426 let join = join(
2427 left,
2428 right,
2429 on,
2430 &JoinType::Left,
2431 NullEquality::NullEqualsNothing,
2432 )
2433 .unwrap();
2434
2435 let columns = columns(&join.schema());
2436 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2437
2438 let stream = join.execute(0, task_ctx).unwrap();
2439 let batches = common::collect(stream).await.unwrap();
2440
2441 allow_duplicates! {
2442 assert_snapshot!(batches_to_sort_string(&batches), @r"
2443 +----+----+----+----+----+----+
2444 | a1 | b1 | c1 | a2 | b1 | c2 |
2445 +----+----+----+----+----+----+
2446 | 1 | 4 | 7 | | | |
2447 | 2 | 5 | 8 | | | |
2448 | 3 | 7 | 9 | | | |
2449 +----+----+----+----+----+----+
2450 ");
2451 }
2452 }
2453
2454 #[apply(batch_sizes)]
2455 #[tokio::test]
2456 async fn join_full_empty_right(batch_size: usize) {
2457 let task_ctx = prepare_task_ctx(batch_size);
2458 let left = build_table(
2459 ("a1", &vec![1, 2, 3]),
2460 ("b1", &vec![4, 5, 7]),
2461 ("c1", &vec![7, 8, 9]),
2462 );
2463 let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
2464 let on = vec![(
2465 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2466 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
2467 )];
2468 let schema = right.schema();
2469 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
2470 let join = join(
2471 left,
2472 right,
2473 on,
2474 &JoinType::Full,
2475 NullEquality::NullEqualsNothing,
2476 )
2477 .unwrap();
2478
2479 let columns = columns(&join.schema());
2480 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2481
2482 let stream = join.execute(0, task_ctx).unwrap();
2483 let batches = common::collect(stream).await.unwrap();
2484
2485 allow_duplicates! {
2486 assert_snapshot!(batches_to_sort_string(&batches), @r"
2487 +----+----+----+----+----+----+
2488 | a1 | b1 | c1 | a2 | b2 | c2 |
2489 +----+----+----+----+----+----+
2490 | 1 | 4 | 7 | | | |
2491 | 2 | 5 | 8 | | | |
2492 | 3 | 7 | 9 | | | |
2493 +----+----+----+----+----+----+
2494 ");
2495 }
2496 }
2497
2498 #[apply(batch_sizes)]
2499 #[tokio::test]
2500 async fn join_left_one(batch_size: usize) -> Result<()> {
2501 let task_ctx = prepare_task_ctx(batch_size);
2502 let left = build_table(
2503 ("a1", &vec![1, 2, 3]),
2504 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2506 );
2507 let right = build_table(
2508 ("a2", &vec![10, 20, 30]),
2509 ("b1", &vec![4, 5, 6]),
2510 ("c2", &vec![70, 80, 90]),
2511 );
2512 let on = vec![(
2513 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2514 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2515 )];
2516
2517 let (columns, batches, metrics) = join_collect(
2518 Arc::clone(&left),
2519 Arc::clone(&right),
2520 on.clone(),
2521 &JoinType::Left,
2522 NullEquality::NullEqualsNothing,
2523 task_ctx,
2524 )
2525 .await?;
2526
2527 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2528
2529 allow_duplicates! {
2530 assert_snapshot!(batches_to_sort_string(&batches), @r"
2531 +----+----+----+----+----+----+
2532 | a1 | b1 | c1 | a2 | b1 | c2 |
2533 +----+----+----+----+----+----+
2534 | 1 | 4 | 7 | 10 | 4 | 70 |
2535 | 2 | 5 | 8 | 20 | 5 | 80 |
2536 | 3 | 7 | 9 | | | |
2537 +----+----+----+----+----+----+
2538 ");
2539 }
2540
2541 assert_join_metrics!(metrics, 3);
2542
2543 Ok(())
2544 }
2545
2546 #[apply(batch_sizes)]
2547 #[tokio::test]
2548 async fn partitioned_join_left_one(batch_size: usize) -> Result<()> {
2549 let task_ctx = prepare_task_ctx(batch_size);
2550 let left = build_table(
2551 ("a1", &vec![1, 2, 3]),
2552 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2554 );
2555 let right = build_table(
2556 ("a2", &vec![10, 20, 30]),
2557 ("b1", &vec![4, 5, 6]),
2558 ("c2", &vec![70, 80, 90]),
2559 );
2560 let on = vec![(
2561 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2562 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2563 )];
2564
2565 let (columns, batches, metrics) = partitioned_join_collect(
2566 Arc::clone(&left),
2567 Arc::clone(&right),
2568 on.clone(),
2569 &JoinType::Left,
2570 NullEquality::NullEqualsNothing,
2571 task_ctx,
2572 )
2573 .await?;
2574
2575 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2576
2577 allow_duplicates! {
2578 assert_snapshot!(batches_to_sort_string(&batches), @r"
2579 +----+----+----+----+----+----+
2580 | a1 | b1 | c1 | a2 | b1 | c2 |
2581 +----+----+----+----+----+----+
2582 | 1 | 4 | 7 | 10 | 4 | 70 |
2583 | 2 | 5 | 8 | 20 | 5 | 80 |
2584 | 3 | 7 | 9 | | | |
2585 +----+----+----+----+----+----+
2586 ");
2587 }
2588
2589 assert_join_metrics!(metrics, 3);
2590
2591 Ok(())
2592 }
2593
2594 fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
2595 build_table(
2598 ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
2599 ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
2600 ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
2601 )
2602 }
2603
2604 fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
2605 build_table(
2608 ("a2", &vec![8, 12, 6, 2, 10, 4]),
2609 ("b2", &vec![8, 10, 6, 2, 10, 4]),
2610 ("c2", &vec![20, 40, 60, 80, 100, 120]),
2611 )
2612 }
2613
2614 #[apply(batch_sizes)]
2615 #[tokio::test]
2616 async fn join_left_semi(batch_size: usize) -> Result<()> {
2617 let task_ctx = prepare_task_ctx(batch_size);
2618 let left = build_semi_anti_left_table();
2619 let right = build_semi_anti_right_table();
2620 let on = vec![(
2622 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2623 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2624 )];
2625
2626 let join = join(
2627 left,
2628 right,
2629 on,
2630 &JoinType::LeftSemi,
2631 NullEquality::NullEqualsNothing,
2632 )?;
2633
2634 let columns = columns(&join.schema());
2635 assert_eq!(columns, vec!["a1", "b1", "c1"]);
2636
2637 let stream = join.execute(0, task_ctx)?;
2638 let batches = common::collect(stream).await?;
2639
2640 allow_duplicates! {
2642 assert_snapshot!(batches_to_sort_string(&batches), @r"
2643 +----+----+-----+
2644 | a1 | b1 | c1 |
2645 +----+----+-----+
2646 | 11 | 8 | 110 |
2647 | 13 | 10 | 130 |
2648 | 9 | 8 | 90 |
2649 +----+----+-----+
2650 ");
2651 }
2652
2653 Ok(())
2654 }
2655
2656 #[apply(batch_sizes)]
2657 #[tokio::test]
2658 async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> {
2659 let task_ctx = prepare_task_ctx(batch_size);
2660 let left = build_semi_anti_left_table();
2661 let right = build_semi_anti_right_table();
2662
2663 let on = vec![(
2665 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2666 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2667 )];
2668
2669 let column_indices = vec![ColumnIndex {
2670 index: 0,
2671 side: JoinSide::Right,
2672 }];
2673 let intermediate_schema =
2674 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2675
2676 let filter_expression = Arc::new(BinaryExpr::new(
2677 Arc::new(Column::new("x", 0)),
2678 Operator::NotEq,
2679 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2680 )) as Arc<dyn PhysicalExpr>;
2681
2682 let filter = JoinFilter::new(
2683 filter_expression,
2684 column_indices.clone(),
2685 Arc::new(intermediate_schema.clone()),
2686 );
2687
2688 let join = join_with_filter(
2689 Arc::clone(&left),
2690 Arc::clone(&right),
2691 on.clone(),
2692 filter,
2693 &JoinType::LeftSemi,
2694 NullEquality::NullEqualsNothing,
2695 )?;
2696
2697 let columns_header = columns(&join.schema());
2698 assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
2699
2700 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2701 let batches = common::collect(stream).await?;
2702
2703 allow_duplicates! {
2704 assert_snapshot!(batches_to_sort_string(&batches), @r"
2705 +----+----+-----+
2706 | a1 | b1 | c1 |
2707 +----+----+-----+
2708 | 11 | 8 | 110 |
2709 | 13 | 10 | 130 |
2710 | 9 | 8 | 90 |
2711 +----+----+-----+
2712 ");
2713 }
2714
2715 let filter_expression = Arc::new(BinaryExpr::new(
2717 Arc::new(Column::new("x", 0)),
2718 Operator::Gt,
2719 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2720 )) as Arc<dyn PhysicalExpr>;
2721 let filter = JoinFilter::new(
2722 filter_expression,
2723 column_indices,
2724 Arc::new(intermediate_schema),
2725 );
2726
2727 let join = join_with_filter(
2728 left,
2729 right,
2730 on,
2731 filter,
2732 &JoinType::LeftSemi,
2733 NullEquality::NullEqualsNothing,
2734 )?;
2735
2736 let columns_header = columns(&join.schema());
2737 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2738
2739 let stream = join.execute(0, task_ctx)?;
2740 let batches = common::collect(stream).await?;
2741
2742 allow_duplicates! {
2743 assert_snapshot!(batches_to_sort_string(&batches), @r"
2744 +----+----+-----+
2745 | a1 | b1 | c1 |
2746 +----+----+-----+
2747 | 13 | 10 | 130 |
2748 +----+----+-----+
2749 ");
2750 }
2751
2752 Ok(())
2753 }
2754
2755 #[apply(batch_sizes)]
2756 #[tokio::test]
2757 async fn join_right_semi(batch_size: usize) -> Result<()> {
2758 let task_ctx = prepare_task_ctx(batch_size);
2759 let left = build_semi_anti_left_table();
2760 let right = build_semi_anti_right_table();
2761
2762 let on = vec![(
2764 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2765 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2766 )];
2767
2768 let join = join(
2769 left,
2770 right,
2771 on,
2772 &JoinType::RightSemi,
2773 NullEquality::NullEqualsNothing,
2774 )?;
2775
2776 let columns = columns(&join.schema());
2777 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2778
2779 let stream = join.execute(0, task_ctx)?;
2780 let batches = common::collect(stream).await?;
2781
2782 allow_duplicates! {
2784 assert_snapshot!(batches_to_string(&batches), @r"
2785 +----+----+-----+
2786 | a2 | b2 | c2 |
2787 +----+----+-----+
2788 | 8 | 8 | 20 |
2789 | 12 | 10 | 40 |
2790 | 10 | 10 | 100 |
2791 +----+----+-----+
2792 ");
2793 }
2794
2795 Ok(())
2796 }
2797
2798 #[apply(batch_sizes)]
2799 #[tokio::test]
2800 async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> {
2801 let task_ctx = prepare_task_ctx(batch_size);
2802 let left = build_semi_anti_left_table();
2803 let right = build_semi_anti_right_table();
2804
2805 let on = vec![(
2807 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2808 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2809 )];
2810
2811 let column_indices = vec![ColumnIndex {
2812 index: 0,
2813 side: JoinSide::Left,
2814 }];
2815 let intermediate_schema =
2816 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2817
2818 let filter_expression = Arc::new(BinaryExpr::new(
2819 Arc::new(Column::new("x", 0)),
2820 Operator::NotEq,
2821 Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
2822 )) as Arc<dyn PhysicalExpr>;
2823
2824 let filter = JoinFilter::new(
2825 filter_expression,
2826 column_indices.clone(),
2827 Arc::new(intermediate_schema.clone()),
2828 );
2829
2830 let join = join_with_filter(
2831 Arc::clone(&left),
2832 Arc::clone(&right),
2833 on.clone(),
2834 filter,
2835 &JoinType::RightSemi,
2836 NullEquality::NullEqualsNothing,
2837 )?;
2838
2839 let columns = columns(&join.schema());
2840 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2841
2842 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2843 let batches = common::collect(stream).await?;
2844
2845 allow_duplicates! {
2847 assert_snapshot!(batches_to_string(&batches), @r"
2848 +----+----+-----+
2849 | a2 | b2 | c2 |
2850 +----+----+-----+
2851 | 8 | 8 | 20 |
2852 | 12 | 10 | 40 |
2853 | 10 | 10 | 100 |
2854 +----+----+-----+
2855 ");
2856 }
2857
2858 let filter_expression = Arc::new(BinaryExpr::new(
2860 Arc::new(Column::new("x", 0)),
2861 Operator::Gt,
2862 Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
2863 )) as Arc<dyn PhysicalExpr>;
2864
2865 let filter = JoinFilter::new(
2866 filter_expression,
2867 column_indices,
2868 Arc::new(intermediate_schema.clone()),
2869 );
2870
2871 let join = join_with_filter(
2872 left,
2873 right,
2874 on,
2875 filter,
2876 &JoinType::RightSemi,
2877 NullEquality::NullEqualsNothing,
2878 )?;
2879 let stream = join.execute(0, task_ctx)?;
2880 let batches = common::collect(stream).await?;
2881
2882 allow_duplicates! {
2884 assert_snapshot!(batches_to_string(&batches), @r"
2885 +----+----+-----+
2886 | a2 | b2 | c2 |
2887 +----+----+-----+
2888 | 12 | 10 | 40 |
2889 | 10 | 10 | 100 |
2890 +----+----+-----+
2891 ");
2892 }
2893
2894 Ok(())
2895 }
2896
2897 #[apply(batch_sizes)]
2898 #[tokio::test]
2899 async fn join_left_anti(batch_size: usize) -> Result<()> {
2900 let task_ctx = prepare_task_ctx(batch_size);
2901 let left = build_semi_anti_left_table();
2902 let right = build_semi_anti_right_table();
2903 let on = vec![(
2905 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2906 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2907 )];
2908
2909 let join = join(
2910 left,
2911 right,
2912 on,
2913 &JoinType::LeftAnti,
2914 NullEquality::NullEqualsNothing,
2915 )?;
2916
2917 let columns = columns(&join.schema());
2918 assert_eq!(columns, vec!["a1", "b1", "c1"]);
2919
2920 let stream = join.execute(0, task_ctx)?;
2921 let batches = common::collect(stream).await?;
2922
2923 allow_duplicates! {
2924 assert_snapshot!(batches_to_sort_string(&batches), @r"
2925 +----+----+----+
2926 | a1 | b1 | c1 |
2927 +----+----+----+
2928 | 1 | 1 | 10 |
2929 | 3 | 3 | 30 |
2930 | 5 | 5 | 50 |
2931 | 7 | 7 | 70 |
2932 +----+----+----+
2933 ");
2934 }
2935 Ok(())
2936 }
2937
2938 #[apply(batch_sizes)]
2939 #[tokio::test]
2940 async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> {
2941 let task_ctx = prepare_task_ctx(batch_size);
2942 let left = build_semi_anti_left_table();
2943 let right = build_semi_anti_right_table();
2944 let on = vec![(
2946 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2947 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2948 )];
2949
2950 let column_indices = vec![ColumnIndex {
2951 index: 0,
2952 side: JoinSide::Right,
2953 }];
2954 let intermediate_schema =
2955 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
2956 let filter_expression = Arc::new(BinaryExpr::new(
2957 Arc::new(Column::new("x", 0)),
2958 Operator::NotEq,
2959 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
2960 )) as Arc<dyn PhysicalExpr>;
2961
2962 let filter = JoinFilter::new(
2963 filter_expression,
2964 column_indices.clone(),
2965 Arc::new(intermediate_schema.clone()),
2966 );
2967
2968 let join = join_with_filter(
2969 Arc::clone(&left),
2970 Arc::clone(&right),
2971 on.clone(),
2972 filter,
2973 &JoinType::LeftAnti,
2974 NullEquality::NullEqualsNothing,
2975 )?;
2976
2977 let columns_header = columns(&join.schema());
2978 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
2979
2980 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2981 let batches = common::collect(stream).await?;
2982
2983 allow_duplicates! {
2984 assert_snapshot!(batches_to_sort_string(&batches), @r"
2985 +----+----+-----+
2986 | a1 | b1 | c1 |
2987 +----+----+-----+
2988 | 1 | 1 | 10 |
2989 | 11 | 8 | 110 |
2990 | 3 | 3 | 30 |
2991 | 5 | 5 | 50 |
2992 | 7 | 7 | 70 |
2993 | 9 | 8 | 90 |
2994 +----+----+-----+
2995 ");
2996 }
2997
2998 let filter_expression = Arc::new(BinaryExpr::new(
3000 Arc::new(Column::new("x", 0)),
3001 Operator::NotEq,
3002 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3003 )) as Arc<dyn PhysicalExpr>;
3004
3005 let filter = JoinFilter::new(
3006 filter_expression,
3007 column_indices,
3008 Arc::new(intermediate_schema),
3009 );
3010
3011 let join = join_with_filter(
3012 left,
3013 right,
3014 on,
3015 filter,
3016 &JoinType::LeftAnti,
3017 NullEquality::NullEqualsNothing,
3018 )?;
3019
3020 let columns_header = columns(&join.schema());
3021 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
3022
3023 let stream = join.execute(0, task_ctx)?;
3024 let batches = common::collect(stream).await?;
3025
3026 allow_duplicates! {
3027 assert_snapshot!(batches_to_sort_string(&batches), @r"
3028 +----+----+-----+
3029 | a1 | b1 | c1 |
3030 +----+----+-----+
3031 | 1 | 1 | 10 |
3032 | 11 | 8 | 110 |
3033 | 3 | 3 | 30 |
3034 | 5 | 5 | 50 |
3035 | 7 | 7 | 70 |
3036 | 9 | 8 | 90 |
3037 +----+----+-----+
3038 ");
3039 }
3040
3041 Ok(())
3042 }
3043
3044 #[apply(batch_sizes)]
3045 #[tokio::test]
3046 async fn join_right_anti(batch_size: usize) -> Result<()> {
3047 let task_ctx = prepare_task_ctx(batch_size);
3048 let left = build_semi_anti_left_table();
3049 let right = build_semi_anti_right_table();
3050 let on = vec![(
3051 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3052 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3053 )];
3054
3055 let join = join(
3056 left,
3057 right,
3058 on,
3059 &JoinType::RightAnti,
3060 NullEquality::NullEqualsNothing,
3061 )?;
3062
3063 let columns = columns(&join.schema());
3064 assert_eq!(columns, vec!["a2", "b2", "c2"]);
3065
3066 let stream = join.execute(0, task_ctx)?;
3067 let batches = common::collect(stream).await?;
3068
3069 allow_duplicates! {
3071 assert_snapshot!(batches_to_string(&batches), @r"
3072 +----+----+-----+
3073 | a2 | b2 | c2 |
3074 +----+----+-----+
3075 | 6 | 6 | 60 |
3076 | 2 | 2 | 80 |
3077 | 4 | 4 | 120 |
3078 +----+----+-----+
3079 ");
3080 }
3081 Ok(())
3082 }
3083
3084 #[apply(batch_sizes)]
3085 #[tokio::test]
3086 async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> {
3087 let task_ctx = prepare_task_ctx(batch_size);
3088 let left = build_semi_anti_left_table();
3089 let right = build_semi_anti_right_table();
3090 let on = vec![(
3092 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3093 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3094 )];
3095
3096 let column_indices = vec![ColumnIndex {
3097 index: 0,
3098 side: JoinSide::Left,
3099 }];
3100 let intermediate_schema =
3101 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3102
3103 let filter_expression = Arc::new(BinaryExpr::new(
3104 Arc::new(Column::new("x", 0)),
3105 Operator::NotEq,
3106 Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
3107 )) as Arc<dyn PhysicalExpr>;
3108
3109 let filter = JoinFilter::new(
3110 filter_expression,
3111 column_indices,
3112 Arc::new(intermediate_schema.clone()),
3113 );
3114
3115 let join = join_with_filter(
3116 Arc::clone(&left),
3117 Arc::clone(&right),
3118 on.clone(),
3119 filter,
3120 &JoinType::RightAnti,
3121 NullEquality::NullEqualsNothing,
3122 )?;
3123
3124 let columns_header = columns(&join.schema());
3125 assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3126
3127 let stream = join.execute(0, Arc::clone(&task_ctx))?;
3128 let batches = common::collect(stream).await?;
3129
3130 allow_duplicates! {
3132 assert_snapshot!(batches_to_string(&batches), @r"
3133 +----+----+-----+
3134 | a2 | b2 | c2 |
3135 +----+----+-----+
3136 | 12 | 10 | 40 |
3137 | 6 | 6 | 60 |
3138 | 2 | 2 | 80 |
3139 | 10 | 10 | 100 |
3140 | 4 | 4 | 120 |
3141 +----+----+-----+
3142 ");
3143 }
3144
3145 let column_indices = vec![ColumnIndex {
3147 index: 1,
3148 side: JoinSide::Right,
3149 }];
3150 let filter_expression = Arc::new(BinaryExpr::new(
3151 Arc::new(Column::new("x", 0)),
3152 Operator::NotEq,
3153 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3154 )) as Arc<dyn PhysicalExpr>;
3155
3156 let filter = JoinFilter::new(
3157 filter_expression,
3158 column_indices,
3159 Arc::new(intermediate_schema),
3160 );
3161
3162 let join = join_with_filter(
3163 left,
3164 right,
3165 on,
3166 filter,
3167 &JoinType::RightAnti,
3168 NullEquality::NullEqualsNothing,
3169 )?;
3170
3171 let columns_header = columns(&join.schema());
3172 assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3173
3174 let stream = join.execute(0, task_ctx)?;
3175 let batches = common::collect(stream).await?;
3176
3177 allow_duplicates! {
3179 assert_snapshot!(batches_to_string(&batches), @r"
3180 +----+----+-----+
3181 | a2 | b2 | c2 |
3182 +----+----+-----+
3183 | 8 | 8 | 20 |
3184 | 6 | 6 | 60 |
3185 | 2 | 2 | 80 |
3186 | 4 | 4 | 120 |
3187 +----+----+-----+
3188 ");
3189 }
3190
3191 Ok(())
3192 }
3193
3194 #[apply(batch_sizes)]
3195 #[tokio::test]
3196 async fn join_right_one(batch_size: usize) -> Result<()> {
3197 let task_ctx = prepare_task_ctx(batch_size);
3198 let left = build_table(
3199 ("a1", &vec![1, 2, 3]),
3200 ("b1", &vec![4, 5, 7]),
3201 ("c1", &vec![7, 8, 9]),
3202 );
3203 let right = build_table(
3204 ("a2", &vec![10, 20, 30]),
3205 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3207 );
3208 let on = vec![(
3209 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3210 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3211 )];
3212
3213 let (columns, batches, metrics) = join_collect(
3214 left,
3215 right,
3216 on,
3217 &JoinType::Right,
3218 NullEquality::NullEqualsNothing,
3219 task_ctx,
3220 )
3221 .await?;
3222
3223 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3224
3225 allow_duplicates! {
3226 assert_snapshot!(batches_to_sort_string(&batches), @r"
3227 +----+----+----+----+----+----+
3228 | a1 | b1 | c1 | a2 | b1 | c2 |
3229 +----+----+----+----+----+----+
3230 | | | | 30 | 6 | 90 |
3231 | 1 | 4 | 7 | 10 | 4 | 70 |
3232 | 2 | 5 | 8 | 20 | 5 | 80 |
3233 +----+----+----+----+----+----+
3234 ");
3235 }
3236
3237 assert_join_metrics!(metrics, 3);
3238
3239 Ok(())
3240 }
3241
3242 #[apply(batch_sizes)]
3243 #[tokio::test]
3244 async fn partitioned_join_right_one(batch_size: usize) -> Result<()> {
3245 let task_ctx = prepare_task_ctx(batch_size);
3246 let left = build_table(
3247 ("a1", &vec![1, 2, 3]),
3248 ("b1", &vec![4, 5, 7]),
3249 ("c1", &vec![7, 8, 9]),
3250 );
3251 let right = build_table(
3252 ("a2", &vec![10, 20, 30]),
3253 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3255 );
3256 let on = vec![(
3257 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3258 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3259 )];
3260
3261 let (columns, batches, metrics) = partitioned_join_collect(
3262 left,
3263 right,
3264 on,
3265 &JoinType::Right,
3266 NullEquality::NullEqualsNothing,
3267 task_ctx,
3268 )
3269 .await?;
3270
3271 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3272
3273 allow_duplicates! {
3274 assert_snapshot!(batches_to_sort_string(&batches), @r"
3275 +----+----+----+----+----+----+
3276 | a1 | b1 | c1 | a2 | b1 | c2 |
3277 +----+----+----+----+----+----+
3278 | | | | 30 | 6 | 90 |
3279 | 1 | 4 | 7 | 10 | 4 | 70 |
3280 | 2 | 5 | 8 | 20 | 5 | 80 |
3281 +----+----+----+----+----+----+
3282 ");
3283 }
3284
3285 assert_join_metrics!(metrics, 3);
3286
3287 Ok(())
3288 }
3289
3290 #[apply(batch_sizes)]
3291 #[tokio::test]
3292 async fn join_full_one(batch_size: usize) -> Result<()> {
3293 let task_ctx = prepare_task_ctx(batch_size);
3294 let left = build_table(
3295 ("a1", &vec![1, 2, 3]),
3296 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3298 );
3299 let right = build_table(
3300 ("a2", &vec![10, 20, 30]),
3301 ("b2", &vec![4, 5, 6]),
3302 ("c2", &vec![70, 80, 90]),
3303 );
3304 let on = vec![(
3305 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3306 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3307 )];
3308
3309 let join = join(
3310 left,
3311 right,
3312 on,
3313 &JoinType::Full,
3314 NullEquality::NullEqualsNothing,
3315 )?;
3316
3317 let columns = columns(&join.schema());
3318 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3319
3320 let stream = join.execute(0, task_ctx)?;
3321 let batches = common::collect(stream).await?;
3322
3323 allow_duplicates! {
3324 assert_snapshot!(batches_to_sort_string(&batches), @r"
3325 +----+----+----+----+----+----+
3326 | a1 | b1 | c1 | a2 | b2 | c2 |
3327 +----+----+----+----+----+----+
3328 | | | | 30 | 6 | 90 |
3329 | 1 | 4 | 7 | 10 | 4 | 70 |
3330 | 2 | 5 | 8 | 20 | 5 | 80 |
3331 | 3 | 7 | 9 | | | |
3332 +----+----+----+----+----+----+
3333 ");
3334 }
3335
3336 Ok(())
3337 }
3338
3339 #[apply(batch_sizes)]
3340 #[tokio::test]
3341 async fn join_left_mark(batch_size: usize) -> Result<()> {
3342 let task_ctx = prepare_task_ctx(batch_size);
3343 let left = build_table(
3344 ("a1", &vec![1, 2, 3]),
3345 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3347 );
3348 let right = build_table(
3349 ("a2", &vec![10, 20, 30]),
3350 ("b1", &vec![4, 5, 6]),
3351 ("c2", &vec![70, 80, 90]),
3352 );
3353 let on = vec![(
3354 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3355 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3356 )];
3357
3358 let (columns, batches, metrics) = join_collect(
3359 Arc::clone(&left),
3360 Arc::clone(&right),
3361 on.clone(),
3362 &JoinType::LeftMark,
3363 NullEquality::NullEqualsNothing,
3364 task_ctx,
3365 )
3366 .await?;
3367
3368 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3369
3370 allow_duplicates! {
3371 assert_snapshot!(batches_to_sort_string(&batches), @r"
3372 +----+----+----+-------+
3373 | a1 | b1 | c1 | mark |
3374 +----+----+----+-------+
3375 | 1 | 4 | 7 | true |
3376 | 2 | 5 | 8 | true |
3377 | 3 | 7 | 9 | false |
3378 +----+----+----+-------+
3379 ");
3380 }
3381
3382 assert_join_metrics!(metrics, 3);
3383
3384 Ok(())
3385 }
3386
3387 #[apply(batch_sizes)]
3388 #[tokio::test]
3389 async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> {
3390 let task_ctx = prepare_task_ctx(batch_size);
3391 let left = build_table(
3392 ("a1", &vec![1, 2, 3]),
3393 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3395 );
3396 let right = build_table(
3397 ("a2", &vec![10, 20, 30, 40]),
3398 ("b1", &vec![4, 4, 5, 6]),
3399 ("c2", &vec![60, 70, 80, 90]),
3400 );
3401 let on = vec![(
3402 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3403 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3404 )];
3405
3406 let (columns, batches, metrics) = partitioned_join_collect(
3407 Arc::clone(&left),
3408 Arc::clone(&right),
3409 on.clone(),
3410 &JoinType::LeftMark,
3411 NullEquality::NullEqualsNothing,
3412 task_ctx,
3413 )
3414 .await?;
3415
3416 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3417
3418 allow_duplicates! {
3419 assert_snapshot!(batches_to_sort_string(&batches), @r"
3420 +----+----+----+-------+
3421 | a1 | b1 | c1 | mark |
3422 +----+----+----+-------+
3423 | 1 | 4 | 7 | true |
3424 | 2 | 5 | 8 | true |
3425 | 3 | 7 | 9 | false |
3426 +----+----+----+-------+
3427 ");
3428 }
3429
3430 assert_join_metrics!(metrics, 3);
3431
3432 Ok(())
3433 }
3434
3435 #[apply(batch_sizes)]
3436 #[tokio::test]
3437 async fn join_right_mark(batch_size: usize) -> Result<()> {
3438 let task_ctx = prepare_task_ctx(batch_size);
3439 let left = build_table(
3440 ("a1", &vec![1, 2, 3]),
3441 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3443 );
3444 let right = build_table(
3445 ("a2", &vec![10, 20, 30]),
3446 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3448 );
3449 let on = vec![(
3450 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3451 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3452 )];
3453
3454 let (columns, batches, metrics) = join_collect(
3455 Arc::clone(&left),
3456 Arc::clone(&right),
3457 on.clone(),
3458 &JoinType::RightMark,
3459 NullEquality::NullEqualsNothing,
3460 task_ctx,
3461 )
3462 .await?;
3463
3464 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3465
3466 let expected = [
3467 "+----+----+----+-------+",
3468 "| a2 | b1 | c2 | mark |",
3469 "+----+----+----+-------+",
3470 "| 10 | 4 | 70 | true |",
3471 "| 20 | 5 | 80 | true |",
3472 "| 30 | 6 | 90 | false |",
3473 "+----+----+----+-------+",
3474 ];
3475 assert_batches_sorted_eq!(expected, &batches);
3476
3477 assert_join_metrics!(metrics, 3);
3478
3479 Ok(())
3480 }
3481
3482 #[apply(batch_sizes)]
3483 #[tokio::test]
3484 async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> {
3485 let task_ctx = prepare_task_ctx(batch_size);
3486 let left = build_table(
3487 ("a1", &vec![1, 2, 3]),
3488 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3490 );
3491 let right = build_table(
3492 ("a2", &vec![10, 20, 30, 40]),
3493 ("b1", &vec![4, 4, 5, 6]), ("c2", &vec![60, 70, 80, 90]),
3495 );
3496 let on = vec![(
3497 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3498 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3499 )];
3500
3501 let (columns, batches, metrics) = partitioned_join_collect(
3502 Arc::clone(&left),
3503 Arc::clone(&right),
3504 on.clone(),
3505 &JoinType::RightMark,
3506 NullEquality::NullEqualsNothing,
3507 task_ctx,
3508 )
3509 .await?;
3510
3511 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3512
3513 let expected = [
3514 "+----+----+----+-------+",
3515 "| a2 | b1 | c2 | mark |",
3516 "+----+----+----+-------+",
3517 "| 10 | 4 | 60 | true |",
3518 "| 20 | 4 | 70 | true |",
3519 "| 30 | 5 | 80 | true |",
3520 "| 40 | 6 | 90 | false |",
3521 "+----+----+----+-------+",
3522 ];
3523 assert_batches_sorted_eq!(expected, &batches);
3524
3525 assert_join_metrics!(metrics, 4);
3526
3527 Ok(())
3528 }
3529
// Builds a JoinHashMapU64 by hand in which each left hash is inserted twice
// and the `next` chain links row 0 to row 1. Probing must still pair each
// right row only with the left row whose key value is actually equal.
#[test]
fn join_with_hash_collisions_64() -> Result<()> {
    let left = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut left_hash_buf = vec![0; left.num_rows()];
    let left_hashes =
        create_hashes([&left.columns()[0]], &random_state, &mut left_hash_buf)?;

    // Insertion order matters: (h0, 1), (h0, 2), (h1, 1), (h1, 2).
    let mut table = HashTable::with_capacity(4);
    for hash in [left_hashes[0], left_hashes[1]] {
        for slot in [1, 2] {
            table.insert_unique(hash, (hash, slot), |(k, _)| *k);
        }
    }

    let chain: Vec<u64> = vec![2, 0];

    let right = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    let key_expr: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

    let hash_map = JoinHashMapU64::new(table, chain);

    let build_keys = key_expr.evaluate(&left)?.into_array(left.num_rows())?;
    let probe_keys = key_expr.evaluate(&right)?.into_array(right.num_rows())?;
    let mut probe_hashes = vec![0; right.num_rows()];
    create_hashes([&probe_keys], &random_state, &mut probe_hashes)?;

    let mut probe_scratch = Vec::new();
    let mut build_scratch = Vec::new();
    let (build_ids, probe_ids, _) = lookup_join_hashmap(
        &hash_map,
        &[build_keys],
        &[probe_keys],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
        &mut probe_scratch,
        &mut build_scratch,
    )?;

    let expected_build: UInt64Array = vec![0, 1].into();
    let expected_probe: UInt32Array = vec![0, 1].into();

    assert_eq!(expected_build, build_ids);
    assert_eq!(expected_probe, probe_ids);

    Ok(())
}
3596
// u32 variant of the hand-built hash map test. Each left hash is inserted
// twice with the `next` chain linking row 0 to row 1; probing must still pair
// rows only on equal key values.
#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
    let left = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut left_hash_buf = vec![0; left.num_rows()];
    let left_hashes =
        create_hashes([&left.columns()[0]], &random_state, &mut left_hash_buf)?;

    // Insertion order matters: (h0, 1), (h0, 2), (h1, 1), (h1, 2).
    let mut table = HashTable::with_capacity(4);
    for hash in [left_hashes[0], left_hashes[1]] {
        for slot in [1u32, 2u32] {
            table.insert_unique(hash, (hash, slot), |(k, _)| *k);
        }
    }

    let chain: Vec<u32> = vec![2, 0];

    let right = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    let key_expr: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

    let hash_map = JoinHashMapU32::new(table, chain);

    let build_keys = key_expr.evaluate(&left)?.into_array(left.num_rows())?;
    let probe_keys = key_expr.evaluate(&right)?.into_array(right.num_rows())?;
    let mut probe_hashes = vec![0; right.num_rows()];
    create_hashes([&probe_keys], &random_state, &mut probe_hashes)?;

    let mut probe_scratch = Vec::new();
    let mut build_scratch = Vec::new();
    let (build_ids, probe_ids, _) = lookup_join_hashmap(
        &hash_map,
        &[build_keys],
        &[probe_keys],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
        &mut probe_scratch,
        &mut build_scratch,
    )?;

    let expected_build: UInt64Array = vec![0, 1].into();
    let expected_probe: UInt32Array = vec![0, 1].into();

    assert_eq!(expected_build, build_ids);
    assert_eq!(expected_probe, probe_ids);

    Ok(())
}
3656
3657 #[tokio::test]
3658 async fn join_with_duplicated_column_names() -> Result<()> {
3659 let task_ctx = Arc::new(TaskContext::default());
3660 let left = build_table(
3661 ("a", &vec![1, 2, 3]),
3662 ("b", &vec![4, 5, 7]),
3663 ("c", &vec![7, 8, 9]),
3664 );
3665 let right = build_table(
3666 ("a", &vec![10, 20, 30]),
3667 ("b", &vec![1, 2, 7]),
3668 ("c", &vec![70, 80, 90]),
3669 );
3670 let on = vec![(
3671 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3673 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3674 )];
3675
3676 let join = join(
3677 left,
3678 right,
3679 on,
3680 &JoinType::Inner,
3681 NullEquality::NullEqualsNothing,
3682 )?;
3683
3684 let columns = columns(&join.schema());
3685 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3686
3687 let stream = join.execute(0, task_ctx)?;
3688 let batches = common::collect(stream).await?;
3689
3690 allow_duplicates! {
3691 assert_snapshot!(batches_to_sort_string(&batches), @r"
3692 +---+---+---+----+---+----+
3693 | a | b | c | a | b | c |
3694 +---+---+---+----+---+----+
3695 | 1 | 4 | 7 | 10 | 1 | 70 |
3696 | 2 | 5 | 8 | 20 | 2 | 80 |
3697 +---+---+---+----+---+----+
3698 ");
3699 }
3700
3701 Ok(())
3702 }
3703
3704 fn prepare_join_filter() -> JoinFilter {
3705 let column_indices = vec![
3706 ColumnIndex {
3707 index: 2,
3708 side: JoinSide::Left,
3709 },
3710 ColumnIndex {
3711 index: 2,
3712 side: JoinSide::Right,
3713 },
3714 ];
3715 let intermediate_schema = Schema::new(vec![
3716 Field::new("c", DataType::Int32, true),
3717 Field::new("c", DataType::Int32, true),
3718 ]);
3719 let filter_expression = Arc::new(BinaryExpr::new(
3720 Arc::new(Column::new("c", 0)),
3721 Operator::Gt,
3722 Arc::new(Column::new("c", 1)),
3723 )) as Arc<dyn PhysicalExpr>;
3724
3725 JoinFilter::new(
3726 filter_expression,
3727 column_indices,
3728 Arc::new(intermediate_schema),
3729 )
3730 }
3731
3732 #[apply(batch_sizes)]
3733 #[tokio::test]
3734 async fn join_inner_with_filter(batch_size: usize) -> Result<()> {
3735 let task_ctx = prepare_task_ctx(batch_size);
3736 let left = build_table(
3737 ("a", &vec![0, 1, 2, 2]),
3738 ("b", &vec![4, 5, 7, 8]),
3739 ("c", &vec![7, 8, 9, 1]),
3740 );
3741 let right = build_table(
3742 ("a", &vec![10, 20, 30, 40]),
3743 ("b", &vec![2, 2, 3, 4]),
3744 ("c", &vec![7, 5, 6, 4]),
3745 );
3746 let on = vec![(
3747 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3748 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3749 )];
3750 let filter = prepare_join_filter();
3751
3752 let join = join_with_filter(
3753 left,
3754 right,
3755 on,
3756 filter,
3757 &JoinType::Inner,
3758 NullEquality::NullEqualsNothing,
3759 )?;
3760
3761 let columns = columns(&join.schema());
3762 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3763
3764 let stream = join.execute(0, task_ctx)?;
3765 let batches = common::collect(stream).await?;
3766
3767 allow_duplicates! {
3768 assert_snapshot!(batches_to_sort_string(&batches), @r"
3769 +---+---+---+----+---+---+
3770 | a | b | c | a | b | c |
3771 +---+---+---+----+---+---+
3772 | 2 | 7 | 9 | 10 | 2 | 7 |
3773 | 2 | 7 | 9 | 20 | 2 | 5 |
3774 +---+---+---+----+---+---+
3775 ");
3776 }
3777
3778 Ok(())
3779 }
3780
3781 #[apply(batch_sizes)]
3782 #[tokio::test]
3783 async fn join_left_with_filter(batch_size: usize) -> Result<()> {
3784 let task_ctx = prepare_task_ctx(batch_size);
3785 let left = build_table(
3786 ("a", &vec![0, 1, 2, 2]),
3787 ("b", &vec![4, 5, 7, 8]),
3788 ("c", &vec![7, 8, 9, 1]),
3789 );
3790 let right = build_table(
3791 ("a", &vec![10, 20, 30, 40]),
3792 ("b", &vec![2, 2, 3, 4]),
3793 ("c", &vec![7, 5, 6, 4]),
3794 );
3795 let on = vec![(
3796 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3797 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3798 )];
3799 let filter = prepare_join_filter();
3800
3801 let join = join_with_filter(
3802 left,
3803 right,
3804 on,
3805 filter,
3806 &JoinType::Left,
3807 NullEquality::NullEqualsNothing,
3808 )?;
3809
3810 let columns = columns(&join.schema());
3811 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3812
3813 let stream = join.execute(0, task_ctx)?;
3814 let batches = common::collect(stream).await?;
3815
3816 allow_duplicates! {
3817 assert_snapshot!(batches_to_sort_string(&batches), @r"
3818 +---+---+---+----+---+---+
3819 | a | b | c | a | b | c |
3820 +---+---+---+----+---+---+
3821 | 0 | 4 | 7 | | | |
3822 | 1 | 5 | 8 | | | |
3823 | 2 | 7 | 9 | 10 | 2 | 7 |
3824 | 2 | 7 | 9 | 20 | 2 | 5 |
3825 | 2 | 8 | 1 | | | |
3826 +---+---+---+----+---+---+
3827 ");
3828 }
3829
3830 Ok(())
3831 }
3832
3833 #[apply(batch_sizes)]
3834 #[tokio::test]
3835 async fn join_right_with_filter(batch_size: usize) -> Result<()> {
3836 let task_ctx = prepare_task_ctx(batch_size);
3837 let left = build_table(
3838 ("a", &vec![0, 1, 2, 2]),
3839 ("b", &vec![4, 5, 7, 8]),
3840 ("c", &vec![7, 8, 9, 1]),
3841 );
3842 let right = build_table(
3843 ("a", &vec![10, 20, 30, 40]),
3844 ("b", &vec![2, 2, 3, 4]),
3845 ("c", &vec![7, 5, 6, 4]),
3846 );
3847 let on = vec![(
3848 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3849 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3850 )];
3851 let filter = prepare_join_filter();
3852
3853 let join = join_with_filter(
3854 left,
3855 right,
3856 on,
3857 filter,
3858 &JoinType::Right,
3859 NullEquality::NullEqualsNothing,
3860 )?;
3861
3862 let columns = columns(&join.schema());
3863 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3864
3865 let stream = join.execute(0, task_ctx)?;
3866 let batches = common::collect(stream).await?;
3867
3868 allow_duplicates! {
3869 assert_snapshot!(batches_to_sort_string(&batches), @r"
3870 +---+---+---+----+---+---+
3871 | a | b | c | a | b | c |
3872 +---+---+---+----+---+---+
3873 | | | | 30 | 3 | 6 |
3874 | | | | 40 | 4 | 4 |
3875 | 2 | 7 | 9 | 10 | 2 | 7 |
3876 | 2 | 7 | 9 | 20 | 2 | 5 |
3877 +---+---+---+----+---+---+
3878 ");
3879 }
3880
3881 Ok(())
3882 }
3883
3884 #[apply(batch_sizes)]
3885 #[tokio::test]
3886 async fn join_full_with_filter(batch_size: usize) -> Result<()> {
3887 let task_ctx = prepare_task_ctx(batch_size);
3888 let left = build_table(
3889 ("a", &vec![0, 1, 2, 2]),
3890 ("b", &vec![4, 5, 7, 8]),
3891 ("c", &vec![7, 8, 9, 1]),
3892 );
3893 let right = build_table(
3894 ("a", &vec![10, 20, 30, 40]),
3895 ("b", &vec![2, 2, 3, 4]),
3896 ("c", &vec![7, 5, 6, 4]),
3897 );
3898 let on = vec![(
3899 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3900 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3901 )];
3902 let filter = prepare_join_filter();
3903
3904 let join = join_with_filter(
3905 left,
3906 right,
3907 on,
3908 filter,
3909 &JoinType::Full,
3910 NullEquality::NullEqualsNothing,
3911 )?;
3912
3913 let columns = columns(&join.schema());
3914 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3915
3916 let stream = join.execute(0, task_ctx)?;
3917 let batches = common::collect(stream).await?;
3918
3919 let expected = [
3920 "+---+---+---+----+---+---+",
3921 "| a | b | c | a | b | c |",
3922 "+---+---+---+----+---+---+",
3923 "| | | | 30 | 3 | 6 |",
3924 "| | | | 40 | 4 | 4 |",
3925 "| 2 | 7 | 9 | 10 | 2 | 7 |",
3926 "| 2 | 7 | 9 | 20 | 2 | 5 |",
3927 "| 0 | 4 | 7 | | | |",
3928 "| 1 | 5 | 8 | | | |",
3929 "| 2 | 8 | 1 | | | |",
3930 "+---+---+---+----+---+---+",
3931 ];
3932 assert_batches_sorted_eq!(expected, &batches);
3933
3934 Ok(())
3952 }
3953
3954 #[tokio::test]
3956 async fn test_collect_left_multiple_partitions_join() -> Result<()> {
3957 let task_ctx = Arc::new(TaskContext::default());
3958 let left = build_table(
3959 ("a1", &vec![1, 2, 3]),
3960 ("b1", &vec![4, 5, 7]),
3961 ("c1", &vec![7, 8, 9]),
3962 );
3963 let right = build_table(
3964 ("a2", &vec![10, 20, 30]),
3965 ("b2", &vec![4, 5, 6]),
3966 ("c2", &vec![70, 80, 90]),
3967 );
3968 let on = vec![(
3969 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3970 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3971 )];
3972
3973 let expected_inner = vec![
3974 "+----+----+----+----+----+----+",
3975 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3976 "+----+----+----+----+----+----+",
3977 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3978 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3979 "+----+----+----+----+----+----+",
3980 ];
3981 let expected_left = vec![
3982 "+----+----+----+----+----+----+",
3983 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3984 "+----+----+----+----+----+----+",
3985 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3986 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3987 "| 3 | 7 | 9 | | | |",
3988 "+----+----+----+----+----+----+",
3989 ];
3990 let expected_right = vec![
3991 "+----+----+----+----+----+----+",
3992 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3993 "+----+----+----+----+----+----+",
3994 "| | | | 30 | 6 | 90 |",
3995 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3996 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3997 "+----+----+----+----+----+----+",
3998 ];
3999 let expected_full = vec![
4000 "+----+----+----+----+----+----+",
4001 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4002 "+----+----+----+----+----+----+",
4003 "| | | | 30 | 6 | 90 |",
4004 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4005 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4006 "| 3 | 7 | 9 | | | |",
4007 "+----+----+----+----+----+----+",
4008 ];
4009 let expected_left_semi = vec![
4010 "+----+----+----+",
4011 "| a1 | b1 | c1 |",
4012 "+----+----+----+",
4013 "| 1 | 4 | 7 |",
4014 "| 2 | 5 | 8 |",
4015 "+----+----+----+",
4016 ];
4017 let expected_left_anti = vec![
4018 "+----+----+----+",
4019 "| a1 | b1 | c1 |",
4020 "+----+----+----+",
4021 "| 3 | 7 | 9 |",
4022 "+----+----+----+",
4023 ];
4024 let expected_right_semi = vec![
4025 "+----+----+----+",
4026 "| a2 | b2 | c2 |",
4027 "+----+----+----+",
4028 "| 10 | 4 | 70 |",
4029 "| 20 | 5 | 80 |",
4030 "+----+----+----+",
4031 ];
4032 let expected_right_anti = vec![
4033 "+----+----+----+",
4034 "| a2 | b2 | c2 |",
4035 "+----+----+----+",
4036 "| 30 | 6 | 90 |",
4037 "+----+----+----+",
4038 ];
4039 let expected_left_mark = vec![
4040 "+----+----+----+-------+",
4041 "| a1 | b1 | c1 | mark |",
4042 "+----+----+----+-------+",
4043 "| 1 | 4 | 7 | true |",
4044 "| 2 | 5 | 8 | true |",
4045 "| 3 | 7 | 9 | false |",
4046 "+----+----+----+-------+",
4047 ];
4048 let expected_right_mark = vec![
4049 "+----+----+----+-------+",
4050 "| a2 | b2 | c2 | mark |",
4051 "+----+----+----+-------+",
4052 "| 10 | 4 | 70 | true |",
4053 "| 20 | 5 | 80 | true |",
4054 "| 30 | 6 | 90 | false |",
4055 "+----+----+----+-------+",
4056 ];
4057
4058 let test_cases = vec![
4059 (JoinType::Inner, expected_inner),
4060 (JoinType::Left, expected_left),
4061 (JoinType::Right, expected_right),
4062 (JoinType::Full, expected_full),
4063 (JoinType::LeftSemi, expected_left_semi),
4064 (JoinType::LeftAnti, expected_left_anti),
4065 (JoinType::RightSemi, expected_right_semi),
4066 (JoinType::RightAnti, expected_right_anti),
4067 (JoinType::LeftMark, expected_left_mark),
4068 (JoinType::RightMark, expected_right_mark),
4069 ];
4070
4071 for (join_type, expected) in test_cases {
4072 let (_, batches, metrics) = join_collect_with_partition_mode(
4073 Arc::clone(&left),
4074 Arc::clone(&right),
4075 on.clone(),
4076 &join_type,
4077 PartitionMode::CollectLeft,
4078 NullEquality::NullEqualsNothing,
4079 Arc::clone(&task_ctx),
4080 )
4081 .await?;
4082 assert_batches_sorted_eq!(expected, &batches);
4083 assert_join_metrics!(metrics, expected.len() - 4);
4084 }
4085
4086 Ok(())
4087 }
4088
4089 #[tokio::test]
4090 async fn join_date32() -> Result<()> {
4091 let schema = Arc::new(Schema::new(vec![
4092 Field::new("date", DataType::Date32, false),
4093 Field::new("n", DataType::Int32, false),
4094 ]));
4095
4096 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
4097 let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
4098 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4099 let left =
4100 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
4101 .unwrap();
4102 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
4103 let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
4104 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4105 let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
4106 let on = vec![(
4107 Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
4108 Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
4109 )];
4110
4111 let join = join(
4112 left,
4113 right,
4114 on,
4115 &JoinType::Inner,
4116 NullEquality::NullEqualsNothing,
4117 )?;
4118
4119 let task_ctx = Arc::new(TaskContext::default());
4120 let stream = join.execute(0, task_ctx)?;
4121 let batches = common::collect(stream).await?;
4122
4123 allow_duplicates! {
4124 assert_snapshot!(batches_to_sort_string(&batches), @r"
4125 +------------+---+------------+---+
4126 | date | n | date | n |
4127 +------------+---+------------+---+
4128 | 2022-04-26 | 2 | 2022-04-26 | 4 |
4129 | 2022-04-26 | 2 | 2022-04-26 | 5 |
4130 | 2022-04-27 | 3 | 2022-04-27 | 6 |
4131 +------------+---+------------+---+
4132 ");
4133 }
4134
4135 Ok(())
4136 }
4137
4138 #[tokio::test]
4139 async fn join_with_error_right() {
4140 let left = build_table(
4141 ("a1", &vec![1, 2, 3]),
4142 ("b1", &vec![4, 5, 7]),
4143 ("c1", &vec![7, 8, 9]),
4144 );
4145
4146 let err = exec_err!("bad data error");
4149 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4150
4151 let on = vec![(
4152 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4153 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
4154 )];
4155 let schema = right.schema();
4156 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4157 let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
4158
4159 let join_types = vec![
4160 JoinType::Inner,
4161 JoinType::Left,
4162 JoinType::Right,
4163 JoinType::Full,
4164 JoinType::LeftSemi,
4165 JoinType::LeftAnti,
4166 JoinType::RightSemi,
4167 JoinType::RightAnti,
4168 ];
4169
4170 for join_type in join_types {
4171 let join = join(
4172 Arc::clone(&left),
4173 Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
4174 on.clone(),
4175 &join_type,
4176 NullEquality::NullEqualsNothing,
4177 )
4178 .unwrap();
4179 let task_ctx = Arc::new(TaskContext::default());
4180
4181 let stream = join.execute(0, task_ctx).unwrap();
4182
4183 let result_string = common::collect(stream).await.unwrap_err().to_string();
4185 assert!(
4186 result_string.contains("bad data error"),
4187 "actual: {result_string}"
4188 );
4189 }
4190 }
4191
4192 #[tokio::test]
4193 async fn join_split_batch() {
4194 let left = build_table(
4195 ("a1", &vec![1, 2, 3, 4]),
4196 ("b1", &vec![1, 1, 1, 1]),
4197 ("c1", &vec![0, 0, 0, 0]),
4198 );
4199 let right = build_table(
4200 ("a2", &vec![10, 20, 30, 40, 50]),
4201 ("b2", &vec![1, 1, 1, 1, 1]),
4202 ("c2", &vec![0, 0, 0, 0, 0]),
4203 );
4204 let on = vec![(
4205 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4206 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4207 )];
4208
4209 let join_types = vec![
4210 JoinType::Inner,
4211 JoinType::Left,
4212 JoinType::Right,
4213 JoinType::Full,
4214 JoinType::RightSemi,
4215 JoinType::RightAnti,
4216 JoinType::LeftSemi,
4217 JoinType::LeftAnti,
4218 ];
4219 let expected_resultset_records = 20;
4220 let common_result = [
4221 "+----+----+----+----+----+----+",
4222 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4223 "+----+----+----+----+----+----+",
4224 "| 1 | 1 | 0 | 10 | 1 | 0 |",
4225 "| 2 | 1 | 0 | 10 | 1 | 0 |",
4226 "| 3 | 1 | 0 | 10 | 1 | 0 |",
4227 "| 4 | 1 | 0 | 10 | 1 | 0 |",
4228 "| 1 | 1 | 0 | 20 | 1 | 0 |",
4229 "| 2 | 1 | 0 | 20 | 1 | 0 |",
4230 "| 3 | 1 | 0 | 20 | 1 | 0 |",
4231 "| 4 | 1 | 0 | 20 | 1 | 0 |",
4232 "| 1 | 1 | 0 | 30 | 1 | 0 |",
4233 "| 2 | 1 | 0 | 30 | 1 | 0 |",
4234 "| 3 | 1 | 0 | 30 | 1 | 0 |",
4235 "| 4 | 1 | 0 | 30 | 1 | 0 |",
4236 "| 1 | 1 | 0 | 40 | 1 | 0 |",
4237 "| 2 | 1 | 0 | 40 | 1 | 0 |",
4238 "| 3 | 1 | 0 | 40 | 1 | 0 |",
4239 "| 4 | 1 | 0 | 40 | 1 | 0 |",
4240 "| 1 | 1 | 0 | 50 | 1 | 0 |",
4241 "| 2 | 1 | 0 | 50 | 1 | 0 |",
4242 "| 3 | 1 | 0 | 50 | 1 | 0 |",
4243 "| 4 | 1 | 0 | 50 | 1 | 0 |",
4244 "+----+----+----+----+----+----+",
4245 ];
4246 let left_batch = [
4247 "+----+----+----+",
4248 "| a1 | b1 | c1 |",
4249 "+----+----+----+",
4250 "| 1 | 1 | 0 |",
4251 "| 2 | 1 | 0 |",
4252 "| 3 | 1 | 0 |",
4253 "| 4 | 1 | 0 |",
4254 "+----+----+----+",
4255 ];
4256 let right_batch = [
4257 "+----+----+----+",
4258 "| a2 | b2 | c2 |",
4259 "+----+----+----+",
4260 "| 10 | 1 | 0 |",
4261 "| 20 | 1 | 0 |",
4262 "| 30 | 1 | 0 |",
4263 "| 40 | 1 | 0 |",
4264 "| 50 | 1 | 0 |",
4265 "+----+----+----+",
4266 ];
4267 let right_empty = [
4268 "+----+----+----+",
4269 "| a2 | b2 | c2 |",
4270 "+----+----+----+",
4271 "+----+----+----+",
4272 ];
4273 let left_empty = [
4274 "+----+----+----+",
4275 "| a1 | b1 | c1 |",
4276 "+----+----+----+",
4277 "+----+----+----+",
4278 ];
4279
4280 for join_type in join_types {
4282 for batch_size in (1..21).rev() {
4283 let task_ctx = prepare_task_ctx(batch_size);
4284
4285 let join = join(
4286 Arc::clone(&left),
4287 Arc::clone(&right),
4288 on.clone(),
4289 &join_type,
4290 NullEquality::NullEqualsNothing,
4291 )
4292 .unwrap();
4293
4294 let stream = join.execute(0, task_ctx).unwrap();
4295 let batches = common::collect(stream).await.unwrap();
4296
4297 let expected_batch_count = match join_type {
4302 JoinType::Inner
4303 | JoinType::Right
4304 | JoinType::RightSemi
4305 | JoinType::RightAnti => {
4306 div_ceil(expected_resultset_records, batch_size)
4307 }
4308 _ => div_ceil(expected_resultset_records, batch_size) + 1,
4309 };
4310 assert!(
4312 batches.len() <= expected_batch_count,
4313 "expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
4314 batches.len()
4315 );
4316
4317 let expected = match join_type {
4318 JoinType::RightSemi => right_batch.to_vec(),
4319 JoinType::RightAnti => right_empty.to_vec(),
4320 JoinType::LeftSemi => left_batch.to_vec(),
4321 JoinType::LeftAnti => left_empty.to_vec(),
4322 _ => common_result.to_vec(),
4323 };
4324 if batches.is_empty() {
4327 assert!(
4329 matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
4330 "Unexpected empty result for {join_type} join"
4331 );
4332 } else {
4333 assert_batches_eq!(expected, &batches);
4334 }
4335 }
4336 }
4337 }
4338
4339 #[tokio::test]
4340 async fn single_partition_join_overallocation() -> Result<()> {
4341 let left = build_table(
4342 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4343 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4344 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4345 );
4346 let right = build_table(
4347 ("a2", &vec![10, 11]),
4348 ("b2", &vec![12, 13]),
4349 ("c2", &vec![14, 15]),
4350 );
4351 let on = vec![(
4352 Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
4353 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4354 )];
4355
4356 let join_types = vec![
4357 JoinType::Inner,
4358 JoinType::Left,
4359 JoinType::Right,
4360 JoinType::Full,
4361 JoinType::LeftSemi,
4362 JoinType::LeftAnti,
4363 JoinType::RightSemi,
4364 JoinType::RightAnti,
4365 JoinType::LeftMark,
4366 JoinType::RightMark,
4367 ];
4368
4369 for join_type in join_types {
4370 let runtime = RuntimeEnvBuilder::new()
4371 .with_memory_limit(100, 1.0)
4372 .build_arc()?;
4373 let task_ctx = TaskContext::default().with_runtime(runtime);
4374 let task_ctx = Arc::new(task_ctx);
4375
4376 let join = join(
4377 Arc::clone(&left),
4378 Arc::clone(&right),
4379 on.clone(),
4380 &join_type,
4381 NullEquality::NullEqualsNothing,
4382 )?;
4383
4384 let stream = join.execute(0, task_ctx)?;
4385 let err = common::collect(stream).await.unwrap_err();
4386
4387 assert_contains!(
4389 err.to_string(),
4390 "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput"
4391 );
4392
4393 assert_contains!(
4394 err.to_string(),
4395 "Failed to allocate additional 120.0 B for HashJoinInput"
4396 );
4397 }
4398
4399 Ok(())
4400 }
4401
4402 #[tokio::test]
4403 async fn partitioned_join_overallocation() -> Result<()> {
4404 let left_batch = build_table_i32(
4407 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4408 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4409 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4410 );
4411 let left = TestMemoryExec::try_new_exec(
4412 &[vec![left_batch.clone()], vec![left_batch.clone()]],
4413 left_batch.schema(),
4414 None,
4415 )
4416 .unwrap();
4417 let right_batch = build_table_i32(
4418 ("a2", &vec![10, 11]),
4419 ("b2", &vec![12, 13]),
4420 ("c2", &vec![14, 15]),
4421 );
4422 let right = TestMemoryExec::try_new_exec(
4423 &[vec![right_batch.clone()], vec![right_batch.clone()]],
4424 right_batch.schema(),
4425 None,
4426 )
4427 .unwrap();
4428 let on = vec![(
4429 Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
4430 Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
4431 )];
4432
4433 let join_types = vec![
4434 JoinType::Inner,
4435 JoinType::Left,
4436 JoinType::Right,
4437 JoinType::Full,
4438 JoinType::LeftSemi,
4439 JoinType::LeftAnti,
4440 JoinType::RightSemi,
4441 JoinType::RightAnti,
4442 ];
4443
4444 for join_type in join_types {
4445 let runtime = RuntimeEnvBuilder::new()
4446 .with_memory_limit(100, 1.0)
4447 .build_arc()?;
4448 let session_config = SessionConfig::default().with_batch_size(50);
4449 let task_ctx = TaskContext::default()
4450 .with_session_config(session_config)
4451 .with_runtime(runtime);
4452 let task_ctx = Arc::new(task_ctx);
4453
4454 let join = HashJoinExec::try_new(
4455 Arc::clone(&left) as Arc<dyn ExecutionPlan>,
4456 Arc::clone(&right) as Arc<dyn ExecutionPlan>,
4457 on.clone(),
4458 None,
4459 &join_type,
4460 None,
4461 PartitionMode::Partitioned,
4462 NullEquality::NullEqualsNothing,
4463 )?;
4464
4465 let stream = join.execute(1, task_ctx)?;
4466 let err = common::collect(stream).await.unwrap_err();
4467
4468 assert_contains!(
4470 err.to_string(),
4471 "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]"
4472 );
4473
4474 assert_contains!(
4475 err.to_string(),
4476 "Failed to allocate additional 120.0 B for HashJoinInput[1]"
4477 );
4478 }
4479
4480 Ok(())
4481 }
4482
4483 fn build_table_struct(
4484 struct_name: &str,
4485 field_name_and_values: (&str, &Vec<Option<i32>>),
4486 nulls: Option<NullBuffer>,
4487 ) -> Arc<dyn ExecutionPlan> {
4488 let (field_name, values) = field_name_and_values;
4489 let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
4490 let schema = Schema::new(vec![Field::new(
4491 struct_name,
4492 DataType::Struct(inner_fields.clone().into()),
4493 nulls.is_some(),
4494 )]);
4495
4496 let batch = RecordBatch::try_new(
4497 Arc::new(schema),
4498 vec![Arc::new(StructArray::new(
4499 inner_fields.into(),
4500 vec![Arc::new(Int32Array::from(values.clone()))],
4501 nulls,
4502 ))],
4503 )
4504 .unwrap();
4505 let schema_ref = batch.schema();
4506 TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
4507 }
4508
4509 #[tokio::test]
4510 async fn join_on_struct() -> Result<()> {
4511 let task_ctx = Arc::new(TaskContext::default());
4512 let left =
4513 build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
4514 let right =
4515 build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
4516 let on = vec![(
4517 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
4518 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
4519 )];
4520
4521 let (columns, batches, metrics) = join_collect(
4522 left,
4523 right,
4524 on,
4525 &JoinType::Inner,
4526 NullEquality::NullEqualsNothing,
4527 task_ctx,
4528 )
4529 .await?;
4530
4531 assert_eq!(columns, vec!["n1", "n2"]);
4532
4533 allow_duplicates! {
4534 assert_snapshot!(batches_to_string(&batches), @r"
4535 +--------+--------+
4536 | n1 | n2 |
4537 +--------+--------+
4538 | {a: } | {a: } |
4539 | {a: 1} | {a: 1} |
4540 | {a: 2} | {a: 2} |
4541 +--------+--------+
4542 ");
4543 }
4544
4545 assert_join_metrics!(metrics, 3);
4546
4547 Ok(())
4548 }
4549
4550 #[tokio::test]
4551 async fn join_on_struct_with_nulls() -> Result<()> {
4552 let task_ctx = Arc::new(TaskContext::default());
4553 let left =
4554 build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
4555 let right =
4556 build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
4557 let on = vec![(
4558 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
4559 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
4560 )];
4561
4562 let (_, batches_null_eq, metrics) = join_collect(
4563 Arc::clone(&left),
4564 Arc::clone(&right),
4565 on.clone(),
4566 &JoinType::Inner,
4567 NullEquality::NullEqualsNull,
4568 Arc::clone(&task_ctx),
4569 )
4570 .await?;
4571
4572 allow_duplicates! {
4573 assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r"
4574 +----+----+
4575 | n1 | n2 |
4576 +----+----+
4577 | | |
4578 +----+----+
4579 ");
4580 }
4581
4582 assert_join_metrics!(metrics, 1);
4583
4584 let (_, batches_null_neq, metrics) = join_collect(
4585 left,
4586 right,
4587 on,
4588 &JoinType::Inner,
4589 NullEquality::NullEqualsNothing,
4590 task_ctx,
4591 )
4592 .await?;
4593
4594 assert_join_metrics!(metrics, 0);
4595
4596 if batches_null_neq.is_empty() {
4599 } else {
4601 let expected_null_neq =
4602 ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
4603 assert_batches_eq!(expected_null_neq, &batches_null_neq);
4604 }
4605
4606 Ok(())
4607 }
4608
4609 fn columns(schema: &Schema) -> Vec<String> {
4611 schema.fields().iter().map(|f| f.name().clone()).collect()
4612 }
4613
4614 #[tokio::test]
4616 async fn test_hash_join_marks_filter_complete() -> Result<()> {
4617 let task_ctx = Arc::new(TaskContext::default());
4618 let left = build_table(
4619 ("a1", &vec![1, 2, 3]),
4620 ("b1", &vec![4, 5, 6]),
4621 ("c1", &vec![7, 8, 9]),
4622 );
4623 let right = build_table(
4624 ("a2", &vec![10, 20, 30]),
4625 ("b1", &vec![4, 5, 6]),
4626 ("c2", &vec![70, 80, 90]),
4627 );
4628
4629 let on = vec![(
4630 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4631 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4632 )];
4633
4634 let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4636 let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4637
4638 let _consumer = Arc::clone(&dynamic_filter)
4640 .with_new_children(vec![])
4641 .unwrap();
4642
4643 let mut join = HashJoinExec::try_new(
4645 left,
4646 right,
4647 on,
4648 None,
4649 &JoinType::Inner,
4650 None,
4651 PartitionMode::CollectLeft,
4652 NullEquality::NullEqualsNothing,
4653 )?;
4654 join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4655 filter: dynamic_filter,
4656 build_accumulator: OnceLock::new(),
4657 });
4658
4659 let stream = join.execute(0, task_ctx)?;
4661 let _batches = common::collect(stream).await?;
4662
4663 dynamic_filter_clone.wait_complete().await;
4666
4667 Ok(())
4668 }
4669
4670 #[tokio::test]
4672 async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
4673 let task_ctx = Arc::new(TaskContext::default());
4674 let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
4676 let right = build_table(
4677 ("a2", &vec![10, 20, 30]),
4678 ("b1", &vec![4, 5, 6]),
4679 ("c2", &vec![70, 80, 90]),
4680 );
4681
4682 let on = vec![(
4683 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4684 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4685 )];
4686
4687 let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4689 let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4690
4691 let _consumer = Arc::clone(&dynamic_filter)
4693 .with_new_children(vec![])
4694 .unwrap();
4695
4696 let mut join = HashJoinExec::try_new(
4698 left,
4699 right,
4700 on,
4701 None,
4702 &JoinType::Inner,
4703 None,
4704 PartitionMode::CollectLeft,
4705 NullEquality::NullEqualsNothing,
4706 )?;
4707 join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4708 filter: dynamic_filter,
4709 build_accumulator: OnceLock::new(),
4710 });
4711
4712 let stream = join.execute(0, task_ctx)?;
4714 let _batches = common::collect(stream).await?;
4715
4716 dynamic_filter_clone.wait_complete().await;
4719
4720 Ok(())
4721 }
4722}