1use std::collections::HashSet;
19use std::fmt;
20use std::mem::size_of;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::{Arc, OnceLock};
23use std::{any::Any, vec};
24
25use crate::ExecutionPlanProperties;
26use crate::execution_plan::{
27 EmissionType, boundedness_from_children, has_same_children_properties,
28 stub_properties,
29};
30use crate::filter_pushdown::{
31 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
32 FilterPushdownPropagation,
33};
34use crate::joins::Map;
35use crate::joins::array_map::ArrayMap;
36use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
37use crate::joins::hash_join::shared_bounds::{
38 ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
39};
40use crate::joins::hash_join::stream::{
41 BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
42};
43use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
44use crate::joins::utils::{
45 OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap,
46 swap_join_projection, update_hash,
47};
48use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
49use crate::metrics::{Count, MetricBuilder};
50use crate::projection::{
51 EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
52 try_pushdown_through_join,
53};
54use crate::repartition::REPARTITION_RANDOM_STATE;
55use crate::spill::get_record_batch_memory_size;
56use crate::{
57 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
58 PlanProperties, SendableRecordBatchStream, Statistics,
59 common::can_project,
60 joins::utils::{
61 BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
62 build_join_schema, check_join_is_valid, estimate_join_statistics,
63 need_produce_result_in_final, symmetric_join_output_partitioning,
64 },
65 metrics::{ExecutionPlanMetricsSet, MetricsSet},
66};
67
68use arrow::array::{ArrayRef, BooleanBufferBuilder};
69use arrow::compute::concat_batches;
70use arrow::datatypes::SchemaRef;
71use arrow::record_batch::RecordBatch;
72use arrow::util::bit_util;
73use arrow_schema::{DataType, Schema};
74use datafusion_common::config::ConfigOptions;
75use datafusion_common::utils::memory::estimate_memory_size;
76use datafusion_common::{
77 JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err,
78 plan_err, project_schema,
79};
80use datafusion_execution::TaskContext;
81use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
82use datafusion_expr::Accumulator;
83use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
84use datafusion_physical_expr::equivalence::{
85 ProjectionMapping, join_equivalence_properties,
86};
87use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
88use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
89use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
90
91use ahash::RandomState;
92use datafusion_physical_expr_common::physical_expr::fmt_sql;
93use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
94use futures::TryStreamExt;
95use parking_lot::Mutex;
96
97use super::partitioned_hash_eval::SeededRandomState;
98
/// Deterministic seed ('J', 'O', 'I', 'N') for the hash state used on both
/// the build and probe sides, so key hashing is reproducible across the plan.
pub(crate) const HASH_JOIN_SEED: SeededRandomState =
    SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);

/// Metric name counting how many times the dense `ArrayMap` build path
/// succeeded (see [`try_create_array_map`]).
const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count";
104
/// Attempts to build a dense [`ArrayMap`] over the build-side join key
/// instead of a generic hash table.
///
/// Returns `Ok(None)` whenever any precondition fails, in which case the
/// caller falls back to the regular join hash map. Preconditions:
/// - exactly one join key expression;
/// - no null keys when `NullEqualsNull` is in effect;
/// - known, non-null min/max bounds whose values convert to `u64` keys;
/// - row count and key range within the configured size/density thresholds.
///
/// On success, grows `reservation` by the estimated map size and returns the
/// map together with the concatenated build batch and its evaluated key arrays.
#[expect(clippy::too_many_arguments)]
fn try_create_array_map(
    bounds: &Option<PartitionBounds>,
    schema: &SchemaRef,
    batches: &[RecordBatch],
    on_left: &[PhysicalExprRef],
    reservation: &mut MemoryReservation,
    perfect_hash_join_small_build_threshold: usize,
    perfect_hash_join_min_key_density: f64,
    null_equality: NullEquality,
) -> Result<Option<(ArrayMap, RecordBatch, Vec<ArrayRef>)>> {
    // The dense map only supports a single join key column.
    if on_left.len() != 1 {
        return Ok(None);
    }

    // Under NullEqualsNull semantics null keys must match each other; the
    // array map has no representation for null, so bail out if any appear.
    if null_equality == NullEquality::NullEqualsNull {
        for batch in batches.iter() {
            let arrays = evaluate_expressions_to_arrays(on_left, batch)?;
            if arrays[0].null_count() > 0 {
                return Ok(None);
            }
        }
    }

    // Extract min/max for the first (only) key column from the accumulated
    // partition bounds; every failure mode falls back to the hash map.
    let (min_val, max_val) = if let Some(bounds) = bounds {
        let (min_val, max_val) = if let Some(cb) = bounds.get_column_bounds(0) {
            (cb.min.clone(), cb.max.clone())
        } else {
            return Ok(None);
        };

        if min_val.is_null() || max_val.is_null() {
            return Ok(None);
        }

        if min_val > max_val {
            return internal_err!("min_val>max_val");
        }

        // Keys must be representable as u64 offsets into the dense table.
        if let Some((mi, ma)) =
            ArrayMap::key_to_u64(&min_val).zip(ArrayMap::key_to_u64(&max_val))
        {
            (mi, ma)
        } else {
            return Ok(None);
        }
    } else {
        return Ok(None);
    };

    let range = ArrayMap::calculate_range(min_val, max_val);
    let num_row: usize = batches.iter().map(|x| x.num_rows()).sum();

    // Too many rows for the map — presumably row indices are stored as u32;
    // TODO confirm against `ArrayMap`'s internals.
    if num_row >= u32::MAX as usize {
        return Ok(None);
    }

    // Guard against a key range too large to address as `usize` (and, on
    // 64-bit targets, against `range + 1` overflowing below).
    if range == usize::MAX as u64 {
        return Ok(None);
    }

    let dense_ratio = (num_row as f64) / ((range + 1) as f64);

    // A large *and* sparse key domain is not worth an O(range) table:
    // proceed only if the range is small or the keys are dense enough.
    if range >= perfect_hash_join_small_build_threshold as u64
        && dense_ratio <= perfect_hash_join_min_key_density
    {
        return Ok(None);
    }

    // Reserve the estimated memory before materializing anything.
    let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row);
    reservation.try_grow(mem_size)?;

    let batch = concat_batches(schema, batches)?;
    let left_values = evaluate_expressions_to_arrays(on_left, &batch)?;

    let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?;

    Ok(Some((array_map, batch, left_values)))
}
187
/// Fully-built build ("left") side of a hash join, shared by all probe
/// partitions.
pub(super) struct JoinLeftData {
    /// Hash map keyed on the build-side join key values.
    pub(super) map: Arc<Map>,
    /// All build-side rows concatenated into a single batch.
    batch: RecordBatch,
    /// Evaluated join-key arrays for `batch`.
    values: Vec<ArrayRef>,
    /// Shared bitmap over build rows; used by join types that must report
    /// unmatched build rows at the end (see `need_produce_result_in_final`).
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Count of probe threads still running; decremented via
    /// `report_probe_completed`.
    probe_threads_counter: AtomicUsize,
    /// Held (not read) so the build-side memory stays accounted for as long
    /// as this data is alive.
    _reservation: MemoryReservation,
    /// Per-column min/max bounds of the build keys, when computed.
    pub(super) bounds: Option<PartitionBounds>,
    /// Strategy for pushing build-side membership info to the probe side.
    pub(super) membership: PushdownStrategy,
    /// NOTE(review): flag set from the probe side — appears to record that
    /// the probe produced at least one row; confirm in stream.rs.
    pub(super) probe_side_non_empty: AtomicBool,
    /// NOTE(review): flag set from the probe side — appears to record a null
    /// probe-side join key (relevant to null-aware anti join); confirm.
    pub(super) probe_side_has_null: AtomicBool,
}
220
impl JoinLeftData {
    /// Returns the hash map over the build-side join keys.
    pub(super) fn map(&self) -> &Map {
        &self.map
    }

    /// Returns the concatenated build-side batch.
    pub(super) fn batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// Returns the evaluated join-key arrays for the build batch.
    pub(super) fn values(&self) -> &[ArrayRef] {
        &self.values
    }

    /// Returns the shared bitmap of visited (matched) build-row indices.
    pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
        &self.visited_indices_bitmap
    }

    /// Returns the probe-side pushdown strategy.
    pub(super) fn membership(&self) -> &PushdownStrategy {
        &self.membership
    }

    /// Records that one probe thread finished. Returns `true` only for the
    /// last thread to complete (`fetch_sub` yields the previous value, so a
    /// previous value of 1 means this call brought the counter to 0).
    pub(super) fn report_probe_completed(&self) -> bool {
        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
    }
}
253
/// Builder for [`HashJoinExec`].
///
/// Setters that can change the join schema or plan properties clear
/// `preserve_properties`, forcing `build()` to recompute them.
pub struct HashJoinExecBuilder {
    /// Node under construction; starts with placeholder schema/properties.
    exec: HashJoinExec,
    /// When `true`, `build()` reuses `exec`'s cached schema and properties.
    preserve_properties: bool,
}
270
impl HashJoinExecBuilder {
    /// Creates a builder with default state: no filter, no projection,
    /// `PartitionMode::Auto`, nulls-not-equal, no fetch.
    pub fn new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
        join_type: JoinType,
    ) -> Self {
        Self {
            exec: HashJoinExec {
                left,
                right,
                on,
                filter: None,
                join_type,
                left_fut: Default::default(),
                random_state: HASH_JOIN_SEED,
                mode: PartitionMode::Auto,
                fetch: None,
                metrics: ExecutionPlanMetricsSet::new(),
                projection: None,
                column_indices: vec![],
                null_equality: NullEquality::NullEqualsNothing,
                null_aware: false,
                dynamic_filter: None,
                // Placeholder properties/schema; the real values are
                // computed in `build()` since preserve_properties is false.
                cache: stub_properties(),
                join_schema: Arc::new(Schema::empty()),
            },
            preserve_properties: false,
        }
    }

    /// Sets the join type; invalidates cached plan properties.
    pub fn with_type(mut self, join_type: JoinType) -> Self {
        self.exec.join_type = join_type;
        self.preserve_properties = false;
        self
    }

    /// Sets an optional output projection given as column indices.
    pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self {
        self.with_projection_ref(projection.map(Into::into))
    }

    /// Sets an optional output projection; invalidates cached properties.
    pub fn with_projection_ref(mut self, projection: Option<ProjectionRef>) -> Self {
        self.exec.projection = projection;
        self.preserve_properties = false;
        self
    }

    /// Sets an optional non-equijoin filter applied to matched row pairs.
    pub fn with_filter(mut self, filter: Option<JoinFilter>) -> Self {
        self.exec.filter = filter;
        self
    }

    /// Replaces the equijoin key pairs; invalidates cached properties.
    pub fn with_on(mut self, on: Vec<(PhysicalExprRef, PhysicalExprRef)>) -> Self {
        self.exec.on = on;
        self.preserve_properties = false;
        self
    }

    /// Sets the partition mode; invalidates cached properties.
    pub fn with_partition_mode(mut self, mode: PartitionMode) -> Self {
        self.exec.mode = mode;
        self.preserve_properties = false;
        self
    }

    /// Sets whether `NULL` join keys compare equal.
    pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self {
        self.exec.null_equality = null_equality;
        self
    }

    /// Enables null-aware anti-join semantics (validated in `build()`:
    /// only `LeftAnti` with a single join key is accepted).
    pub fn with_null_aware(mut self, null_aware: bool) -> Self {
        self.exec.null_aware = null_aware;
        self
    }

    /// Sets an optional limit on the number of output rows.
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.exec.fetch = fetch;
        self
    }

    /// Forces `build()` to recompute schema and plan properties.
    pub fn recompute_properties(mut self) -> Self {
        self.preserve_properties = false;
        self
    }

    /// Replaces both children. Cached properties are kept only if the new
    /// children report properties identical to the current ones.
    pub fn with_new_children(
        mut self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Self> {
        assert_or_internal_err!(
            children.len() == 2,
            "wrong number of children passed into `HashJoinExecBuilder`"
        );
        self.preserve_properties &= has_same_children_properties(&self.exec, &children)?;
        // Remove index 1 first so index 0 still refers to the left child.
        self.exec.right = children.swap_remove(1);
        self.exec.left = children.swap_remove(0);
        Ok(self)
    }

    /// Clears per-execution state (build-side future, dynamic filter,
    /// metrics) so the node can be executed again from scratch.
    pub fn reset_state(mut self) -> Self {
        self.exec.left_fut = Default::default();
        self.exec.dynamic_filter = None;
        self.exec.metrics = ExecutionPlanMetricsSet::new();
        self
    }

    /// Builds the node and wraps it in an `Arc<dyn ExecutionPlan>`.
    pub fn build_exec(self) -> Result<Arc<dyn ExecutionPlan>> {
        self.build().map(|p| Arc::new(p) as _)
    }

    /// Validates the configuration and produces the final [`HashJoinExec`],
    /// recomputing join schema, column indices and plan properties unless
    /// they were preserved from an existing node.
    pub fn build(self) -> Result<HashJoinExec> {
        let Self {
            exec,
            preserve_properties,
        } = self;

        // Null-aware joins are only defined for single-key LeftAnti.
        if exec.null_aware {
            let join_type = exec.join_type();
            if !matches!(join_type, JoinType::LeftAnti) {
                return plan_err!(
                    "null_aware can only be true for LeftAnti joins, got {join_type}"
                );
            }
            let on = exec.on();
            if on.len() != 1 {
                return plan_err!(
                    "null_aware anti join only supports single column join key, got {} columns",
                    on.len()
                );
            }
        }

        // Fast path: nothing property-affecting changed since the node this
        // builder was created from.
        if preserve_properties {
            return Ok(exec);
        }

        let HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type,
            left_fut,
            random_state,
            mode,
            metrics,
            projection,
            null_equality,
            null_aware,
            dynamic_filter,
            fetch,
            // Recomputed below.
            join_schema: _,
            column_indices: _,
            cache: _,
        } = exec;

        let left_schema = left.schema();
        let right_schema = right.schema();
        if on.is_empty() {
            return plan_err!("On constraints in HashJoinExec should be non-empty");
        }

        check_join_is_valid(&left_schema, &right_schema, &on)?;
        let (join_schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, &join_type);

        let join_schema = Arc::new(join_schema);

        // Validate the projection indices against the (unprojected) schema.
        can_project(&join_schema, projection.as_deref())?;

        let cache = HashJoinExec::compute_properties(
            &left,
            &right,
            &join_schema,
            join_type,
            &on,
            mode,
            projection.as_deref(),
        )?;

        Ok(HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type,
            join_schema,
            left_fut,
            random_state,
            mode,
            metrics,
            projection,
            column_indices,
            null_equality,
            null_aware,
            cache: Arc::new(cache),
            dynamic_filter,
            fetch,
        })
    }

    /// Attaches (or clears) the dynamic filter produced by filter pushdown.
    fn with_dynamic_filter(mut self, filter: Option<HashJoinExecDynamicFilter>) -> Self {
        self.exec.dynamic_filter = filter;
        self
    }
}
497
impl From<&HashJoinExec> for HashJoinExecBuilder {
    /// Clones all node state into a builder. `preserve_properties` starts
    /// `true`, so `build()` will reuse the cached schema/properties unless a
    /// property-affecting setter is called first.
    fn from(exec: &HashJoinExec) -> Self {
        Self {
            exec: HashJoinExec {
                left: Arc::clone(exec.left()),
                right: Arc::clone(exec.right()),
                on: exec.on.clone(),
                filter: exec.filter.clone(),
                join_type: exec.join_type,
                join_schema: Arc::clone(&exec.join_schema),
                left_fut: Arc::clone(&exec.left_fut),
                random_state: exec.random_state.clone(),
                mode: exec.mode,
                metrics: exec.metrics.clone(),
                projection: exec.projection.clone(),
                column_indices: exec.column_indices.clone(),
                null_equality: exec.null_equality,
                null_aware: exec.null_aware,
                cache: Arc::clone(&exec.cache),
                dynamic_filter: exec.dynamic_filter.clone(),
                fetch: exec.fetch,
            },
            preserve_properties: true,
        }
    }
}
524
/// Hash join execution plan: builds a hash table from the left ("build")
/// input, then streams the right ("probe") input against it.
#[expect(rustdoc::private_intra_doc_links)]
pub struct HashJoinExec {
    /// Left (build) side, which is hashed.
    pub left: Arc<dyn ExecutionPlan>,
    /// Right (probe) side, streamed and matched against the hash table.
    pub right: Arc<dyn ExecutionPlan>,
    /// Equijoin key pairs as `(left_expr, right_expr)`.
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional non-equijoin filter applied to matched row pairs.
    pub filter: Option<JoinFilter>,
    /// How the join is performed.
    pub join_type: JoinType,
    /// Output schema of the join before any embedded projection.
    join_schema: SchemaRef,
    /// Shared future producing the build side (shared across partitions in
    /// CollectLeft mode; see `execute`).
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Seeded random state used to hash the join keys.
    random_state: SeededRandomState,
    /// Partitioning mode used for this join.
    pub mode: PartitionMode,
    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
    /// Optional projection over the join output, by column index.
    pub projection: Option<ProjectionRef>,
    /// Source side/index of each output column (before projection).
    column_indices: Vec<ColumnIndex>,
    /// Whether `NULL` join keys compare equal.
    pub null_equality: NullEquality,
    /// Null-aware anti-join semantics (LeftAnti, single key only).
    pub null_aware: bool,
    /// Cached plan properties (schema, partitioning, emission, ...).
    cache: Arc<PlanProperties>,
    /// Dynamic filter attached by filter pushdown, if any.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
    /// Optional limit on the number of output rows.
    fetch: Option<usize>,
}
761
/// Dynamic-filter state attached to the node by filter pushdown.
#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// Filter pushed into the probe side; starts as `lit(true)` (see
    /// `create_dynamic_filter`) and is tightened during execution.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Accumulator shared by all partitions, created lazily on first use in
    /// `execute` (see `SharedBuildAccumulator`).
    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
770
771impl fmt::Debug for HashJoinExec {
772 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773 f.debug_struct("HashJoinExec")
774 .field("left", &self.left)
775 .field("right", &self.right)
776 .field("on", &self.on)
777 .field("filter", &self.filter)
778 .field("join_type", &self.join_type)
779 .field("join_schema", &self.join_schema)
780 .field("left_fut", &self.left_fut)
781 .field("random_state", &self.random_state)
782 .field("mode", &self.mode)
783 .field("metrics", &self.metrics)
784 .field("projection", &self.projection)
785 .field("column_indices", &self.column_indices)
786 .field("null_equality", &self.null_equality)
787 .field("cache", &self.cache)
788 .finish()
790 }
791}
792
impl EmbeddedProjection for HashJoinExec {
    /// Delegates to the inherent `HashJoinExec::with_projection`. Inherent
    /// methods take precedence over trait methods, so this does not recurse.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
798
impl HashJoinExec {
    /// Tries to create a new [`HashJoinExec`]; convenience wrapper around
    /// [`HashJoinExecBuilder`]. Fails if `on` is empty or invalid for the
    /// input schemas, or if `null_aware` is set for an unsupported shape.
    #[expect(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        projection: Option<Vec<usize>>,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
        null_aware: bool,
    ) -> Result<Self> {
        HashJoinExecBuilder::new(left, right, on, *join_type)
            .with_filter(filter)
            .with_projection(projection)
            .with_partition_mode(partition_mode)
            .with_null_equality(null_equality)
            .with_null_aware(null_aware)
            .build()
    }

    /// Returns a builder pre-populated from this node (cached properties
    /// are preserved until a property-affecting setter is called).
    pub fn builder(&self) -> HashJoinExecBuilder {
        self.into()
    }

    /// Creates a placeholder dynamic filter over the probe-side join keys.
    /// It starts as `lit(true)` and is updated during execution.
    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
    }

    /// Whether dynamic filter pushdown may be used for this join under the
    /// given configuration.
    fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
        // Only inner joins are eligible, and the feature must be enabled.
        if self.join_type != JoinType::Inner
            || !config.optimizer.enable_join_dynamic_filter_pushdown
        {
            return false;
        }

        // NOTE(review): partitioned mode is excluded whenever file
        // partitions are preserved — confirm the rationale against the
        // `preserve_file_partitions` option documentation.
        if config.optimizer.preserve_file_partitions > 0
            && self.mode == PartitionMode::Partitioned
        {
            return false;
        }

        true
    }

    /// Left (build) side of the join.
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// Right (probe) side of the join.
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// Equijoin key pairs used to join on.
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    /// Optional non-equijoin filter, if any.
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// How the join is performed.
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// The unprojected output schema of the join.
    pub fn join_schema(&self) -> &SchemaRef {
        &self.join_schema
    }

    /// The partitioning mode of this join.
    pub fn partition_mode(&self) -> &PartitionMode {
        &self.mode
    }

    /// Null-equality semantics for the join keys.
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    /// Test-only accessor for the pushed-down dynamic filter, if any.
    #[doc(hidden)]
    pub fn dynamic_filter_for_test(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
        self.dynamic_filter.as_ref().map(|df| &df.filter)
    }

    /// Whether each input's order is preserved in the output: never for the
    /// build (left) side; for the probe (right) side only for join types
    /// that emit probe rows as they stream.
    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
        vec![
            false,
            matches!(
                join_type,
                JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightAnti
                    | JoinType::RightSemi
                    | JoinType::RightMark
            ),
        ]
    }

    /// The side whose input is streamed and probed against the hash table.
    pub fn probe_side() -> JoinSide {
        JoinSide::Right
    }

    /// Returns `true` if an output projection is embedded in this node.
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }

    /// Returns a copy of this node with `projection` composed on top of any
    /// existing embedded projection.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        let projection = projection.map(Into::into);
        // Validate the indices against the current (projected) schema.
        can_project(&self.schema(), projection.as_deref())?;
        let projection =
            combine_projections(projection.as_ref(), self.projection.as_ref())?;
        self.builder().with_projection_ref(projection).build()
    }

    /// Computes [`PlanProperties`]: equivalences, output partitioning,
    /// emission type and boundedness for the given configuration.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
        join_type: JoinType,
        on: JoinOnRef,
        mode: PartitionMode,
        projection: Option<&[usize]>,
    ) -> Result<PlanProperties> {
        let mut eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            Arc::clone(schema),
            &Self::maintains_input_order(join_type),
            Some(Self::probe_side()),
            on,
        )?;

        let mut output_partitioning = match mode {
            PartitionMode::CollectLeft => {
                asymmetric_join_output_partitioning(left, right, &join_type)?
            }
            PartitionMode::Auto => Partitioning::UnknownPartitioning(
                right.output_partitioning().partition_count(),
            ),
            PartitionMode::Partitioned => {
                symmetric_join_output_partitioning(left, right, &join_type)?
            }
        };

        // An unbounded build side forces Final emission; otherwise emission
        // follows the probe side's behavior and the join type.
        let emission_type = if left.boundedness().is_unbounded() {
            EmissionType::Final
        } else if right.pipeline_behavior() == EmissionType::Incremental {
            match join_type {
                // Emit rows incrementally as the probe side streams.
                JoinType::Inner
                | JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::Right
                | JoinType::RightAnti
                | JoinType::RightMark => EmissionType::Incremental,
                // Also emit unmatched build rows at the end.
                JoinType::Left
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::Full => EmissionType::Both,
            }
        } else {
            right.pipeline_behavior()
        };

        // Apply the embedded projection to both the partitioning and the
        // equivalence properties.
        if let Some(projection) = projection {
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(&projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness_from_children([left, right]),
        ))
    }

    /// Returns an `ExecutionPlan` computing the same join with build and
    /// probe sides swapped (join type, keys, filter and projection swapped
    /// accordingly). For semi/anti/mark joins, or when a projection is
    /// embedded, the swapped plan is returned as-is; otherwise a projection
    /// is added to restore the original output column order.
    pub fn swap_inputs(
        &self,
        partition_mode: PartitionMode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = self.left();
        let right = self.right();
        let new_join = self
            .builder()
            .with_type(self.join_type.swap())
            .with_new_children(vec![Arc::clone(right), Arc::clone(left)])?
            .with_on(
                self.on()
                    .iter()
                    .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
                    .collect(),
            )
            .with_filter(self.filter().map(JoinFilter::swap))
            .with_projection(swap_join_projection(
                left.schema().fields().len(),
                right.schema().fields().len(),
                self.projection.as_deref(),
                self.join_type(),
            ))
            .with_partition_mode(partition_mode)
            .build()?;
        if matches!(
            self.join_type(),
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ) || self.projection.is_some()
        {
            Ok(Arc::new(new_join))
        } else {
            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
        }
    }
}
1089
impl DisplayAs for HashJoinExec {
    /// Formats the node for EXPLAIN output; optional attributes (filter,
    /// projection, fetch, ...) are omitted when absent or default.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            // Single-line form.
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let display_filter = self.filter.as_ref().map_or_else(
                    || "".to_string(),
                    |f| format!(", filter={}", f.expression()),
                );
                // Projected columns rendered as `name@index`.
                let display_projections = if self.contains_projection() {
                    format!(
                        ", projection=[{}]",
                        self.projection
                            .as_ref()
                            .unwrap()
                            .iter()
                            .map(|index| format!(
                                "{}@{}",
                                self.join_schema.fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                // Shown only for the non-default NULL-equality semantics.
                let display_null_equality =
                    if self.null_equality() == NullEquality::NullEqualsNull {
                        ", NullsEqual: true"
                    } else {
                        ""
                    };
                let display_fetch = self
                    .fetch
                    .map_or_else(String::new, |f| format!(", fetch={f}"));
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| format!("({c1}, {c2})"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(
                    f,
                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
                    self.mode,
                    self.join_type,
                    on,
                    display_filter,
                    display_projections,
                    display_null_equality,
                    display_fetch,
                )
            }
            // Multi-line form for tree rendering; join keys use SQL syntax.
            DisplayFormatType::TreeRender => {
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| {
                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
                    })
                    .collect::<Vec<String>>()
                    .join(", ");

                // Inner is the default join type, so it is not printed.
                if *self.join_type() != JoinType::Inner {
                    writeln!(f, "join_type={:?}", self.join_type)?;
                }

                writeln!(f, "on={on}")?;

                if self.null_equality() == NullEquality::NullEqualsNull {
                    writeln!(f, "NullsEqual: true")?;
                }

                if let Some(filter) = self.filter.as_ref() {
                    writeln!(f, "filter={filter}")?;
                }

                if let Some(fetch) = self.fetch {
                    writeln!(f, "fetch={fetch}")?;
                }

                Ok(())
            }
        }
    }
}
1176
1177impl ExecutionPlan for HashJoinExec {
    /// Short name of this operator, used in display/metrics output.
    fn name(&self) -> &'static str {
        "HashJoinExec"
    }
1181
    /// Enables downcasting from `dyn ExecutionPlan` to `HashJoinExec`.
    fn as_any(&self) -> &dyn Any {
        self
    }
1185
    /// Returns the cached plan properties computed at build time.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
1189
1190 fn required_input_distribution(&self) -> Vec<Distribution> {
1191 match self.mode {
1192 PartitionMode::CollectLeft => vec![
1193 Distribution::SinglePartition,
1194 Distribution::UnspecifiedDistribution,
1195 ],
1196 PartitionMode::Partitioned => {
1197 let (left_expr, right_expr) = self
1198 .on
1199 .iter()
1200 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1201 .unzip();
1202 vec![
1203 Distribution::HashPartitioned(left_expr),
1204 Distribution::HashPartitioned(right_expr),
1205 ]
1206 }
1207 PartitionMode::Auto => vec![
1208 Distribution::UnspecifiedDistribution,
1209 Distribution::UnspecifiedDistribution,
1210 ],
1211 }
1212 }
1213
    /// Delegates to the associated function of the same name.
    fn maintains_input_order(&self) -> Vec<bool> {
        Self::maintains_input_order(self.join_type)
    }
1233
    /// Children in `[build, probe]` order.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }
1237
    /// Rebuilds the node with new children via the builder (which validates
    /// the child count and recomputes properties if they changed).
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.builder().with_new_children(children)?.build_exec()
    }
1249
    /// Returns a copy with per-execution state cleared (see builder).
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        self.builder().reset_state().build_exec()
    }
1253
    /// Executes the join for one output partition: starts (or joins) the
    /// build-side collection, then returns a [`HashJoinStream`] probing it
    /// with the right-side stream.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let on_left = self
            .on
            .iter()
            .map(|on| Arc::clone(&on.0))
            .collect::<Vec<_>>();
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();

        // Sanity-check the distribution invariants for each mode.
        assert_or_internal_err!(
            self.mode != PartitionMode::Partitioned
                || left_partitions == right_partitions,
            "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
             consider using RepartitionExec"
        );

        assert_or_internal_err!(
            self.mode != PartitionMode::CollectLeft || left_partitions == 1,
            "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
             consider using CoalescePartitionsExec or the EnforceDistribution rule"
        );

        // Dynamic filter pushdown is active only if allowed by config AND
        // the pushed filter is actually consumed somewhere downstream.
        let enable_dynamic_filter_pushdown = self
            .allow_join_dynamic_filter_pushdown(context.session_config().options())
            && self
                .dynamic_filter
                .as_ref()
                .map(|df| df.filter.is_used())
                .unwrap_or(false);

        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);

        let array_map_created_count = MetricBuilder::new(&self.metrics)
            .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);

        let left_fut = match self.mode {
            // CollectLeft: all output partitions share one build future,
            // created by whichever partition executes first.
            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
                let left_stream = self.left.execute(0, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

                Ok(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    self.right().output_partitioning().partition_count(),
                    enable_dynamic_filter_pushdown,
                    Arc::clone(context.session_config().options()),
                    self.null_equality,
                    array_map_created_count,
                ))
            })?,
            // Partitioned: every partition builds its own hash table from
            // the matching build-side partition (probe thread count = 1).
            PartitionMode::Partitioned => {
                let left_stream = self.left.execute(partition, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
                        .register(context.memory_pool());

                OnceFut::new(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    1,
                    enable_dynamic_filter_pushdown,
                    Arc::clone(context.session_config().options()),
                    self.null_equality,
                    array_map_created_count,
                ))
            }
            // Auto must be resolved by the optimizer before execution.
            PartitionMode::Auto => {
                return plan_err!(
                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
                    PartitionMode::Auto
                );
            }
        };

        let batch_size = context.session_config().batch_size();

        let repartition_random_state = REPARTITION_RANDOM_STATE;

        // Lazily create the accumulator shared by all partitions, which
        // updates the dynamic filter from build-side results.
        let build_accumulator = enable_dynamic_filter_pushdown
            .then(|| {
                self.dynamic_filter.as_ref().map(|df| {
                    let filter = Arc::clone(&df.filter);
                    let on_right = self
                        .on
                        .iter()
                        .map(|(_, right_expr)| Arc::clone(right_expr))
                        .collect::<Vec<_>>();
                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
                            self.mode,
                            self.left.as_ref(),
                            self.right.as_ref(),
                            filter,
                            on_right,
                            repartition_random_state,
                        ))
                    })))
                })
            })
            .flatten()
            .flatten();

        let right_stream = self.right.execute(partition, context)?;

        // Map output column indices through the embedded projection, if any.
        let column_indices_after_projection = match self.projection.as_ref() {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };

        let on_right = self
            .on
            .iter()
            .map(|(_, right_expr)| Arc::clone(right_expr))
            .collect::<Vec<_>>();

        Ok(Box::pin(HashJoinStream::new(
            partition,
            self.schema(),
            on_right,
            self.filter.clone(),
            self.join_type,
            right_stream,
            self.random_state.random_state().clone(),
            join_metrics,
            column_indices_after_projection,
            self.null_equality,
            HashJoinStreamState::WaitBuildSide,
            BuildSide::Initial(BuildSideInitialState { left_fut }),
            batch_size,
            vec![],
            self.right.output_ordering().is_some(),
            build_accumulator,
            self.mode,
            self.null_aware,
            self.fetch,
        )))
    }
1418
    /// Returns a snapshot of this node's execution metrics.
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
1422
    /// Estimates output statistics; per-partition statistics are unknown.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema()));
        }
        let stats = estimate_join_statistics(
            self.left.partition_statistics(None)?,
            self.right.partition_statistics(None)?,
            &self.on,
            &self.join_type,
            &self.join_schema,
        )?;
        // Account for the embedded projection and the fetch limit.
        let stats = stats.project(self.projection.as_ref());
        stats.with_fetch(self.fetch, 0, 1)
    }
1442
    /// Tries to absorb a parent [`ProjectionExec`]: either push it through
    /// the join into both children, or embed it into this node.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // An already-embedded projection is not composed with here.
        if self.contains_projection() {
            return Ok(None);
        }

        let schema = self.schema();
        if let Some(JoinData {
            projected_left_child,
            projected_right_child,
            join_filter,
            join_on,
        }) = try_pushdown_through_join(
            projection,
            self.left(),
            self.right(),
            self.on(),
            &schema,
            self.filter(),
        )? {
            self.builder()
                .with_new_children(vec![
                    Arc::new(projected_left_child),
                    Arc::new(projected_right_child),
                ])?
                .with_on(join_on)
                .with_filter(join_filter)
                // The projection now lives in the children.
                .with_projection(None)
                .build_exec()
                .map(Some)
        } else {
            try_embed_projection(projection, self)
        }
    }
1484
    /// Describes which parent filters may be pushed into each child and, in
    /// the post-optimization phase, attaches a self dynamic filter to the
    /// probe (right) side.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        let (left_preserved, right_preserved) = lr_is_preserved(self.join_type);

        // Output-column indices after the embedded projection, if any.
        let column_indices: Vec<ColumnIndex> = match self.projection.as_ref() {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };

        // Partition output columns by the side they originate from.
        let (mut left_allowed, mut right_allowed) = (HashSet::new(), HashSet::new());
        column_indices
            .iter()
            .enumerate()
            .for_each(|(output_idx, ci)| {
                // Each arm yields a bool (`insert`'s return value); the
                // `None` arm supplies `false` so the match type-checks.
                match ci.side {
                    JoinSide::Left => left_allowed.insert(output_idx),
                    JoinSide::Right => right_allowed.insert(output_idx),
                    JoinSide::None => false,
                };
            });

        // For semi/anti joins, output join-key columns are additionally
        // allowed on the opposite side's child.
        match self.join_type {
            JoinType::LeftSemi | JoinType::LeftAnti => {
                let left_key_indices: HashSet<usize> = self
                    .on
                    .iter()
                    .filter_map(|(left_key, _)| {
                        left_key
                            .as_any()
                            .downcast_ref::<Column>()
                            .map(|c| c.index())
                    })
                    .collect();
                for (output_idx, ci) in column_indices.iter().enumerate() {
                    if ci.side == JoinSide::Left && left_key_indices.contains(&ci.index) {
                        right_allowed.insert(output_idx);
                    }
                }
            }
            JoinType::RightSemi | JoinType::RightAnti => {
                let right_key_indices: HashSet<usize> = self
                    .on
                    .iter()
                    .filter_map(|(_, right_key)| {
                        right_key
                            .as_any()
                            .downcast_ref::<Column>()
                            .map(|c| c.index())
                    })
                    .collect();
                for (output_idx, ci) in column_indices.iter().enumerate() {
                    if ci.side == JoinSide::Right && right_key_indices.contains(&ci.index)
                    {
                        left_allowed.insert(output_idx);
                    }
                }
            }
            _ => {}
        }

        let left_child = if left_preserved {
            ChildFilterDescription::from_child_with_allowed_indices(
                &parent_filters,
                left_allowed,
                self.left(),
            )?
        } else {
            ChildFilterDescription::all_unsupported(&parent_filters)
        };

        let mut right_child = if right_preserved {
            ChildFilterDescription::from_child_with_allowed_indices(
                &parent_filters,
                right_allowed,
                self.right(),
            )?
        } else {
            ChildFilterDescription::all_unsupported(&parent_filters)
        };

        // Attach our own dynamic filter to the probe side (post phase only).
        if phase == FilterPushdownPhase::Post
            && self.allow_join_dynamic_filter_pushdown(config)
        {
            let dynamic_filter = Self::create_dynamic_filter(&self.on);
            right_child = right_child.with_self_filter(dynamic_filter);
        }

        Ok(FilterDescription::new()
            .with_child(left_child)
            .with_child(right_child))
    }
1606
1607 fn handle_child_pushdown_result(
1608 &self,
1609 _phase: FilterPushdownPhase,
1610 child_pushdown_result: ChildPushdownResult,
1611 _config: &ConfigOptions,
1612 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1613 let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1614 assert_eq!(child_pushdown_result.self_filters.len(), 2); let right_child_self_filters = &child_pushdown_result.self_filters[1]; if let Some(filter) = right_child_self_filters.first() {
1618 let predicate = Arc::clone(&filter.predicate);
1621 if let Ok(dynamic_filter) =
1622 Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1623 {
1624 let new_node = self
1626 .builder()
1627 .with_dynamic_filter(Some(HashJoinExecDynamicFilter {
1628 filter: dynamic_filter,
1629 build_accumulator: OnceLock::new(),
1630 }))
1631 .build_exec()?;
1632 result = result.with_updated_node(new_node);
1633 }
1634 }
1635 Ok(result)
1636 }
1637
    fn supports_limit_pushdown(&self) -> bool {
        // A parent limit is not pushed through this node; the join carries its
        // own row limit via `fetch`/`with_fetch` instead.
        false
    }
1644
    fn fetch(&self) -> Option<usize> {
        // Maximum number of rows this join will emit, if a limit was set.
        self.fetch
    }
1648
1649 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1650 self.builder()
1651 .with_fetch(limit)
1652 .build()
1653 .ok()
1654 .map(|exec| Arc::new(exec) as _)
1655 }
1656}
1657
1658fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
1664 match join_type {
1665 JoinType::Inner => (true, true),
1666 JoinType::Left => (true, false),
1667 JoinType::Right => (false, true),
1668 JoinType::Full => (false, false),
1669 JoinType::LeftSemi | JoinType::LeftAnti => (true, true),
1673 JoinType::RightSemi | JoinType::RightAnti => (true, true),
1674 JoinType::LeftMark => (true, false),
1675 JoinType::RightMark => (false, true),
1676 }
1677}
1678
/// Accumulates the running min/max of one build-side join-key expression
/// across all build batches, yielding `ColumnBounds` for filter pushdown.
struct CollectLeftAccumulator {
    /// Join-key expression evaluated against every build batch.
    expr: Arc<dyn PhysicalExpr>,
    /// Running minimum of the evaluated values.
    min: MinAccumulator,
    /// Running maximum of the evaluated values.
    max: MaxAccumulator,
}
1696
1697impl CollectLeftAccumulator {
1698 fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
1707 fn dictionary_value_type(data_type: &DataType) -> DataType {
1709 match data_type {
1710 DataType::Dictionary(_, value_type) => {
1711 dictionary_value_type(value_type.as_ref())
1712 }
1713 _ => data_type.clone(),
1714 }
1715 }
1716
1717 let data_type = expr
1718 .data_type(schema)
1719 .map(|dt| dictionary_value_type(&dt))?;
1721 Ok(Self {
1722 expr,
1723 min: MinAccumulator::try_new(&data_type)?,
1724 max: MaxAccumulator::try_new(&data_type)?,
1725 })
1726 }
1727
1728 fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
1739 let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
1740 self.min.update_batch(std::slice::from_ref(&array))?;
1741 self.max.update_batch(std::slice::from_ref(&array))?;
1742 Ok(())
1743 }
1744
1745 fn evaluate(mut self) -> Result<ColumnBounds> {
1752 Ok(ColumnBounds::new(
1753 self.min.evaluate()?,
1754 self.max.evaluate()?,
1755 ))
1756 }
1757}
1758
/// Mutable state threaded through the build-side `try_fold` in
/// `collect_left_input`.
struct BuildSideState {
    /// All build-side batches collected so far.
    batches: Vec<RecordBatch>,
    /// Total number of build-side rows collected so far.
    num_rows: usize,
    /// Build-phase metrics (input rows/batches, memory used).
    metrics: BuildProbeJoinMetrics,
    /// Memory reservation grown as batches are accumulated.
    reservation: MemoryReservation,
    /// Per-key min/max accumulators; `Some` only when bounds are needed
    /// (dynamic filters or perfect-hash sizing).
    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
}
1767
1768impl BuildSideState {
1769 fn try_new(
1771 metrics: BuildProbeJoinMetrics,
1772 reservation: MemoryReservation,
1773 on_left: Vec<Arc<dyn PhysicalExpr>>,
1774 schema: &SchemaRef,
1775 should_compute_dynamic_filters: bool,
1776 ) -> Result<Self> {
1777 Ok(Self {
1778 batches: Vec::new(),
1779 num_rows: 0,
1780 metrics,
1781 reservation,
1782 bounds_accumulators: should_compute_dynamic_filters
1783 .then(|| {
1784 on_left
1785 .into_iter()
1786 .map(|expr| CollectLeftAccumulator::try_new(expr, schema))
1787 .collect::<Result<Vec<_>>>()
1788 })
1789 .transpose()?,
1790 })
1791 }
1792}
1793
1794fn should_collect_min_max_for_perfect_hash(
1795 on_left: &[PhysicalExprRef],
1796 schema: &SchemaRef,
1797) -> Result<bool> {
1798 if on_left.len() != 1 {
1799 return Ok(false);
1800 }
1801
1802 let expr = &on_left[0];
1803 let data_type = expr.data_type(schema)?;
1804 Ok(ArrayMap::is_supported_type(&data_type))
1805}
1806
/// Collects the entire build (left) side into the in-memory structures used
/// for probing: a join map (perfect-hash `ArrayMap` or chained hash table),
/// the concatenated build batch, and the evaluated join-key arrays.
///
/// Also optionally finalizes per-key min/max bounds, prepares the
/// visited-row bitmap when requested, and chooses the probe-side membership
/// pushdown strategy. All materialized memory is tracked via `reservation`.
#[expect(clippy::too_many_arguments)]
async fn collect_left_input(
    random_state: RandomState,
    left_stream: SendableRecordBatchStream,
    on_left: Vec<PhysicalExprRef>,
    metrics: BuildProbeJoinMetrics,
    reservation: MemoryReservation,
    with_visited_indices_bitmap: bool,
    probe_threads_count: usize,
    should_compute_dynamic_filters: bool,
    config: Arc<ConfigOptions>,
    null_equality: NullEquality,
    array_map_created_count: Count,
) -> Result<JoinLeftData> {
    let schema = left_stream.schema();

    // Min/max is also needed when the single-key perfect-hash path may apply.
    let should_collect_min_max_for_phj =
        should_collect_min_max_for_perfect_hash(&on_left, &schema)?;

    let initial = BuildSideState::try_new(
        metrics,
        reservation,
        on_left.clone(),
        &schema,
        should_compute_dynamic_filters || should_collect_min_max_for_phj,
    )?;

    // Drain the build stream, updating bounds, metrics, and the memory
    // reservation per batch.
    let state = left_stream
        .try_fold(initial, |mut state, batch| async move {
            if let Some(ref mut accumulators) = state.bounds_accumulators {
                for accumulator in accumulators {
                    accumulator.update_batch(&batch)?;
                }
            }

            let batch_size = get_record_batch_memory_size(&batch);
            state.reservation.try_grow(batch_size)?;
            state.metrics.build_mem_used.add(batch_size);
            state.metrics.build_input_batches.add(1);
            state.metrics.build_input_rows.add(batch.num_rows());
            state.num_rows += batch.num_rows();
            state.batches.push(batch);
            Ok(state)
        })
        .await?;

    let BuildSideState {
        batches,
        num_rows,
        metrics,
        mut reservation,
        bounds_accumulators,
    } = state;

    // Finalize bounds only when the build side is non-empty; evaluating
    // accumulators over zero rows would not produce meaningful bounds.
    let mut bounds = match bounds_accumulators {
        Some(accumulators) if num_rows > 0 => {
            let bounds = accumulators
                .into_iter()
                .map(CollectLeftAccumulator::evaluate)
                .collect::<Result<Vec<_>>>()?;
            Some(PartitionBounds::new(bounds))
        }
        _ => None,
    };

    // Prefer the perfect-hash ArrayMap when size/density thresholds permit;
    // otherwise fall back to a u32- or u64-indexed chained hash table.
    let (join_hash_map, batch, left_values) =
        if let Some((array_map, batch, left_value)) = try_create_array_map(
            &bounds,
            &schema,
            &batches,
            &on_left,
            &mut reservation,
            config.execution.perfect_hash_join_small_build_threshold,
            config.execution.perfect_hash_join_min_key_density,
            null_equality,
        )? {
            array_map_created_count.add(1);
            metrics.build_mem_used.add(array_map.size());

            (Map::ArrayMap(array_map), batch, left_value)
        } else {
            let fixed_size_u32 = size_of::<JoinHashMapU32>();
            let fixed_size_u64 = size_of::<JoinHashMapU64>();

            // 64-bit row indices are only needed when u32 cannot address all
            // build rows; reserve the estimated table size up front.
            let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
                let estimated_hashtable_size =
                    estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
                reservation.try_grow(estimated_hashtable_size)?;
                metrics.build_mem_used.add(estimated_hashtable_size);
                Box::new(JoinHashMapU64::with_capacity(num_rows))
            } else {
                let estimated_hashtable_size =
                    estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
                reservation.try_grow(estimated_hashtable_size)?;
                metrics.build_mem_used.add(estimated_hashtable_size);
                Box::new(JoinHashMapU32::with_capacity(num_rows))
            };

            let mut hashes_buffer = Vec::new();
            let mut offset = 0;

            // NOTE: batches are iterated in reverse here AND for the concat
            // below, so the hash map's row offsets index into the reversed
            // concatenation consistently.
            let batches_iter = batches.iter().rev();

            for batch in batches_iter.clone() {
                hashes_buffer.clear();
                hashes_buffer.resize(batch.num_rows(), 0);
                update_hash(
                    &on_left,
                    batch,
                    &mut *hashmap,
                    offset,
                    &random_state,
                    &mut hashes_buffer,
                    0,
                    true,
                )?;
                offset += batch.num_rows();
            }

            // Single contiguous build batch, in the same (reversed) order the
            // hash map was populated.
            let batch = concat_batches(&schema, batches_iter.clone())?;

            let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;

            (Map::HashMap(hashmap), batch, left_values)
        };

    // Bitmap of matched build rows; callers request it when unmatched build
    // rows must be tracked. Sized to the concatenated batch (== num_rows).
    let visited_indices_bitmap = if with_visited_indices_bitmap {
        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
        reservation.try_grow(bitmap_size)?;
        metrics.build_mem_used.add(bitmap_size);

        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
        bitmap_buffer.append_n(num_rows, false);
        bitmap_buffer
    } else {
        BooleanBufferBuilder::new(0)
    };

    let map = Arc::new(join_hash_map);

    // Choose how build-side membership is exposed to the probe side:
    // - Empty: build side had no rows at all
    // - InList: small builds whose key values fit the configured limits
    // - Map: everything else (probe consults the join map directly)
    let membership = if num_rows == 0 {
        PushdownStrategy::Empty
    } else {
        let estimated_size = left_values
            .iter()
            .map(|arr| arr.get_array_memory_size())
            .sum::<usize>();
        if left_values.is_empty()
            || left_values[0].is_empty()
            || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size
            || map.num_of_distinct_key()
                > config
                    .optimizer
                    .hash_join_inlist_pushdown_max_distinct_values
        {
            PushdownStrategy::Map(Arc::clone(&map))
        } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
            PushdownStrategy::InList(in_list_values)
        } else {
            PushdownStrategy::Map(Arc::clone(&map))
        }
    };

    // Bounds collected solely for perfect-hash sizing are internal; don't
    // publish them unless dynamic filters actually asked for them.
    if should_collect_min_max_for_phj && !should_compute_dynamic_filters {
        bounds = None;
    }

    let data = JoinLeftData {
        map,
        batch,
        values: left_values,
        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
        probe_threads_counter: AtomicUsize::new(probe_threads_count),
        _reservation: reservation,
        bounds,
        membership,
        probe_side_non_empty: AtomicBool::new(false),
        probe_side_has_null: AtomicBool::new(false),
    };

    Ok(data)
}
2036
2037#[cfg(test)]
2038mod tests {
2039 use super::*;
2040
2041 fn assert_phj_used(metrics: &MetricsSet, use_phj: bool) {
2042 if use_phj {
2043 assert!(
2044 metrics
2045 .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
2046 .expect("should have array_map_created_count metrics")
2047 .as_usize()
2048 >= 1
2049 );
2050 } else {
2051 assert_eq!(
2052 metrics
2053 .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
2054 .map(|v| v.as_usize())
2055 .unwrap_or(0),
2056 0
2057 )
2058 }
2059 }
2060
2061 fn build_schema_and_on() -> Result<(SchemaRef, SchemaRef, JoinOn)> {
2062 let left_schema = Arc::new(Schema::new(vec![
2063 Field::new("a1", DataType::Int32, true),
2064 Field::new("b1", DataType::Int32, true),
2065 ]));
2066 let right_schema = Arc::new(Schema::new(vec![
2067 Field::new("a2", DataType::Int32, true),
2068 Field::new("b1", DataType::Int32, true),
2069 ]));
2070 let on = vec![(
2071 Arc::new(Column::new_with_schema("b1", &left_schema)?) as _,
2072 Arc::new(Column::new_with_schema("b1", &right_schema)?) as _,
2073 )];
2074 Ok((left_schema, right_schema, on))
2075 }
2076
2077 use crate::coalesce_partitions::CoalescePartitionsExec;
2078 use crate::joins::hash_join::stream::lookup_join_hashmap;
2079 use crate::test::{TestMemoryExec, assert_join_metrics};
2080 use crate::{
2081 common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
2082 test::exec::MockExec,
2083 };
2084
2085 use arrow::array::{
2086 Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, UInt64Array,
2087 };
2088 use arrow::buffer::NullBuffer;
2089 use arrow::datatypes::{DataType, Field};
2090 use arrow_schema::Schema;
2091 use datafusion_common::hash_utils::create_hashes;
2092 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
2093 use datafusion_common::{
2094 ScalarValue, assert_batches_eq, assert_batches_sorted_eq, assert_contains,
2095 exec_err, internal_err,
2096 };
2097 use datafusion_execution::config::SessionConfig;
2098 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
2099 use datafusion_expr::Operator;
2100 use datafusion_physical_expr::PhysicalExpr;
2101 use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
2102 use hashbrown::HashTable;
2103 use insta::{allow_duplicates, assert_snapshot};
2104 use rstest::*;
2105 use rstest_reuse::*;
2106
2107 fn div_ceil(a: usize, b: usize) -> usize {
2108 a.div_ceil(b)
2109 }
2110
    /// rstest template: runs each annotated test under every combination of
    /// output batch size and perfect-hash-join enablement.
    #[template]
    #[rstest]
    fn hash_join_exec_configs(
        #[values(8192, 10, 5, 2, 1)] batch_size: usize,
        #[values(true, false)] use_perfect_hash_join_as_possible: bool,
    ) {
    }
2118
2119 fn prepare_task_ctx(
2120 batch_size: usize,
2121 use_perfect_hash_join_as_possible: bool,
2122 ) -> Arc<TaskContext> {
2123 let mut session_config = SessionConfig::default().with_batch_size(batch_size);
2124
2125 if use_perfect_hash_join_as_possible {
2126 session_config
2127 .options_mut()
2128 .execution
2129 .perfect_hash_join_small_build_threshold = 819200;
2130 session_config
2131 .options_mut()
2132 .execution
2133 .perfect_hash_join_min_key_density = 0.0;
2134 } else {
2135 session_config
2136 .options_mut()
2137 .execution
2138 .perfect_hash_join_small_build_threshold = 0;
2139 session_config
2140 .options_mut()
2141 .execution
2142 .perfect_hash_join_min_key_density = f64::INFINITY;
2143 }
2144 Arc::new(TaskContext::default().with_session_config(session_config))
2145 }
2146
    /// Builds a single-partition in-memory table with three Int32 columns
    /// named by the tuple keys.
    fn build_table(
        a: (&str, &Vec<i32>),
        b: (&str, &Vec<i32>),
        c: (&str, &Vec<i32>),
    ) -> Arc<dyn ExecutionPlan> {
        let batch = build_table_i32(a, b, c);
        let schema = batch.schema();
        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
    }
2156
    /// Builds a single-partition in-memory table with two nullable Int32
    /// columns (values given as `Option<i32>` to allow nulls).
    fn build_table_two_cols(
        a: (&str, &Vec<Option<i32>>),
        b: (&str, &Vec<Option<i32>>),
    ) -> Arc<dyn ExecutionPlan> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(a.0, DataType::Int32, true),
            Field::new(b.0, DataType::Int32, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(a.1.clone())),
                Arc::new(Int32Array::from(b.1.clone())),
            ],
        )
        .unwrap();
        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
    }
2176
    /// Builds a `CollectLeft` hash join over `on` with no filter, projection,
    /// or fetch limit.
    fn join(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        null_equality: NullEquality,
    ) -> Result<HashJoinExec> {
        HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            join_type,
            None,
            PartitionMode::CollectLeft,
            null_equality,
            false,
        )
    }
2196
    /// Builds a `CollectLeft` hash join with an additional non-equi join
    /// filter; no projection or fetch limit.
    fn join_with_filter(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: JoinFilter,
        join_type: &JoinType,
        null_equality: NullEquality,
    ) -> Result<HashJoinExec> {
        HashJoinExec::try_new(
            left,
            right,
            on,
            Some(filter),
            join_type,
            None,
            PartitionMode::CollectLeft,
            null_equality,
            false,
        )
    }
2217
    /// Runs a `CollectLeft` join on partition 0 and returns the output column
    /// names, the collected batches, and the execution metrics.
    async fn join_collect(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        null_equality: NullEquality,
        context: Arc<TaskContext>,
    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
        let join = join(left, right, on, join_type, null_equality)?;
        let columns_header = columns(&join.schema());

        let stream = join.execute(0, context)?;
        let batches = common::collect(stream).await?;
        let metrics = join.metrics().unwrap();

        Ok((columns_header, batches, metrics))
    }
2235
    /// Convenience wrapper: runs the join in `Partitioned` mode across all
    /// partitions (see `join_collect_with_partition_mode`).
    async fn partitioned_join_collect(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        null_equality: NullEquality,
        context: Arc<TaskContext>,
    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
        join_collect_with_partition_mode(
            left,
            right,
            on,
            join_type,
            PartitionMode::Partitioned,
            null_equality,
            context,
        )
        .await
    }
2255
    /// Repartitions both inputs to 4 partitions as appropriate for the given
    /// partition mode, runs the join on every partition, and returns the
    /// output column names, all non-empty batches, and the metrics.
    async fn join_collect_with_partition_mode(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
        context: Arc<TaskContext>,
    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
        let partition_count = 4;

        let (left_expr, right_expr) = on
            .iter()
            .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
            .unzip();

        // CollectLeft coalesces the build side into a single partition;
        // Partitioned hash-partitions it on the join keys.
        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
                left,
                Partitioning::Hash(left_expr, partition_count),
            )?),
            PartitionMode::Auto => {
                return internal_err!("Unexpected PartitionMode::Auto in join tests");
            }
        };

        // The probe side is always multi-partition; for CollectLeft it is
        // hash-partitioned on its first column rather than the join keys.
        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
            PartitionMode::CollectLeft => {
                let partition_column_name = right.schema().field(0).name().clone();
                let partition_expr = vec![Arc::new(Column::new_with_schema(
                    &partition_column_name,
                    &right.schema(),
                )?) as _];
                Arc::new(RepartitionExec::try_new(
                    right,
                    Partitioning::Hash(partition_expr, partition_count),
                )?) as _
            }
            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
                right,
                Partitioning::Hash(right_expr, partition_count),
            )?),
            PartitionMode::Auto => {
                return internal_err!("Unexpected PartitionMode::Auto in join tests");
            }
        };

        let join = HashJoinExec::try_new(
            left_repartitioned,
            right_repartitioned,
            on,
            None,
            join_type,
            None,
            partition_mode,
            null_equality,
            false,
        )?;

        let columns = columns(&join.schema());

        // Execute every output partition and keep only non-empty batches.
        let mut batches = vec![];
        for i in 0..partition_count {
            let stream = join.execute(i, Arc::clone(&context))?;
            let more_batches = common::collect(stream).await?;
            batches.extend(
                more_batches
                    .into_iter()
                    .filter(|b| b.num_rows() > 0)
                    .collect::<Vec<_>>(),
            );
        }
        let metrics = join.metrics().unwrap();

        Ok((columns, batches, metrics))
    }
2333
    /// Inner join on a single key ("b1") with a duplicate key value on the
    /// build side; also checks whether the perfect-hash path was taken.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 5 | 9 | 20 | 5 | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2387
    /// Same single-key inner join as `join_inner_one`, but executed in
    /// `Partitioned` mode; output order is not deterministic, so rows are
    /// sorted before the snapshot comparison.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 5 | 9 | 20 | 5 | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2439
    /// Inner join where the two sides use distinct key column names
    /// ("b1" vs "b2"), so no output column name collision occurs.
    #[tokio::test]
    async fn join_inner_one_no_shared_column_names() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 5 | 9 | 20 | 5 | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2487
    /// Inner join with unsorted key values on both sides; the snapshot pins
    /// the exact (probe-driven) output order.
    #[tokio::test]
    async fn join_inner_one_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![0, 3, 2, 1]),
            ("b1", &vec![4, 5, 5, 4]),
            ("c1", &vec![6, 9, 8, 7]),
        );
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3 | 5 | 9 | 20 | 5 | 80 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 0 | 4 | 6 | 10 | 4 | 70 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
2536
    /// Inner join on two key columns ("a1", "b2"); also bounds the number of
    /// emitted batches as a function of the configured batch size.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_two(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 2]),
            ("b2", &vec![1, 2, 2]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        // Upper bound on emitted batches: 3 matched rows split by batch_size
        // (9 candidate rows when hash collisions are forced). NOTE(review):
        // the extra batch at batch_size == 1 presumably comes from the stream
        // emitting a trailing batch — confirm against HashJoinStream.
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(9, batch_size)
        };

        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1 | 1 | 7 | 1 | 1 | 70 |
            | 2 | 2 | 8 | 2 | 2 | 80 |
            | 2 | 2 | 9 | 2 | 2 | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2615
    /// Two-key inner join where the build (left) side arrives as two
    /// partitions that are coalesced before the CollectLeft build.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one_two_parts_left(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let batch1 = build_table_i32(
            ("a1", &vec![1, 2]),
            ("b2", &vec![1, 2]),
            ("c1", &vec![7, 8]),
        );
        let batch2 =
            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
        let schema = batch1.schema();
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));

        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        // Same batch-count bound as `join_inner_two` (3 rows, or 9 candidates
        // under forced hash collisions).
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(9, batch_size)
        };

        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1 | 1 | 7 | 1 | 1 | 70 |
            | 2 | 2 | 8 | 2 | 2 | 80 |
            | 2 | 2 | 9 | 2 | 2 | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2703
    /// Like `join_inner_one_randomly_ordered`, but the build side arrives as
    /// two unsorted partitions coalesced before the build.
    #[tokio::test]
    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch1 = build_table_i32(
            ("a1", &vec![0, 3]),
            ("b1", &vec![4, 5]),
            ("c1", &vec![6, 9]),
        );
        let batch2 = build_table_i32(
            ("a1", &vec![2, 1]),
            ("b1", &vec![5, 4]),
            ("c1", &vec![8, 7]),
        );
        let schema = batch1.schema();

        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3 | 5 | 9 | 20 | 5 | 80 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 0 | 4 | 6 | 10 | 4 | 70 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
2763
2764 #[apply(hash_join_exec_configs)]
2766 #[tokio::test]
2767 async fn join_inner_one_two_parts_right(
2768 batch_size: usize,
2769 use_perfect_hash_join_as_possible: bool,
2770 ) -> Result<()> {
2771 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
2772 let left = build_table(
2773 ("a1", &vec![1, 2, 3]),
2774 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
2776 );
2777
2778 let batch1 = build_table_i32(
2779 ("a2", &vec![10, 20]),
2780 ("b1", &vec![4, 6]),
2781 ("c2", &vec![70, 80]),
2782 );
2783 let batch2 =
2784 build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
2785 let schema = batch1.schema();
2786 let right =
2787 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2788 .unwrap();
2789
2790 let on = vec![(
2791 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2792 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2793 )];
2794
2795 let join = join(
2796 left,
2797 right,
2798 on,
2799 &JoinType::Inner,
2800 NullEquality::NullEqualsNothing,
2801 )?;
2802
2803 let columns = columns(&join.schema());
2804 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2805
2806 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2808 let batches = common::collect(stream).await?;
2809
2810 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2811 let mut expected_batch_count = div_ceil(1, batch_size);
2814 if batch_size == 1 {
2815 expected_batch_count += 1;
2816 }
2817 expected_batch_count
2818 } else {
2819 div_ceil(6, batch_size)
2822 };
2823 assert!(
2825 batches.len() <= expected_batch_count,
2826 "expected at most {expected_batch_count} batches, got {}",
2827 batches.len()
2828 );
2829
2830 allow_duplicates! {
2832 assert_snapshot!(batches_to_string(&batches), @r"
2833 +----+----+----+----+----+----+
2834 | a1 | b1 | c1 | a2 | b1 | c2 |
2835 +----+----+----+----+----+----+
2836 | 1 | 4 | 7 | 10 | 4 | 70 |
2837 +----+----+----+----+----+----+
2838 ");
2839 }
2840
2841 let stream = join.execute(1, Arc::clone(&task_ctx))?;
2843 let batches = common::collect(stream).await?;
2844
2845 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2846 div_ceil(2, batch_size)
2848 } else {
2849 div_ceil(3, batch_size)
2852 };
2853 assert!(
2855 batches.len() <= expected_batch_count,
2856 "expected at most {expected_batch_count} batches, got {}",
2857 batches.len()
2858 );
2859
2860 allow_duplicates! {
2862 assert_snapshot!(batches_to_string(&batches), @r"
2863 +----+----+----+----+----+----+
2864 | a1 | b1 | c1 | a2 | b1 | c2 |
2865 +----+----+----+----+----+----+
2866 | 2 | 5 | 8 | 30 | 5 | 90 |
2867 | 3 | 5 | 9 | 30 | 5 | 90 |
2868 +----+----+----+----+----+----+
2869 ");
2870 }
2871
2872 let metrics = join.metrics().unwrap();
2873 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
2874
2875 Ok(())
2876 }
2877
2878 fn build_table_two_batches(
2879 a: (&str, &Vec<i32>),
2880 b: (&str, &Vec<i32>),
2881 c: (&str, &Vec<i32>),
2882 ) -> Arc<dyn ExecutionPlan> {
2883 let batch = build_table_i32(a, b, c);
2884 let schema = batch.schema();
2885 TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2886 }
2887
2888 #[apply(hash_join_exec_configs)]
2889 #[tokio::test]
2890 async fn join_left_multi_batch(
2891 batch_size: usize,
2892 use_perfect_hash_join_as_possible: bool,
2893 ) -> Result<()> {
2894 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
2895 let left = build_table(
2896 ("a1", &vec![1, 2, 3]),
2897 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
2899 );
2900 let right = build_table_two_batches(
2901 ("a2", &vec![10, 20, 30]),
2902 ("b1", &vec![4, 5, 6]),
2903 ("c2", &vec![70, 80, 90]),
2904 );
2905 let on = vec![(
2906 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
2907 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
2908 )];
2909
2910 let join = join(
2911 Arc::clone(&left),
2912 Arc::clone(&right),
2913 on.clone(),
2914 &JoinType::Left,
2915 NullEquality::NullEqualsNothing,
2916 )
2917 .unwrap();
2918
2919 let columns = columns(&join.schema());
2920 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2921
2922 let (_, batches, metrics) = join_collect(
2923 Arc::clone(&left),
2924 Arc::clone(&right),
2925 on.clone(),
2926 &JoinType::Left,
2927 NullEquality::NullEqualsNothing,
2928 task_ctx,
2929 )
2930 .await?;
2931
2932 allow_duplicates! {
2933 assert_snapshot!(batches_to_sort_string(&batches), @r"
2934 +----+----+----+----+----+----+
2935 | a1 | b1 | c1 | a2 | b1 | c2 |
2936 +----+----+----+----+----+----+
2937 | 1 | 4 | 7 | 10 | 4 | 70 |
2938 | 1 | 4 | 7 | 10 | 4 | 70 |
2939 | 2 | 5 | 8 | 20 | 5 | 80 |
2940 | 2 | 5 | 8 | 20 | 5 | 80 |
2941 | 3 | 7 | 9 | | | |
2942 +----+----+----+----+----+----+
2943 ");
2944 }
2945
2946 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
2947 return Ok(());
2948 }
2949
    /// FULL join against a two-batch probe side: unmatched rows from BOTH
    /// sides are preserved. The unmatched probe row (b2 = 6) appears once per
    /// probe batch; the unmatched build row (b1 = 7) appears exactly once.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_full_multi_batch(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
        );
        // Same batch duplicated in one partition.
        let right = build_table_two_batches(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();
        let metrics = join.metrics().unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | | | | 30 | 6 | 90 |
            | | | | 30 | 6 | 90 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            ");
        }

        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    }
3007
3008 #[apply(hash_join_exec_configs)]
3009 #[tokio::test]
3010 async fn join_left_empty_right(
3011 batch_size: usize,
3012 use_perfect_hash_join_as_possible: bool,
3013 ) {
3014 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3015 let left = build_table(
3016 ("a1", &vec![1, 2, 3]),
3017 ("b1", &vec![4, 5, 7]),
3018 ("c1", &vec![7, 8, 9]),
3019 );
3020 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
3021 let on = vec![(
3022 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3023 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
3024 )];
3025 let schema = right.schema();
3026 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
3027 let join = join(
3028 left,
3029 right,
3030 on,
3031 &JoinType::Left,
3032 NullEquality::NullEqualsNothing,
3033 )
3034 .unwrap();
3035
3036 let columns = columns(&join.schema());
3037 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3038
3039 let stream = join.execute(0, task_ctx).unwrap();
3040 let batches = common::collect(stream).await.unwrap();
3041 let metrics = join.metrics().unwrap();
3042
3043 allow_duplicates! {
3044 assert_snapshot!(batches_to_sort_string(&batches), @r"
3045 +----+----+----+----+----+----+
3046 | a1 | b1 | c1 | a2 | b1 | c2 |
3047 +----+----+----+----+----+----+
3048 | 1 | 4 | 7 | | | |
3049 | 2 | 5 | 8 | | | |
3050 | 3 | 7 | 9 | | | |
3051 +----+----+----+----+----+----+
3052 ");
3053 }
3054
3055 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3056 }
3057
    /// FULL join with an empty probe side: behaves like a LEFT join here —
    /// every build row is emitted with NULL right-side columns and there are
    /// no right-only rows to add.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_full_empty_right(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        // Probe side is a single zero-row batch.
        let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];
        let schema = right.schema();
        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
        let join = join(
            left,
            right,
            on,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();
        let metrics = join.metrics().unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | | | |
            | 2 | 5 | 8 | | | |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            ");
        }

        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    }
3107
    /// Basic LEFT join, single partition: two matches plus one unmatched
    /// build row (b1 = 7) padded with NULLs.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3159
    /// Same LEFT join as `join_left_one`, but executed through the
    /// partitioned collect helper; the merged result must be identical.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_left_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3211
    /// Left-side fixture shared by the semi/anti join tests.
    /// Key column b1 contains a duplicate (8) to exercise multi-match paths.
    fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
        build_table(
            ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
            ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
            ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
        )
    }
3221
    /// Right-side fixture shared by the semi/anti join tests.
    /// Key column b2 contains a duplicate (10) and is deliberately unsorted.
    fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
        build_table(
            ("a2", &vec![8, 12, 6, 2, 10, 4]),
            ("b2", &vec![8, 10, 6, 2, 10, 4]),
            ("c2", &vec![20, 40, 60, 80, 100, 120]),
        )
    }
3231
    /// LeftSemi join: emits only left rows whose key exists on the right
    /// (b1 in {8, 10}), each left row at most once, left columns only.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_semi(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;

        // Semi joins project only the left side's columns.
        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1 |
            +----+----+-----+
            | 11 | 8 | 110 |
            | 13 | 10 | 130 |
            | 9 | 8 | 90 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3279
    /// LeftSemi join with a residual filter on the right-side column a2.
    /// First pass (a2 != 10): does not change the semi result, since every
    /// matching key still has at least one surviving right row. Second pass
    /// (a2 > 10): only the b = 10 key keeps a qualifying right row.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_semi_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // Filter input "x" maps to the right side's column 0 (a2).
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Right,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);

        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices.clone(),
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1 |
            +----+----+-----+
            | 11 | 8 | 110 |
            | 13 | 10 | 130 |
            | 9 | 8 | 90 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        // Second pass with the stricter filter a2 > 10.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;
        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1 |
            +----+----+-----+
            | 13 | 10 | 130 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3387
    /// RightSemi join: emits only right rows whose key exists on the left
    /// (b2 in {8, 10}), right columns only. Note this test asserts output
    /// order (`batches_to_string`, not the sorted variant).
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_semi(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::RightSemi,
            NullEquality::NullEqualsNothing,
        )?;

        // Semi joins on the right project only the right side's columns.
        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+-----+
            | a2 | b2 | c2 |
            +----+----+-----+
            | 8 | 8 | 20 |
            | 12 | 10 | 40 |
            | 10 | 10 | 100 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3436
3437 #[apply(hash_join_exec_configs)]
3438 #[tokio::test]
3439 async fn join_right_semi_with_filter(
3440 batch_size: usize,
3441 use_perfect_hash_join_as_possible: bool,
3442 ) -> Result<()> {
3443 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3444 let left = build_semi_anti_left_table();
3445 let right = build_semi_anti_right_table();
3446
3447 let on = vec![(
3449 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3450 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3451 )];
3452
3453 let column_indices = vec![ColumnIndex {
3454 index: 0,
3455 side: JoinSide::Left,
3456 }];
3457 let intermediate_schema =
3458 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3459
3460 let filter_expression = Arc::new(BinaryExpr::new(
3461 Arc::new(Column::new("x", 0)),
3462 Operator::NotEq,
3463 Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
3464 )) as Arc<dyn PhysicalExpr>;
3465
3466 let filter = JoinFilter::new(
3467 filter_expression,
3468 column_indices.clone(),
3469 Arc::new(intermediate_schema.clone()),
3470 );
3471
3472 let join = join_with_filter(
3473 Arc::clone(&left),
3474 Arc::clone(&right),
3475 on.clone(),
3476 filter,
3477 &JoinType::RightSemi,
3478 NullEquality::NullEqualsNothing,
3479 )?;
3480
3481 let columns = columns(&join.schema());
3482 assert_eq!(columns, vec!["a2", "b2", "c2"]);
3483
3484 let stream = join.execute(0, Arc::clone(&task_ctx))?;
3485 let batches = common::collect(stream).await?;
3486
3487 allow_duplicates! {
3489 assert_snapshot!(batches_to_string(&batches), @r"
3490 +----+----+-----+
3491 | a2 | b2 | c2 |
3492 +----+----+-----+
3493 | 8 | 8 | 20 |
3494 | 12 | 10 | 40 |
3495 | 10 | 10 | 100 |
3496 +----+----+-----+
3497 ");
3498 }
3499
3500 let metrics = join.metrics().unwrap();
3501 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3502
3503 let filter_expression = Arc::new(BinaryExpr::new(
3505 Arc::new(Column::new("x", 0)),
3506 Operator::Gt,
3507 Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
3508 )) as Arc<dyn PhysicalExpr>;
3509
3510 let filter = JoinFilter::new(
3511 filter_expression,
3512 column_indices,
3513 Arc::new(intermediate_schema.clone()),
3514 );
3515
3516 let join = join_with_filter(
3517 left,
3518 right,
3519 on,
3520 filter,
3521 &JoinType::RightSemi,
3522 NullEquality::NullEqualsNothing,
3523 )?;
3524 let stream = join.execute(0, task_ctx)?;
3525 let batches = common::collect(stream).await?;
3526
3527 allow_duplicates! {
3529 assert_snapshot!(batches_to_string(&batches), @r"
3530 +----+----+-----+
3531 | a2 | b2 | c2 |
3532 +----+----+-----+
3533 | 12 | 10 | 40 |
3534 | 10 | 10 | 100 |
3535 +----+----+-----+
3536 ");
3537 }
3538
3539 let metrics = join.metrics().unwrap();
3540 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3541
3542 Ok(())
3543 }
3544
    /// LeftAnti join: emits only left rows whose key does NOT exist on the
    /// right (b1 in {1, 3, 5, 7}), left columns only.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_anti(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::LeftAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+
            | a1 | b1 | c1 |
            +----+----+----+
            | 1 | 1 | 10 |
            | 3 | 3 | 30 |
            | 5 | 5 | 50 |
            | 7 | 7 | 70 |
            +----+----+----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3592
    /// LeftAnti join with a residual filter on the right-side column a2
    /// (a2 != 8): the b1 = 8 left rows lose their only matching right row
    /// to the filter and therefore join the anti output.
    ///
    /// NOTE(review): unlike the other *_with_filter tests, the second pass
    /// reuses the exact same filter expression as the first (and expects
    /// the same output). Possibly intentional (covering the non-cloned
    /// filter path), but it may be a copy-paste of the first pass — confirm.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_anti_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // Filter input "x" maps to the right side's column 0 (a2).
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Right,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices.clone(),
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::LeftAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1 |
            +----+----+-----+
            | 1 | 1 | 10 |
            | 11 | 8 | 110 |
            | 3 | 3 | 30 |
            | 5 | 5 | 50 |
            | 7 | 7 | 70 |
            | 9 | 8 | 90 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        // Second pass — same filter expression as above (see NOTE in the doc
        // comment).
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::LeftAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1 |
            +----+----+-----+
            | 1 | 1 | 10 |
            | 11 | 8 | 110 |
            | 3 | 3 | 30 |
            | 5 | 5 | 50 |
            | 7 | 7 | 70 |
            | 9 | 8 | 90 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3707
    /// RightAnti join: emits only right rows whose key does NOT exist on
    /// the left (b2 in {6, 2, 4}), right columns only, preserving order.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_anti(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::RightAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+-----+
            | a2 | b2 | c2 |
            +----+----+-----+
            | 6 | 6 | 60 |
            | 2 | 2 | 80 |
            | 4 | 4 | 120 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3754
    /// RightAnti join with residual filters. First pass filters on the
    /// left-side column a1 (a1 != 13): the b = 10 right rows lose their only
    /// surviving left match and move into the anti output. Second pass
    /// filters on the right side's own column (index 1, b2 != 8): the
    /// b2 = 8 right row fails its filter and joins the anti output instead.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_anti_with_filter(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // Filter input "x" maps to the left side's column 0 (a1).
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Left,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);

        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::RightAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+-----+
            | a2 | b2 | c2 |
            +----+----+-----+
            | 12 | 10 | 40 |
            | 6 | 6 | 60 |
            | 2 | 2 | 80 |
            | 10 | 10 | 100 |
            | 4 | 4 | 120 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        // Second pass: "x" now maps to the right side's column 1 (b2).
        let column_indices = vec![ColumnIndex {
            index: 1,
            side: JoinSide::Right,
        }];
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::RightAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+-----+
            | a2 | b2 | c2 |
            +----+----+-----+
            | 8 | 8 | 20 |
            | 6 | 6 | 60 |
            | 2 | 2 | 80 |
            | 4 | 4 | 120 |
            +----+----+-----+
            ");
        }

        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3873
    /// Basic RIGHT join: unmatched probe row (b1 = 6) is preserved with
    /// NULL left-side columns.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_right_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Right,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | | | | 30 | 6 | 90 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3925
    /// Same RIGHT join as `join_right_one`, but executed through the
    /// partitioned collect helper; the merged result must be identical.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_right_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            left,
            right,
            on,
            &JoinType::Right,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | | | | 30 | 6 | 90 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
3977
3978 #[apply(hash_join_exec_configs)]
3979 #[tokio::test]
3980 async fn join_full_one(
3981 batch_size: usize,
3982 use_perfect_hash_join_as_possible: bool,
3983 ) -> Result<()> {
3984 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3985 let left = build_table(
3986 ("a1", &vec![1, 2, 3]),
3987 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3989 );
3990 let right = build_table(
3991 ("a2", &vec![10, 20, 30]),
3992 ("b2", &vec![4, 5, 6]),
3993 ("c2", &vec![70, 80, 90]),
3994 );
3995 let on = vec![(
3996 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3997 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3998 )];
3999
4000 let join = join(
4001 left,
4002 right,
4003 on,
4004 &JoinType::Full,
4005 NullEquality::NullEqualsNothing,
4006 )?;
4007
4008 let columns = columns(&join.schema());
4009 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
4010
4011 let stream = join.execute(0, task_ctx)?;
4012 let batches = common::collect(stream).await?;
4013
4014 allow_duplicates! {
4015 assert_snapshot!(batches_to_sort_string(&batches), @r"
4016 +----+----+----+----+----+----+
4017 | a1 | b1 | c1 | a2 | b2 | c2 |
4018 +----+----+----+----+----+----+
4019 | | | | 30 | 6 | 90 |
4020 | 1 | 4 | 7 | 10 | 4 | 70 |
4021 | 2 | 5 | 8 | 20 | 5 | 80 |
4022 | 3 | 7 | 9 | | | |
4023 +----+----+----+----+----+----+
4024 ");
4025 }
4026
4027 let metrics = join.metrics().unwrap();
4028 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4029
4030 Ok(())
4031 }
4032
4033 #[apply(hash_join_exec_configs)]
4034 #[tokio::test]
4035 async fn join_left_mark(
4036 batch_size: usize,
4037 use_perfect_hash_join_as_possible: bool,
4038 ) -> Result<()> {
4039 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4040 let left = build_table(
4041 ("a1", &vec![1, 2, 3]),
4042 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4044 );
4045 let right = build_table(
4046 ("a2", &vec![10, 20, 30]),
4047 ("b1", &vec![4, 5, 6]),
4048 ("c2", &vec![70, 80, 90]),
4049 );
4050 let on = vec![(
4051 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4052 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4053 )];
4054
4055 let (columns, batches, metrics) = join_collect(
4056 Arc::clone(&left),
4057 Arc::clone(&right),
4058 on.clone(),
4059 &JoinType::LeftMark,
4060 NullEquality::NullEqualsNothing,
4061 task_ctx,
4062 )
4063 .await?;
4064
4065 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
4066
4067 allow_duplicates! {
4068 assert_snapshot!(batches_to_sort_string(&batches), @r"
4069 +----+----+----+-------+
4070 | a1 | b1 | c1 | mark |
4071 +----+----+----+-------+
4072 | 1 | 4 | 7 | true |
4073 | 2 | 5 | 8 | true |
4074 | 3 | 7 | 9 | false |
4075 +----+----+----+-------+
4076 ");
4077 }
4078
4079 assert_join_metrics!(metrics, 3);
4080 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4081
4082 Ok(())
4083 }
4084
4085 #[apply(hash_join_exec_configs)]
4086 #[tokio::test]
4087 async fn partitioned_join_left_mark(
4088 batch_size: usize,
4089 use_perfect_hash_join_as_possible: bool,
4090 ) -> Result<()> {
4091 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4092 let left = build_table(
4093 ("a1", &vec![1, 2, 3]),
4094 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4096 );
4097 let right = build_table(
4098 ("a2", &vec![10, 20, 30, 40]),
4099 ("b1", &vec![4, 4, 5, 6]),
4100 ("c2", &vec![60, 70, 80, 90]),
4101 );
4102 let on = vec![(
4103 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4104 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4105 )];
4106
4107 let (columns, batches, metrics) = partitioned_join_collect(
4108 Arc::clone(&left),
4109 Arc::clone(&right),
4110 on.clone(),
4111 &JoinType::LeftMark,
4112 NullEquality::NullEqualsNothing,
4113 task_ctx,
4114 )
4115 .await?;
4116
4117 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
4118
4119 allow_duplicates! {
4120 assert_snapshot!(batches_to_sort_string(&batches), @r"
4121 +----+----+----+-------+
4122 | a1 | b1 | c1 | mark |
4123 +----+----+----+-------+
4124 | 1 | 4 | 7 | true |
4125 | 2 | 5 | 8 | true |
4126 | 3 | 7 | 9 | false |
4127 +----+----+----+-------+
4128 ");
4129 }
4130
4131 assert_join_metrics!(metrics, 3);
4132 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4133
4134 Ok(())
4135 }
4136
4137 #[apply(hash_join_exec_configs)]
4138 #[tokio::test]
4139 async fn join_right_mark(
4140 batch_size: usize,
4141 use_perfect_hash_join_as_possible: bool,
4142 ) -> Result<()> {
4143 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4144 let left = build_table(
4145 ("a1", &vec![1, 2, 3]),
4146 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4148 );
4149 let right = build_table(
4150 ("a2", &vec![10, 20, 30]),
4151 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
4153 );
4154 let on = vec![(
4155 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4156 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4157 )];
4158
4159 let (columns, batches, metrics) = join_collect(
4160 Arc::clone(&left),
4161 Arc::clone(&right),
4162 on.clone(),
4163 &JoinType::RightMark,
4164 NullEquality::NullEqualsNothing,
4165 task_ctx,
4166 )
4167 .await?;
4168
4169 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
4170
4171 let expected = [
4172 "+----+----+----+-------+",
4173 "| a2 | b1 | c2 | mark |",
4174 "+----+----+----+-------+",
4175 "| 10 | 4 | 70 | true |",
4176 "| 20 | 5 | 80 | true |",
4177 "| 30 | 6 | 90 | false |",
4178 "+----+----+----+-------+",
4179 ];
4180 assert_batches_sorted_eq!(expected, &batches);
4181
4182 assert_join_metrics!(metrics, 3);
4183 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4184
4185 Ok(())
4186 }
4187
4188 #[apply(hash_join_exec_configs)]
4189 #[tokio::test]
4190 async fn partitioned_join_right_mark(
4191 batch_size: usize,
4192 use_perfect_hash_join_as_possible: bool,
4193 ) -> Result<()> {
4194 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4195 let left = build_table(
4196 ("a1", &vec![1, 2, 3]),
4197 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4199 );
4200 let right = build_table(
4201 ("a2", &vec![10, 20, 30, 40]),
4202 ("b1", &vec![4, 4, 5, 6]), ("c2", &vec![60, 70, 80, 90]),
4204 );
4205 let on = vec![(
4206 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4207 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4208 )];
4209
4210 let (columns, batches, metrics) = partitioned_join_collect(
4211 Arc::clone(&left),
4212 Arc::clone(&right),
4213 on.clone(),
4214 &JoinType::RightMark,
4215 NullEquality::NullEqualsNothing,
4216 task_ctx,
4217 )
4218 .await?;
4219
4220 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
4221
4222 let expected = [
4223 "+----+----+----+-------+",
4224 "| a2 | b1 | c2 | mark |",
4225 "+----+----+----+-------+",
4226 "| 10 | 4 | 60 | true |",
4227 "| 20 | 4 | 70 | true |",
4228 "| 30 | 5 | 80 | true |",
4229 "| 40 | 6 | 90 | false |",
4230 "+----+----+----+-------+",
4231 ];
4232 assert_batches_sorted_eq!(expected, &batches);
4233
4234 assert_join_metrics!(metrics, 4);
4235 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4236
4237 Ok(())
4238 }
4239
// Verifies that hash-map lookups during probing compare the actual key
// values rather than trusting hash equality: the build-side table is
// constructed by hand with two entries per hash value, simulating
// collisions, and only truly-equal keys may match.
#[test]
fn join_with_hash_collisions_64() -> Result<()> {
    let mut hashmap_left = HashTable::with_capacity(4);
    let left = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );

    // Fixed seeds keep the hash values — and therefore the collision
    // layout below — deterministic across runs.
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let hashes_buff = &mut vec![0; left.num_rows()];
    let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;

    // Insert two entries under each hash value to force a collision chain.
    hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
    hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);

    hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
    hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);

    // Chain vector handed to JoinHashMapU64 alongside the table;
    // presumably the intra-bucket "next row" links — values chosen to
    // match the manual inserts above. TODO(review): confirm against
    // JoinHashMapU64's chain encoding.
    let next = vec![2, 0];

    let right = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    // Join key is column "a" on both sides.
    let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

    let join_hash_map = JoinHashMapU64::new(hashmap_left, next);

    let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
    let right_keys_values =
        key_column.evaluate(&right)?.into_array(right.num_rows())?;
    let mut hashes_buffer = vec![0; right.num_rows()];
    create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

    let mut probe_indices_buffer = Vec::new();
    let mut build_indices_buffer = Vec::new();
    let (l, r, _) = lookup_join_hashmap(
        &join_hash_map,
        &[left_keys_values],
        &[right_keys_values],
        NullEquality::NullEqualsNothing,
        &hashes_buffer,
        8192,
        (0, None),
        &mut probe_indices_buffer,
        &mut build_indices_buffer,
    )?;

    // Despite the colliding entries, only equal keys pair up:
    // build row 0 <-> probe row 0 and build row 1 <-> probe row 1.
    let left_ids: UInt64Array = vec![0, 1].into();

    let right_ids: UInt32Array = vec![0, 1].into();

    assert_eq!(left_ids, l);

    assert_eq!(right_ids, r);

    Ok(())
}
4306
// Same collision scenario as `join_with_hash_collisions_64`, but using
// the u32-indexed hash map variant (JoinHashMapU32) to cover the
// smaller-index code path.
#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
    let mut hashmap_left = HashTable::with_capacity(4);
    let left = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );

    // Deterministic hashes: fixed seeds.
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let hashes_buff = &mut vec![0; left.num_rows()];
    let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;

    // Two entries per hash value (note the u32 payloads) to force
    // collision chains inside each bucket.
    hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
    hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
    hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
    hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);

    // Chain vector, u32-typed to match JoinHashMapU32; mirrors the
    // values used in the 64-bit variant of this test.
    let next: Vec<u32> = vec![2, 0];

    let right = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    // Join key is column "a" on both sides.
    let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

    let join_hash_map = JoinHashMapU32::new(hashmap_left, next);

    let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
    let right_keys_values =
        key_column.evaluate(&right)?.into_array(right.num_rows())?;
    let mut hashes_buffer = vec![0; right.num_rows()];
    create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

    let mut probe_indices_buffer = Vec::new();
    let mut build_indices_buffer = Vec::new();
    let (l, r, _) = lookup_join_hashmap(
        &join_hash_map,
        &[left_keys_values],
        &[right_keys_values],
        NullEquality::NullEqualsNothing,
        &hashes_buffer,
        8192,
        (0, None),
        &mut probe_indices_buffer,
        &mut build_indices_buffer,
    )?;

    // Only truly-equal keys match, collisions notwithstanding.
    let left_ids: UInt64Array = vec![0, 1].into();
    let right_ids: UInt32Array = vec![0, 1].into();

    assert_eq!(left_ids, l);
    assert_eq!(right_ids, r);

    Ok(())
}
4366
4367 #[tokio::test]
4368 async fn join_with_duplicated_column_names() -> Result<()> {
4369 let task_ctx = Arc::new(TaskContext::default());
4370 let left = build_table(
4371 ("a", &vec![1, 2, 3]),
4372 ("b", &vec![4, 5, 7]),
4373 ("c", &vec![7, 8, 9]),
4374 );
4375 let right = build_table(
4376 ("a", &vec![10, 20, 30]),
4377 ("b", &vec![1, 2, 7]),
4378 ("c", &vec![70, 80, 90]),
4379 );
4380 let on = vec![(
4381 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4383 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4384 )];
4385
4386 let join = join(
4387 left,
4388 right,
4389 on,
4390 &JoinType::Inner,
4391 NullEquality::NullEqualsNothing,
4392 )?;
4393
4394 let columns = columns(&join.schema());
4395 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4396
4397 let stream = join.execute(0, task_ctx)?;
4398 let batches = common::collect(stream).await?;
4399
4400 allow_duplicates! {
4401 assert_snapshot!(batches_to_sort_string(&batches), @r"
4402 +---+---+---+----+---+----+
4403 | a | b | c | a | b | c |
4404 +---+---+---+----+---+----+
4405 | 1 | 4 | 7 | 10 | 1 | 70 |
4406 | 2 | 5 | 8 | 20 | 2 | 80 |
4407 +---+---+---+----+---+----+
4408 ");
4409 }
4410
4411 Ok(())
4412 }
4413
4414 fn prepare_join_filter() -> JoinFilter {
4415 let column_indices = vec![
4416 ColumnIndex {
4417 index: 2,
4418 side: JoinSide::Left,
4419 },
4420 ColumnIndex {
4421 index: 2,
4422 side: JoinSide::Right,
4423 },
4424 ];
4425 let intermediate_schema = Schema::new(vec![
4426 Field::new("c", DataType::Int32, true),
4427 Field::new("c", DataType::Int32, true),
4428 ]);
4429 let filter_expression = Arc::new(BinaryExpr::new(
4430 Arc::new(Column::new("c", 0)),
4431 Operator::Gt,
4432 Arc::new(Column::new("c", 1)),
4433 )) as Arc<dyn PhysicalExpr>;
4434
4435 JoinFilter::new(
4436 filter_expression,
4437 column_indices,
4438 Arc::new(intermediate_schema),
4439 )
4440 }
4441
4442 #[apply(hash_join_exec_configs)]
4443 #[tokio::test]
4444 async fn join_inner_with_filter(
4445 batch_size: usize,
4446 use_perfect_hash_join_as_possible: bool,
4447 ) -> Result<()> {
4448 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4449 let left = build_table(
4450 ("a", &vec![0, 1, 2, 2]),
4451 ("b", &vec![4, 5, 7, 8]),
4452 ("c", &vec![7, 8, 9, 1]),
4453 );
4454 let right = build_table(
4455 ("a", &vec![10, 20, 30, 40]),
4456 ("b", &vec![2, 2, 3, 4]),
4457 ("c", &vec![7, 5, 6, 4]),
4458 );
4459 let on = vec![(
4460 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4461 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4462 )];
4463 let filter = prepare_join_filter();
4464
4465 let join = join_with_filter(
4466 left,
4467 right,
4468 on,
4469 filter,
4470 &JoinType::Inner,
4471 NullEquality::NullEqualsNothing,
4472 )?;
4473
4474 let columns = columns(&join.schema());
4475 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4476
4477 let stream = join.execute(0, task_ctx)?;
4478 let batches = common::collect(stream).await?;
4479
4480 allow_duplicates! {
4481 assert_snapshot!(batches_to_sort_string(&batches), @r"
4482 +---+---+---+----+---+---+
4483 | a | b | c | a | b | c |
4484 +---+---+---+----+---+---+
4485 | 2 | 7 | 9 | 10 | 2 | 7 |
4486 | 2 | 7 | 9 | 20 | 2 | 5 |
4487 +---+---+---+----+---+---+
4488 ");
4489 }
4490
4491 let metrics = join.metrics().unwrap();
4492 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4493
4494 Ok(())
4495 }
4496
4497 #[apply(hash_join_exec_configs)]
4498 #[tokio::test]
4499 async fn join_left_with_filter(
4500 batch_size: usize,
4501 use_perfect_hash_join_as_possible: bool,
4502 ) -> Result<()> {
4503 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4504 let left = build_table(
4505 ("a", &vec![0, 1, 2, 2]),
4506 ("b", &vec![4, 5, 7, 8]),
4507 ("c", &vec![7, 8, 9, 1]),
4508 );
4509 let right = build_table(
4510 ("a", &vec![10, 20, 30, 40]),
4511 ("b", &vec![2, 2, 3, 4]),
4512 ("c", &vec![7, 5, 6, 4]),
4513 );
4514 let on = vec![(
4515 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4516 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4517 )];
4518 let filter = prepare_join_filter();
4519
4520 let join = join_with_filter(
4521 left,
4522 right,
4523 on,
4524 filter,
4525 &JoinType::Left,
4526 NullEquality::NullEqualsNothing,
4527 )?;
4528
4529 let columns = columns(&join.schema());
4530 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4531
4532 let stream = join.execute(0, task_ctx)?;
4533 let batches = common::collect(stream).await?;
4534
4535 allow_duplicates! {
4536 assert_snapshot!(batches_to_sort_string(&batches), @r"
4537 +---+---+---+----+---+---+
4538 | a | b | c | a | b | c |
4539 +---+---+---+----+---+---+
4540 | 0 | 4 | 7 | | | |
4541 | 1 | 5 | 8 | | | |
4542 | 2 | 7 | 9 | 10 | 2 | 7 |
4543 | 2 | 7 | 9 | 20 | 2 | 5 |
4544 | 2 | 8 | 1 | | | |
4545 +---+---+---+----+---+---+
4546 ");
4547 }
4548
4549 let metrics = join.metrics().unwrap();
4550 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4551
4552 Ok(())
4553 }
4554
4555 #[apply(hash_join_exec_configs)]
4556 #[tokio::test]
4557 async fn join_right_with_filter(
4558 batch_size: usize,
4559 use_perfect_hash_join_as_possible: bool,
4560 ) -> Result<()> {
4561 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4562 let left = build_table(
4563 ("a", &vec![0, 1, 2, 2]),
4564 ("b", &vec![4, 5, 7, 8]),
4565 ("c", &vec![7, 8, 9, 1]),
4566 );
4567 let right = build_table(
4568 ("a", &vec![10, 20, 30, 40]),
4569 ("b", &vec![2, 2, 3, 4]),
4570 ("c", &vec![7, 5, 6, 4]),
4571 );
4572 let on = vec![(
4573 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4574 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4575 )];
4576 let filter = prepare_join_filter();
4577
4578 let join = join_with_filter(
4579 left,
4580 right,
4581 on,
4582 filter,
4583 &JoinType::Right,
4584 NullEquality::NullEqualsNothing,
4585 )?;
4586
4587 let columns = columns(&join.schema());
4588 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4589
4590 let stream = join.execute(0, task_ctx)?;
4591 let batches = common::collect(stream).await?;
4592
4593 allow_duplicates! {
4594 assert_snapshot!(batches_to_sort_string(&batches), @r"
4595 +---+---+---+----+---+---+
4596 | a | b | c | a | b | c |
4597 +---+---+---+----+---+---+
4598 | | | | 30 | 3 | 6 |
4599 | | | | 40 | 4 | 4 |
4600 | 2 | 7 | 9 | 10 | 2 | 7 |
4601 | 2 | 7 | 9 | 20 | 2 | 5 |
4602 +---+---+---+----+---+---+
4603 ");
4604 }
4605
4606 let metrics = join.metrics().unwrap();
4607 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4608
4609 Ok(())
4610 }
4611
4612 #[apply(hash_join_exec_configs)]
4613 #[tokio::test]
4614 async fn join_full_with_filter(
4615 batch_size: usize,
4616 use_perfect_hash_join_as_possible: bool,
4617 ) -> Result<()> {
4618 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4619 let left = build_table(
4620 ("a", &vec![0, 1, 2, 2]),
4621 ("b", &vec![4, 5, 7, 8]),
4622 ("c", &vec![7, 8, 9, 1]),
4623 );
4624 let right = build_table(
4625 ("a", &vec![10, 20, 30, 40]),
4626 ("b", &vec![2, 2, 3, 4]),
4627 ("c", &vec![7, 5, 6, 4]),
4628 );
4629 let on = vec![(
4630 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4631 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4632 )];
4633 let filter = prepare_join_filter();
4634
4635 let join = join_with_filter(
4636 left,
4637 right,
4638 on,
4639 filter,
4640 &JoinType::Full,
4641 NullEquality::NullEqualsNothing,
4642 )?;
4643
4644 let columns = columns(&join.schema());
4645 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4646
4647 let stream = join.execute(0, task_ctx)?;
4648 let batches = common::collect(stream).await?;
4649
4650 let expected = [
4651 "+---+---+---+----+---+---+",
4652 "| a | b | c | a | b | c |",
4653 "+---+---+---+----+---+---+",
4654 "| | | | 30 | 3 | 6 |",
4655 "| | | | 40 | 4 | 4 |",
4656 "| 2 | 7 | 9 | 10 | 2 | 7 |",
4657 "| 2 | 7 | 9 | 20 | 2 | 5 |",
4658 "| 0 | 4 | 7 | | | |",
4659 "| 1 | 5 | 8 | | | |",
4660 "| 2 | 8 | 1 | | | |",
4661 "+---+---+---+----+---+---+",
4662 ];
4663 assert_batches_sorted_eq!(expected, &batches);
4664
4665 let metrics = join.metrics().unwrap();
4666 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4667
4668 Ok(())
4686 }
4687
4688 #[tokio::test]
4690 async fn test_collect_left_multiple_partitions_join() -> Result<()> {
4691 let task_ctx = Arc::new(TaskContext::default());
4692 let left = build_table(
4693 ("a1", &vec![1, 2, 3]),
4694 ("b1", &vec![4, 5, 7]),
4695 ("c1", &vec![7, 8, 9]),
4696 );
4697 let right = build_table(
4698 ("a2", &vec![10, 20, 30]),
4699 ("b2", &vec![4, 5, 6]),
4700 ("c2", &vec![70, 80, 90]),
4701 );
4702 let on = vec![(
4703 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4704 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4705 )];
4706
4707 let expected_inner = vec![
4708 "+----+----+----+----+----+----+",
4709 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4710 "+----+----+----+----+----+----+",
4711 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4712 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4713 "+----+----+----+----+----+----+",
4714 ];
4715 let expected_left = vec![
4716 "+----+----+----+----+----+----+",
4717 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4718 "+----+----+----+----+----+----+",
4719 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4720 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4721 "| 3 | 7 | 9 | | | |",
4722 "+----+----+----+----+----+----+",
4723 ];
4724 let expected_right = vec![
4725 "+----+----+----+----+----+----+",
4726 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4727 "+----+----+----+----+----+----+",
4728 "| | | | 30 | 6 | 90 |",
4729 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4730 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4731 "+----+----+----+----+----+----+",
4732 ];
4733 let expected_full = vec![
4734 "+----+----+----+----+----+----+",
4735 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4736 "+----+----+----+----+----+----+",
4737 "| | | | 30 | 6 | 90 |",
4738 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4739 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4740 "| 3 | 7 | 9 | | | |",
4741 "+----+----+----+----+----+----+",
4742 ];
4743 let expected_left_semi = vec![
4744 "+----+----+----+",
4745 "| a1 | b1 | c1 |",
4746 "+----+----+----+",
4747 "| 1 | 4 | 7 |",
4748 "| 2 | 5 | 8 |",
4749 "+----+----+----+",
4750 ];
4751 let expected_left_anti = vec![
4752 "+----+----+----+",
4753 "| a1 | b1 | c1 |",
4754 "+----+----+----+",
4755 "| 3 | 7 | 9 |",
4756 "+----+----+----+",
4757 ];
4758 let expected_right_semi = vec![
4759 "+----+----+----+",
4760 "| a2 | b2 | c2 |",
4761 "+----+----+----+",
4762 "| 10 | 4 | 70 |",
4763 "| 20 | 5 | 80 |",
4764 "+----+----+----+",
4765 ];
4766 let expected_right_anti = vec![
4767 "+----+----+----+",
4768 "| a2 | b2 | c2 |",
4769 "+----+----+----+",
4770 "| 30 | 6 | 90 |",
4771 "+----+----+----+",
4772 ];
4773 let expected_left_mark = vec![
4774 "+----+----+----+-------+",
4775 "| a1 | b1 | c1 | mark |",
4776 "+----+----+----+-------+",
4777 "| 1 | 4 | 7 | true |",
4778 "| 2 | 5 | 8 | true |",
4779 "| 3 | 7 | 9 | false |",
4780 "+----+----+----+-------+",
4781 ];
4782 let expected_right_mark = vec![
4783 "+----+----+----+-------+",
4784 "| a2 | b2 | c2 | mark |",
4785 "+----+----+----+-------+",
4786 "| 10 | 4 | 70 | true |",
4787 "| 20 | 5 | 80 | true |",
4788 "| 30 | 6 | 90 | false |",
4789 "+----+----+----+-------+",
4790 ];
4791
4792 let test_cases = vec![
4793 (JoinType::Inner, expected_inner),
4794 (JoinType::Left, expected_left),
4795 (JoinType::Right, expected_right),
4796 (JoinType::Full, expected_full),
4797 (JoinType::LeftSemi, expected_left_semi),
4798 (JoinType::LeftAnti, expected_left_anti),
4799 (JoinType::RightSemi, expected_right_semi),
4800 (JoinType::RightAnti, expected_right_anti),
4801 (JoinType::LeftMark, expected_left_mark),
4802 (JoinType::RightMark, expected_right_mark),
4803 ];
4804
4805 for (join_type, expected) in test_cases {
4806 let (_, batches, metrics) = join_collect_with_partition_mode(
4807 Arc::clone(&left),
4808 Arc::clone(&right),
4809 on.clone(),
4810 &join_type,
4811 PartitionMode::CollectLeft,
4812 NullEquality::NullEqualsNothing,
4813 Arc::clone(&task_ctx),
4814 )
4815 .await?;
4816 assert_batches_sorted_eq!(expected, &batches);
4817 assert_join_metrics!(metrics, expected.len() - 4);
4818 }
4819
4820 Ok(())
4821 }
4822
4823 #[tokio::test]
4824 async fn join_date32() -> Result<()> {
4825 let schema = Arc::new(Schema::new(vec![
4826 Field::new("date", DataType::Date32, false),
4827 Field::new("n", DataType::Int32, false),
4828 ]));
4829
4830 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
4831 let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
4832 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4833 let left =
4834 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
4835 .unwrap();
4836 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
4837 let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
4838 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4839 let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
4840 let on = vec![(
4841 Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
4842 Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
4843 )];
4844
4845 let join = join(
4846 left,
4847 right,
4848 on,
4849 &JoinType::Inner,
4850 NullEquality::NullEqualsNothing,
4851 )?;
4852
4853 let task_ctx = Arc::new(TaskContext::default());
4854 let stream = join.execute(0, task_ctx)?;
4855 let batches = common::collect(stream).await?;
4856
4857 allow_duplicates! {
4858 assert_snapshot!(batches_to_sort_string(&batches), @r"
4859 +------------+---+------------+---+
4860 | date | n | date | n |
4861 +------------+---+------------+---+
4862 | 2022-04-26 | 2 | 2022-04-26 | 4 |
4863 | 2022-04-26 | 2 | 2022-04-26 | 5 |
4864 | 2022-04-27 | 3 | 2022-04-27 | 6 |
4865 +------------+---+------------+---+
4866 ");
4867 }
4868
4869 Ok(())
4870 }
4871
4872 #[tokio::test]
4873 async fn join_with_error_right() {
4874 let left = build_table(
4875 ("a1", &vec![1, 2, 3]),
4876 ("b1", &vec![4, 5, 7]),
4877 ("c1", &vec![7, 8, 9]),
4878 );
4879
4880 let err = exec_err!("bad data error");
4883 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4884
4885 let on = vec![(
4886 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4887 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
4888 )];
4889 let schema = right.schema();
4890 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4891 let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
4892
4893 let join_types = vec![
4894 JoinType::Inner,
4895 JoinType::Left,
4896 JoinType::Right,
4897 JoinType::Full,
4898 JoinType::LeftSemi,
4899 JoinType::LeftAnti,
4900 JoinType::RightSemi,
4901 JoinType::RightAnti,
4902 ];
4903
4904 for join_type in join_types {
4905 let join = join(
4906 Arc::clone(&left),
4907 Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
4908 on.clone(),
4909 &join_type,
4910 NullEquality::NullEqualsNothing,
4911 )
4912 .unwrap();
4913 let task_ctx = Arc::new(TaskContext::default());
4914
4915 let stream = join.execute(0, task_ctx).unwrap();
4916
4917 let result_string = common::collect(stream).await.unwrap_err().to_string();
4919 assert!(
4920 result_string.contains("bad data error"),
4921 "actual: {result_string}"
4922 );
4923 }
4924 }
4925
// Every left row matches every right row (all keys are 1), producing a
// 4 x 5 = 20-row cross product; this test checks that the join splits
// its output according to the configured batch size for each join type.
#[tokio::test]
async fn join_split_batch() {
    let left = build_table(
        ("a1", &vec![1, 2, 3, 4]),
        ("b1", &vec![1, 1, 1, 1]),
        ("c1", &vec![0, 0, 0, 0]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30, 40, 50]),
        ("b2", &vec![1, 1, 1, 1, 1]),
        ("c2", &vec![0, 0, 0, 0, 0]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];

    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
    ];
    // Size of the full inner result (4 left rows x 5 right rows).
    let expected_resultset_records = 20;
    let common_result = [
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| 1 | 1 | 0 | 10 | 1 | 0 |",
        "| 2 | 1 | 0 | 10 | 1 | 0 |",
        "| 3 | 1 | 0 | 10 | 1 | 0 |",
        "| 4 | 1 | 0 | 10 | 1 | 0 |",
        "| 1 | 1 | 0 | 20 | 1 | 0 |",
        "| 2 | 1 | 0 | 20 | 1 | 0 |",
        "| 3 | 1 | 0 | 20 | 1 | 0 |",
        "| 4 | 1 | 0 | 20 | 1 | 0 |",
        "| 1 | 1 | 0 | 30 | 1 | 0 |",
        "| 2 | 1 | 0 | 30 | 1 | 0 |",
        "| 3 | 1 | 0 | 30 | 1 | 0 |",
        "| 4 | 1 | 0 | 30 | 1 | 0 |",
        "| 1 | 1 | 0 | 40 | 1 | 0 |",
        "| 2 | 1 | 0 | 40 | 1 | 0 |",
        "| 3 | 1 | 0 | 40 | 1 | 0 |",
        "| 4 | 1 | 0 | 40 | 1 | 0 |",
        "| 1 | 1 | 0 | 50 | 1 | 0 |",
        "| 2 | 1 | 0 | 50 | 1 | 0 |",
        "| 3 | 1 | 0 | 50 | 1 | 0 |",
        "| 4 | 1 | 0 | 50 | 1 | 0 |",
        "+----+----+----+----+----+----+",
    ];
    let left_batch = [
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "| 1 | 1 | 0 |",
        "| 2 | 1 | 0 |",
        "| 3 | 1 | 0 |",
        "| 4 | 1 | 0 |",
        "+----+----+----+",
    ];
    let right_batch = [
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "| 10 | 1 | 0 |",
        "| 20 | 1 | 0 |",
        "| 30 | 1 | 0 |",
        "| 40 | 1 | 0 |",
        "| 50 | 1 | 0 |",
        "+----+----+----+",
    ];
    let right_empty = [
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "+----+----+----+",
    ];
    let left_empty = [
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "+----+----+----+",
    ];

    for join_type in join_types {
        // Sweep batch sizes from 20 down to 1 to cover both "fits in one
        // batch" and "must split" regimes.
        for batch_size in (1..21).rev() {
            let task_ctx = prepare_task_ctx(batch_size, true);

            let join = join(
                Arc::clone(&left),
                Arc::clone(&right),
                on.clone(),
                &join_type,
                NullEquality::NullEqualsNothing,
            )
            .unwrap();

            let stream = join.execute(0, task_ctx).unwrap();
            let batches = common::collect(stream).await.unwrap();

            // Probe-driven join types emit at most ceil(rows / batch_size)
            // batches; the other types are allowed one extra batch —
            // presumably for the final unmatched-build-rows pass (TODO
            // confirm against the stream implementation).
            let expected_batch_count = match join_type {
                JoinType::Inner
                | JoinType::Right
                | JoinType::RightSemi
                | JoinType::RightAnti => {
                    div_ceil(expected_resultset_records, batch_size)
                }
                _ => div_ceil(expected_resultset_records, batch_size) + 1,
            };
            assert!(
                batches.len() <= expected_batch_count,
                "expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
                batches.len()
            );

            let expected = match join_type {
                JoinType::RightSemi => right_batch.to_vec(),
                JoinType::RightAnti => right_empty.to_vec(),
                JoinType::LeftSemi => left_batch.to_vec(),
                JoinType::LeftAnti => left_empty.to_vec(),
                _ => common_result.to_vec(),
            };
            // Anti joins produce no rows here, so an empty batch list is
            // acceptable for them only.
            if batches.is_empty() {
                assert!(
                    matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
                    "Unexpected empty result for {join_type} join"
                );
            } else {
                assert_batches_eq!(expected, &batches);
            }
        }
    }
}
5072
    /// Verifies that every join type fails cleanly (rather than OOM-ing the
    /// process) when the build side does not fit in the configured memory pool,
    /// in single-partition `CollectLeft` mode.
    #[tokio::test]
    async fn single_partition_join_overallocation() -> Result<()> {
        // Build side: 10 rows x 3 columns — large relative to the 100-byte pool below.
        let left = build_table(
            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        );
        let right = build_table(
            ("a2", &vec![10, 11]),
            ("b2", &vec![12, 13]),
            ("c2", &vec![14, 15]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
            JoinType::RightMark,
        ];

        for join_type in join_types {
            // Fresh runtime per join type: a 100-byte pool that the build side
            // is guaranteed to exceed.
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(100, 1.0)
                .build_arc()?;
            let task_ctx = TaskContext::default().with_runtime(runtime);
            let task_ctx = Arc::new(task_ctx);

            let join = join(
                Arc::clone(&left),
                Arc::clone(&right),
                on.clone(),
                &join_type,
                NullEquality::NullEqualsNothing,
            )?;

            let stream = join.execute(0, task_ctx)?;
            let err = common::collect(stream).await.unwrap_err();

            // The error must name the HashJoinInput reservation as the top
            // memory consumer...
            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n  HashJoinInput"
            );

            // ...and report the exact size of the failed allocation.
            assert_contains!(
                err.to_string(),
                "Failed to allocate additional 120.0 B for HashJoinInput"
            );
        }

        Ok(())
    }
5135
    /// Same memory-exhaustion check as `single_partition_join_overallocation`,
    /// but in `Partitioned` mode with two partitions per side; the error must
    /// name the per-partition consumer (`HashJoinInput[1]`).
    #[tokio::test]
    async fn partitioned_join_overallocation() -> Result<()> {
        // Build a left input of two identical partitions, 10 rows each.
        let left_batch = build_table_i32(
            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        );
        let left = TestMemoryExec::try_new_exec(
            &[vec![left_batch.clone()], vec![left_batch.clone()]],
            left_batch.schema(),
            None,
        )
        .unwrap();
        let right_batch = build_table_i32(
            ("a2", &vec![10, 11]),
            ("b2", &vec![12, 13]),
            ("c2", &vec![14, 15]),
        );
        let right = TestMemoryExec::try_new_exec(
            &[vec![right_batch.clone()], vec![right_batch.clone()]],
            right_batch.schema(),
            None,
        )
        .unwrap();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
        ];

        for join_type in join_types {
            // A 100-byte pool that the per-partition build side must exceed.
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(100, 1.0)
                .build_arc()?;
            let session_config = SessionConfig::default().with_batch_size(50);
            let task_ctx = TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime);
            let task_ctx = Arc::new(task_ctx);

            let join = HashJoinExec::try_new(
                Arc::clone(&left) as Arc<dyn ExecutionPlan>,
                Arc::clone(&right) as Arc<dyn ExecutionPlan>,
                on.clone(),
                None,
                &join_type,
                None,
                PartitionMode::Partitioned,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            // Execute partition 1 so the consumer name carries the partition index.
            let stream = join.execute(1, task_ctx)?;
            let err = common::collect(stream).await.unwrap_err();

            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n  HashJoinInput[1]"
            );

            assert_contains!(
                err.to_string(),
                "Failed to allocate additional 120.0 B for HashJoinInput[1]"
            );
        }

        Ok(())
    }
5217
5218 fn build_table_struct(
5219 struct_name: &str,
5220 field_name_and_values: (&str, &Vec<Option<i32>>),
5221 nulls: Option<NullBuffer>,
5222 ) -> Arc<dyn ExecutionPlan> {
5223 let (field_name, values) = field_name_and_values;
5224 let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
5225 let schema = Schema::new(vec![Field::new(
5226 struct_name,
5227 DataType::Struct(inner_fields.clone().into()),
5228 nulls.is_some(),
5229 )]);
5230
5231 let batch = RecordBatch::try_new(
5232 Arc::new(schema),
5233 vec![Arc::new(StructArray::new(
5234 inner_fields.into(),
5235 vec![Arc::new(Int32Array::from(values.clone()))],
5236 nulls,
5237 ))],
5238 )
5239 .unwrap();
5240 let schema_ref = batch.schema();
5241 TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
5242 }
5243
    /// Inner join keyed on struct columns: rows whose struct values are equal
    /// (including the struct with a null inner field) must match.
    #[tokio::test]
    async fn join_on_struct() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        // The structs themselves are non-null here; only the inner field `a`
        // may be null. Values 1 and 2 overlap; 3 vs 4 do not.
        let left =
            build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
        let right =
            build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
        let on = vec![(
            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["n1", "n2"]);

        // `{a: }` matches: the structs are non-null and their null inner
        // fields compare equal at the struct level here.
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +--------+--------+
            | n1     | n2     |
            +--------+--------+
            | {a: }  | {a: }  |
            | {a: 1} | {a: 1} |
            | {a: 2} | {a: 2} |
            +--------+--------+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
5284
5285 #[tokio::test]
5286 async fn join_on_struct_with_nulls() -> Result<()> {
5287 let task_ctx = Arc::new(TaskContext::default());
5288 let left =
5289 build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
5290 let right =
5291 build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
5292 let on = vec![(
5293 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
5294 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
5295 )];
5296
5297 let (_, batches_null_eq, metrics) = join_collect(
5298 Arc::clone(&left),
5299 Arc::clone(&right),
5300 on.clone(),
5301 &JoinType::Inner,
5302 NullEquality::NullEqualsNull,
5303 Arc::clone(&task_ctx),
5304 )
5305 .await?;
5306
5307 allow_duplicates! {
5308 assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r"
5309 +----+----+
5310 | n1 | n2 |
5311 +----+----+
5312 | | |
5313 +----+----+
5314 ");
5315 }
5316
5317 assert_join_metrics!(metrics, 1);
5318
5319 let (_, batches_null_neq, metrics) = join_collect(
5320 left,
5321 right,
5322 on,
5323 &JoinType::Inner,
5324 NullEquality::NullEqualsNothing,
5325 task_ctx,
5326 )
5327 .await?;
5328
5329 assert_join_metrics!(metrics, 0);
5330
5331 if batches_null_neq.is_empty() {
5334 } else {
5336 let expected_null_neq =
5337 ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
5338 assert_batches_eq!(expected_null_neq, &batches_null_neq);
5339 }
5340
5341 Ok(())
5342 }
5343
5344 fn columns(schema: &Schema) -> Vec<String> {
5346 schema.fields().iter().map(|f| f.name().clone()).collect()
5347 }
5348
    /// Runs a CollectLeft hash join with an injected dynamic filter and checks
    /// that the join marks the filter complete once execution finishes:
    /// `wait_complete()` must resolve rather than hang.
    #[tokio::test]
    async fn test_hash_join_marks_filter_complete() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 6]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        // Keep a second handle so we can await completion after the join
        // consumes the original.
        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
        let dynamic_filter_clone = Arc::clone(&dynamic_filter);

        let mut join = HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
            false,
        )?;
        // Inject the filter directly (bypassing the optimizer pushdown path).
        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
            filter: dynamic_filter,
            build_accumulator: OnceLock::new(),
        });

        // Drain the join output completely.
        let stream = join.execute(0, task_ctx)?;
        let _batches = common::collect(stream).await?;

        // Would hang (and the test runner time out) if the join never marked
        // the filter complete.
        dynamic_filter_clone.wait_complete().await;

        Ok(())
    }
5400
    /// Same as `test_hash_join_marks_filter_complete`, but with an empty build
    /// side: the filter must still be marked complete even when no build rows
    /// exist.
    #[tokio::test]
    async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        // Empty build (left) side.
        let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        // Keep a second handle to await completion after the join consumes
        // the original.
        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
        let dynamic_filter_clone = Arc::clone(&dynamic_filter);

        let mut join = HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
            false,
        )?;
        // Inject the filter directly (bypassing the optimizer pushdown path).
        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
            filter: dynamic_filter,
            build_accumulator: OnceLock::new(),
        });

        let stream = join.execute(0, task_ctx)?;
        let _batches = common::collect(stream).await?;

        // Would hang if an empty build side skipped filter completion.
        dynamic_filter_clone.wait_complete().await;

        Ok(())
    }
5449
    /// Perfect hash join with negative key values: verifies join results are
    /// correct and that the perfect-hash-join path was actually taken.
    #[tokio::test]
    async fn test_perfect_hash_join_with_negative_numbers() -> Result<()> {
        // `true` enables "use perfect hash join when possible".
        let task_ctx = prepare_task_ctx(8192, true);
        let (left_schema, right_schema, on) = build_schema_and_on()?;

        // Join keys (second column) include negative, zero and positive values.
        let left_batch = RecordBatch::try_new(
            Arc::clone(&left_schema),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(Int32Array::from(vec![-1, 0, 1])) as ArrayRef,
            ],
        )?;
        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;

        let right_batch = RecordBatch::try_new(
            Arc::clone(&right_schema),
            vec![
                Arc::new(Int32Array::from(vec![10, 20, 30, 40])) as ArrayRef,
                Arc::new(Int32Array::from(vec![1, -1, 0, 2])) as ArrayRef,
            ],
        )?;
        let right =
            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);

        // Keys -1, 0 and 1 each match exactly once; right key 2 has no partner.
        assert_batches_sorted_eq!(
            [
                "+----+----+----+----+",
                "| a1 | b1 | a2 | b1 |",
                "+----+----+----+----+",
                "| 1  | -1 | 20 | -1 |",
                "| 2  | 0  | 30 | 0  |",
                "| 3  | 1  | 10 | 1  |",
                "+----+----+----+----+",
            ],
            &batches
        );

        // The perfect-hash-join path must have been used despite negative keys.
        assert_phj_used(&metrics, true);

        Ok(())
    }
5503
5504 #[tokio::test]
5505 async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> {
5506 let task_ctx = prepare_task_ctx(8192, true);
5507 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
5508 let batch = RecordBatch::try_new(
5509 Arc::clone(&schema),
5510 vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))],
5511 )?;
5512 let left = TestMemoryExec::try_new_exec(
5513 &[vec![batch.clone()]],
5514 Arc::clone(&schema),
5515 None,
5516 )?;
5517 let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?;
5518 let on: JoinOn = vec![(
5519 Arc::new(Column::new_with_schema("a", &left.schema())?) as _,
5520 Arc::new(Column::new_with_schema("a", &right.schema())?) as _,
5521 )];
5522 let (_columns, batches, _metrics) = join_collect(
5523 left,
5524 right,
5525 on,
5526 &JoinType::Inner,
5527 NullEquality::NullEqualsNothing,
5528 task_ctx,
5529 )
5530 .await?;
5531 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
5532 assert_eq!(total_rows, 2);
5533 Ok(())
5534 }
5535
    /// NullEqualsNull semantics with a null-free build side and a probe side
    /// containing a null key: only the non-null matching key (10) joins.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let (left_schema, right_schema, on) = build_schema_and_on()?;

        // Build side keys: 10, 20 — no nulls.
        let left_batch = RecordBatch::try_new(
            Arc::clone(&left_schema),
            vec![
                Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
                Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef,
            ],
        )?;
        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;

        // Probe side keys: 10, NULL.
        let right_batch = RecordBatch::try_new(
            Arc::clone(&right_schema),
            vec![
                Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
            ],
        )?;
        let right =
            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNull,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
        // The probe-side NULL has no null partner on the build side, so only
        // key 10 produces a row.
        assert_batches_sorted_eq!(
            [
                "+----+----+----+----+",
                "| a1 | b1 | a2 | b1 |",
                "+----+----+----+----+",
                "| 1  | 10 | 3  | 10 |",
                "+----+----+----+----+",
            ],
            &batches
        );

        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
5590
    /// NullEqualsNothing semantics with nulls on BOTH sides: the null keys
    /// must not match each other; only key 10 joins.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn test_phj_null_equals_nothing_build_probe_all_have_nulls(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let (left_schema, right_schema, on) = build_schema_and_on()?;

        // Build side keys: 10, NULL.
        let left_batch = RecordBatch::try_new(
            Arc::clone(&left_schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
            ],
        )?;
        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;

        // Probe side keys: 10, NULL.
        let right_batch = RecordBatch::try_new(
            Arc::clone(&right_schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
            ],
        )?;
        let right =
            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
        // NULL != NULL here, so only the 10/10 pair survives.
        assert_batches_sorted_eq!(
            [
                "+----+----+----+----+",
                "| a1 | b1 | a2 | b1 |",
                "+----+----+----+----+",
                "| 1  | 10 | 3  | 10 |",
                "+----+----+----+----+",
            ],
            &batches
        );

        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
5645
    /// NullEqualsNull with a NULL key on the build side: the join result is
    /// still correct, and — per this test's expectation — the perfect-hash-join
    /// path is NOT used when the build side contains null keys.
    #[tokio::test]
    async fn test_phj_null_equals_null_build_have_nulls() -> Result<()> {
        // PHJ is requested (second arg `true`) but expected to be declined below.
        let task_ctx = prepare_task_ctx(8192, true);
        let (left_schema, right_schema, on) = build_schema_and_on()?;

        // Build side keys: 10, 20, NULL.
        let left_batch = RecordBatch::try_new(
            Arc::clone(&left_schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(10), Some(20), None])) as ArrayRef,
            ],
        )?;
        let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;

        // Probe side keys: 10, 30 — no nulls.
        let right_batch = RecordBatch::try_new(
            Arc::clone(&right_schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(10), Some(30)])) as ArrayRef,
            ],
        )?;
        let right =
            TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNull,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
        // Only key 10 matches; the build-side NULL has no probe-side partner.
        assert_batches_sorted_eq!(
            [
                "+----+----+----+----+",
                "| a1 | b1 | a2 | b1 |",
                "+----+----+----+----+",
                "| 1  | 10 | 3  | 10 |",
                "+----+----+----+----+",
            ],
            &batches
        );

        // PHJ must fall back to the regular hash join in this case.
        assert_phj_used(&metrics, false);

        Ok(())
    }
5696
5697 #[apply(hash_join_exec_configs)]
5700 #[tokio::test]
5701 async fn test_null_aware_anti_join_probe_null(batch_size: usize) -> Result<()> {
5702 let task_ctx = prepare_task_ctx(batch_size, false);
5703
5704 let left = build_table_two_cols(
5706 ("c1", &vec![Some(1), Some(2), Some(3), Some(4)]),
5707 ("dummy", &vec![Some(10), Some(20), Some(30), Some(40)]),
5708 );
5709
5710 let right = build_table_two_cols(
5712 ("c2", &vec![Some(1), Some(2), Some(3), None]),
5713 ("dummy", &vec![Some(100), Some(200), Some(300), Some(400)]),
5714 );
5715
5716 let on = vec![(
5717 Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
5718 Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
5719 )];
5720
5721 let join = HashJoinExec::try_new(
5723 left,
5724 right,
5725 on,
5726 None,
5727 &JoinType::LeftAnti,
5728 None,
5729 PartitionMode::CollectLeft,
5730 NullEquality::NullEqualsNothing,
5731 true, )?;
5733
5734 let stream = join.execute(0, task_ctx)?;
5735 let batches = common::collect(stream).await?;
5736
5737 allow_duplicates! {
5739 assert_snapshot!(batches_to_sort_string(&batches), @r"
5740 ++
5741 ++
5742 ");
5743 }
5744 Ok(())
5745 }
5746
5747 #[apply(hash_join_exec_configs)]
5750 #[tokio::test]
5751 async fn test_null_aware_anti_join_build_null(batch_size: usize) -> Result<()> {
5752 let task_ctx = prepare_task_ctx(batch_size, false);
5753
5754 let left = build_table_two_cols(
5756 ("c1", &vec![Some(1), Some(4), None]),
5757 ("dummy", &vec![Some(10), Some(40), Some(0)]),
5758 );
5759
5760 let right = build_table_two_cols(
5762 ("c2", &vec![Some(1), Some(2), Some(3)]),
5763 ("dummy", &vec![Some(100), Some(200), Some(300)]),
5764 );
5765
5766 let on = vec![(
5767 Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
5768 Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
5769 )];
5770
5771 let join = HashJoinExec::try_new(
5773 left,
5774 right,
5775 on,
5776 None,
5777 &JoinType::LeftAnti,
5778 None,
5779 PartitionMode::CollectLeft,
5780 NullEquality::NullEqualsNothing,
5781 true, )?;
5783
5784 let stream = join.execute(0, task_ctx)?;
5785 let batches = common::collect(stream).await?;
5786
5787 allow_duplicates! {
5789 assert_snapshot!(batches_to_sort_string(&batches), @r"
5790 +----+-------+
5791 | c1 | dummy |
5792 +----+-------+
5793 | 4 | 40 |
5794 +----+-------+
5795 ");
5796 }
5797 Ok(())
5798 }
5799
5800 #[apply(hash_join_exec_configs)]
5802 #[tokio::test]
5803 async fn test_null_aware_anti_join_no_nulls(batch_size: usize) -> Result<()> {
5804 let task_ctx = prepare_task_ctx(batch_size, false);
5805
5806 let left = build_table_two_cols(
5808 ("c1", &vec![Some(1), Some(2), Some(4), Some(5)]),
5809 ("dummy", &vec![Some(10), Some(20), Some(40), Some(50)]),
5810 );
5811
5812 let right = build_table_two_cols(
5814 ("c2", &vec![Some(1), Some(2), Some(3)]),
5815 ("dummy", &vec![Some(100), Some(200), Some(300)]),
5816 );
5817
5818 let on = vec![(
5819 Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
5820 Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
5821 )];
5822
5823 let join = HashJoinExec::try_new(
5825 left,
5826 right,
5827 on,
5828 None,
5829 &JoinType::LeftAnti,
5830 None,
5831 PartitionMode::CollectLeft,
5832 NullEquality::NullEqualsNothing,
5833 true, )?;
5835
5836 let stream = join.execute(0, task_ctx)?;
5837 let batches = common::collect(stream).await?;
5838
5839 allow_duplicates! {
5841 assert_snapshot!(batches_to_sort_string(&batches), @r"
5842 +----+-------+
5843 | c1 | dummy |
5844 +----+-------+
5845 | 4 | 40 |
5846 | 5 | 50 |
5847 +----+-------+
5848 ");
5849 }
5850 Ok(())
5851 }
5852
5853 #[tokio::test]
5855 async fn test_null_aware_validation_wrong_join_type() {
5856 let left =
5857 build_table_two_cols(("c1", &vec![Some(1)]), ("dummy", &vec![Some(10)]));
5858 let right =
5859 build_table_two_cols(("c2", &vec![Some(1)]), ("dummy", &vec![Some(100)]));
5860
5861 let on = vec![(
5862 Arc::new(Column::new_with_schema("c1", &left.schema()).unwrap()) as _,
5863 Arc::new(Column::new_with_schema("c2", &right.schema()).unwrap()) as _,
5864 )];
5865
5866 let result = HashJoinExec::try_new(
5868 left,
5869 right,
5870 on,
5871 None,
5872 &JoinType::Inner,
5873 None,
5874 PartitionMode::CollectLeft,
5875 NullEquality::NullEqualsNothing,
5876 true, );
5878
5879 assert!(result.is_err());
5880 assert!(
5881 result
5882 .unwrap_err()
5883 .to_string()
5884 .contains("null_aware can only be true for LeftAnti joins")
5885 );
5886 }
5887
5888 #[tokio::test]
5890 async fn test_null_aware_validation_multi_column() {
5891 let left = build_table(("a", &vec![1]), ("b", &vec![2]), ("c", &vec![3]));
5892 let right = build_table(("x", &vec![1]), ("y", &vec![2]), ("z", &vec![3]));
5893
5894 let on = vec![
5896 (
5897 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
5898 Arc::new(Column::new_with_schema("x", &right.schema()).unwrap()) as _,
5899 ),
5900 (
5901 Arc::new(Column::new_with_schema("b", &left.schema()).unwrap()) as _,
5902 Arc::new(Column::new_with_schema("y", &right.schema()).unwrap()) as _,
5903 ),
5904 ];
5905
5906 let result = HashJoinExec::try_new(
5908 left,
5909 right,
5910 on,
5911 None,
5912 &JoinType::LeftAnti,
5913 None,
5914 PartitionMode::CollectLeft,
5915 NullEquality::NullEqualsNothing,
5916 true, );
5918
5919 assert!(result.is_err());
5920 assert!(
5921 result
5922 .unwrap_err()
5923 .to_string()
5924 .contains("null_aware anti join only supports single column join key")
5925 );
5926 }
5927
5928 #[test]
5929 fn test_lr_is_preserved() {
5930 assert_eq!(lr_is_preserved(JoinType::Inner), (true, true));
5931 assert_eq!(lr_is_preserved(JoinType::Left), (true, false));
5932 assert_eq!(lr_is_preserved(JoinType::Right), (false, true));
5933 assert_eq!(lr_is_preserved(JoinType::Full), (false, false));
5934 assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, true));
5935 assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, true));
5936 assert_eq!(lr_is_preserved(JoinType::LeftMark), (true, false));
5937 assert_eq!(lr_is_preserved(JoinType::RightSemi), (true, true));
5938 assert_eq!(lr_is_preserved(JoinType::RightAnti), (true, true));
5939 assert_eq!(lr_is_preserved(JoinType::RightMark), (false, true));
5940 }
5941}