1use std::collections::HashSet;
19use std::fmt;
20use std::mem::size_of;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::{Arc, OnceLock};
23use std::vec;
24
25use crate::ExecutionPlanProperties;
26use crate::execution_plan::{
27 EmissionType, boundedness_from_children, has_same_children_properties,
28 stub_properties,
29};
30use crate::filter_pushdown::{
31 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
32 FilterPushdownPropagation,
33};
34use crate::joins::Map;
35use crate::joins::array_map::ArrayMap;
36use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
37use crate::joins::hash_join::shared_bounds::{
38 ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
39};
40use crate::joins::hash_join::stream::{
41 BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
42};
43use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
44use crate::joins::utils::{
45 OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap,
46 swap_join_projection, update_hash,
47};
48use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
49use crate::metrics::{Count, MetricBuilder, MetricCategory};
50use crate::projection::{
51 EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
52 try_pushdown_through_join,
53};
54use crate::repartition::REPARTITION_RANDOM_STATE;
55use crate::spill::get_record_batch_memory_size;
56use crate::{
57 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
58 PlanProperties, SendableRecordBatchStream, Statistics,
59 common::can_project,
60 joins::utils::{
61 BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
62 build_join_schema, check_join_is_valid, estimate_join_statistics,
63 need_produce_result_in_final, symmetric_join_output_partitioning,
64 },
65 metrics::{ExecutionPlanMetricsSet, MetricsSet},
66};
67
68use arrow::array::{ArrayRef, BooleanBufferBuilder};
69use arrow::compute::concat_batches;
70use arrow::datatypes::SchemaRef;
71use arrow::record_batch::RecordBatch;
72use arrow::util::bit_util;
73use arrow_schema::{DataType, Schema};
74use datafusion_common::config::ConfigOptions;
75use datafusion_common::utils::memory::estimate_memory_size;
76use datafusion_common::{
77 JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err,
78 plan_err, project_schema,
79};
80use datafusion_execution::TaskContext;
81use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
82use datafusion_expr::Accumulator;
83use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
84use datafusion_physical_expr::equivalence::{
85 ProjectionMapping, join_equivalence_properties,
86};
87use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
88use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
89use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
90
91use datafusion_common::hash_utils::RandomState;
92use datafusion_physical_expr_common::physical_expr::fmt_sql;
93use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
94use futures::TryStreamExt;
95use parking_lot::Mutex;
96
97use super::partitioned_hash_eval::SeededRandomState;
98
99pub(crate) const HASH_JOIN_SEED: SeededRandomState =
101 SeededRandomState::with_seed(12210250226015887276);
102
103const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count";
104
105#[expect(clippy::too_many_arguments)]
106fn try_create_array_map(
107 bounds: &Option<PartitionBounds>,
108 schema: &SchemaRef,
109 batches: &[RecordBatch],
110 on_left: &[PhysicalExprRef],
111 reservation: &mut MemoryReservation,
112 perfect_hash_join_small_build_threshold: usize,
113 perfect_hash_join_min_key_density: f64,
114 null_equality: NullEquality,
115) -> Result<Option<(ArrayMap, RecordBatch, Vec<ArrayRef>)>> {
116 if on_left.len() != 1 {
117 return Ok(None);
118 }
119
120 if null_equality == NullEquality::NullEqualsNull {
121 for batch in batches.iter() {
122 let arrays = evaluate_expressions_to_arrays(on_left, batch)?;
123 if arrays[0].null_count() > 0 {
124 return Ok(None);
125 }
126 }
127 }
128
129 let (min_val, max_val) = if let Some(bounds) = bounds {
130 let (min_val, max_val) = if let Some(cb) = bounds.get_column_bounds(0) {
131 (cb.min.clone(), cb.max.clone())
132 } else {
133 return Ok(None);
134 };
135
136 if min_val.is_null() || max_val.is_null() {
137 return Ok(None);
138 }
139
140 if min_val > max_val {
141 return internal_err!("min_val>max_val");
142 }
143
144 if let Some((mi, ma)) =
145 ArrayMap::key_to_u64(&min_val).zip(ArrayMap::key_to_u64(&max_val))
146 {
147 (mi, ma)
148 } else {
149 return Ok(None);
150 }
151 } else {
152 return Ok(None);
153 };
154
155 let range = ArrayMap::calculate_range(min_val, max_val);
156 let num_row: usize = batches.iter().map(|x| x.num_rows()).sum();
157
158 if num_row >= u32::MAX as usize {
160 return Ok(None);
161 }
162
163 if range == usize::MAX as u64 {
166 return Ok(None);
167 }
168
169 let dense_ratio = (num_row as f64) / ((range + 1) as f64);
170
171 if range >= perfect_hash_join_small_build_threshold as u64
172 && dense_ratio <= perfect_hash_join_min_key_density
173 {
174 return Ok(None);
175 }
176
177 let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row);
178 reservation.try_grow(mem_size)?;
179
180 let batch = concat_batches(schema, batches)?;
181 let left_values = evaluate_expressions_to_arrays(on_left, &batch)?;
182
183 let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?;
184
185 Ok(Some((array_map, batch, left_values)))
186}
187
/// Build-side ("left") state of a hash join, shared by the probe streams.
///
/// NOTE(review): constructed in `collect_left_input` (called from `execute`),
/// which is outside this view. Several field descriptions below are inferred
/// from types and names and are marked as such.
///
/// Field order matters: fields drop in declaration order, so `_reservation` is
/// released after `map`, `batch` and `values` are dropped.
pub(super) struct JoinLeftData {
    /// Lookup structure over the build-side join keys (presumably either a hash
    /// map or an array map built by `try_create_array_map`).
    pub(super) map: Arc<Map>,
    /// Build-side rows (presumably all input batches concatenated — confirm in
    /// `collect_left_input`).
    batch: RecordBatch,
    /// Evaluated build-side join key arrays.
    values: Vec<ArrayRef>,
    /// Shared bitmap presumably marking build-side rows matched during probing.
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Number of probe threads that have not finished yet; decremented by
    /// `report_probe_completed`.
    probe_threads_counter: AtomicUsize,
    /// Memory reservation for the build side. It is never read; it is held so
    /// that the memory is accounted for until this struct is dropped.
    _reservation: MemoryReservation,
    /// Per-column min/max bounds of the build-side keys, if computed.
    pub(super) bounds: Option<PartitionBounds>,
    /// Strategy used for dynamic-filter membership checks against this build side.
    pub(super) membership: PushdownStrategy,
    /// Presumably set once the probe side has produced any row (null-aware
    /// anti join bookkeeping — confirm in `stream.rs`).
    pub(super) probe_side_non_empty: AtomicBool,
    /// Presumably set once a probe-side row with a null join key is seen
    /// (null-aware anti join bookkeeping — confirm in `stream.rs`).
    pub(super) probe_side_has_null: AtomicBool,
}
220
221impl JoinLeftData {
222 pub(super) fn map(&self) -> &Map {
224 &self.map
225 }
226
227 pub(super) fn batch(&self) -> &RecordBatch {
229 &self.batch
230 }
231
232 pub(super) fn values(&self) -> &[ArrayRef] {
234 &self.values
235 }
236
237 pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
239 &self.visited_indices_bitmap
240 }
241
242 pub(super) fn membership(&self) -> &PushdownStrategy {
244 &self.membership
245 }
246
247 pub(super) fn report_probe_completed(&self) -> bool {
250 self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
251 }
252}
253
/// Builder for [`HashJoinExec`].
///
/// Wraps a partially configured `HashJoinExec`. [`HashJoinExecBuilder::build`]
/// validates it and, unless `preserve_properties` is still set, recomputes the
/// join schema, column indices and cached plan properties.
pub struct HashJoinExecBuilder {
    /// The plan being configured.
    exec: HashJoinExec,
    /// When `true`, `build` returns `exec` as-is and reuses its cached schema,
    /// column indices and properties. Setters that affect those derived values
    /// reset it to `false`.
    preserve_properties: bool,
}
270
271impl HashJoinExecBuilder {
272 pub fn new(
274 left: Arc<dyn ExecutionPlan>,
275 right: Arc<dyn ExecutionPlan>,
276 on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
277 join_type: JoinType,
278 ) -> Self {
279 Self {
280 exec: HashJoinExec {
281 left,
282 right,
283 on,
284 filter: None,
285 join_type,
286 left_fut: Default::default(),
287 random_state: HASH_JOIN_SEED,
288 mode: PartitionMode::Auto,
289 fetch: None,
290 metrics: ExecutionPlanMetricsSet::new(),
291 projection: None,
292 column_indices: vec![],
293 null_equality: NullEquality::NullEqualsNothing,
294 null_aware: false,
295 dynamic_filter: None,
296 cache: stub_properties(),
298 join_schema: Arc::new(Schema::empty()),
299 },
300 preserve_properties: false,
303 }
304 }
305
306 pub fn with_type(mut self, join_type: JoinType) -> Self {
308 self.exec.join_type = join_type;
309 self.preserve_properties = false;
310 self
311 }
312
313 pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self {
315 self.with_projection_ref(projection.map(Into::into))
316 }
317
318 pub fn with_projection_ref(mut self, projection: Option<ProjectionRef>) -> Self {
320 self.exec.projection = projection;
321 self.preserve_properties = false;
322 self
323 }
324
325 pub fn with_filter(mut self, filter: Option<JoinFilter>) -> Self {
327 self.exec.filter = filter;
328 self
329 }
330
331 pub fn with_on(mut self, on: Vec<(PhysicalExprRef, PhysicalExprRef)>) -> Self {
333 self.exec.on = on;
334 self.preserve_properties = false;
335 self
336 }
337
338 pub fn with_partition_mode(mut self, mode: PartitionMode) -> Self {
340 self.exec.mode = mode;
341 self.preserve_properties = false;
342 self
343 }
344
345 pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self {
347 self.exec.null_equality = null_equality;
348 self
349 }
350
351 pub fn with_null_aware(mut self, null_aware: bool) -> Self {
353 self.exec.null_aware = null_aware;
354 self
355 }
356
357 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
359 self.exec.fetch = fetch;
360 self
361 }
362
363 pub fn recompute_properties(mut self) -> Self {
365 self.preserve_properties = false;
366 self
367 }
368
369 pub fn with_new_children(
371 mut self,
372 mut children: Vec<Arc<dyn ExecutionPlan>>,
373 ) -> Result<Self> {
374 assert_or_internal_err!(
375 children.len() == 2,
376 "wrong number of children passed into `HashJoinExecBuilder`"
377 );
378 self.preserve_properties &= has_same_children_properties(&self.exec, &children)?;
379 self.exec.right = children.swap_remove(1);
380 self.exec.left = children.swap_remove(0);
381 Ok(self)
382 }
383
384 pub fn reset_state(mut self) -> Self {
386 self.exec.left_fut = Default::default();
387 self.exec.dynamic_filter = None;
388 self.exec.metrics = ExecutionPlanMetricsSet::new();
389 self
390 }
391
392 pub fn build_exec(self) -> Result<Arc<dyn ExecutionPlan>> {
394 self.build().map(|p| Arc::new(p) as _)
395 }
396
397 pub fn build(self) -> Result<HashJoinExec> {
399 let Self {
400 exec,
401 preserve_properties,
402 } = self;
403
404 if exec.null_aware {
406 let join_type = exec.join_type();
407 if !matches!(join_type, JoinType::LeftAnti) {
408 return plan_err!(
409 "null_aware can only be true for LeftAnti joins, got {join_type}"
410 );
411 }
412 let on = exec.on();
413 if on.len() != 1 {
414 return plan_err!(
415 "null_aware anti join only supports single column join key, got {} columns",
416 on.len()
417 );
418 }
419 }
420
421 if preserve_properties {
422 return Ok(exec);
423 }
424
425 let HashJoinExec {
426 left,
427 right,
428 on,
429 filter,
430 join_type,
431 left_fut,
432 random_state,
433 mode,
434 metrics,
435 projection,
436 null_equality,
437 null_aware,
438 dynamic_filter,
439 fetch,
440 join_schema: _,
442 column_indices: _,
443 cache: _,
444 } = exec;
445
446 let left_schema = left.schema();
447 let right_schema = right.schema();
448 if on.is_empty() {
449 return plan_err!("On constraints in HashJoinExec should be non-empty");
450 }
451
452 check_join_is_valid(&left_schema, &right_schema, &on)?;
453 let (join_schema, column_indices) =
454 build_join_schema(&left_schema, &right_schema, &join_type);
455
456 let join_schema = Arc::new(join_schema);
457
458 can_project(&join_schema, projection.as_deref())?;
460
461 let cache = HashJoinExec::compute_properties(
462 &left,
463 &right,
464 &join_schema,
465 join_type,
466 &on,
467 mode,
468 projection.as_deref(),
469 )?;
470
471 Ok(HashJoinExec {
472 left,
473 right,
474 on,
475 filter,
476 join_type,
477 join_schema,
478 left_fut,
479 random_state,
480 mode,
481 metrics,
482 projection,
483 column_indices,
484 null_equality,
485 null_aware,
486 cache: Arc::new(cache),
487 dynamic_filter,
488 fetch,
489 })
490 }
491
492 fn with_dynamic_filter(mut self, filter: Option<HashJoinExecDynamicFilter>) -> Self {
493 self.exec.dynamic_filter = filter;
494 self
495 }
496}
497
498impl From<&HashJoinExec> for HashJoinExecBuilder {
499 fn from(exec: &HashJoinExec) -> Self {
500 Self {
501 exec: HashJoinExec {
502 left: Arc::clone(exec.left()),
503 right: Arc::clone(exec.right()),
504 on: exec.on.clone(),
505 filter: exec.filter.clone(),
506 join_type: exec.join_type,
507 join_schema: Arc::clone(&exec.join_schema),
508 left_fut: Arc::clone(&exec.left_fut),
509 random_state: exec.random_state.clone(),
510 mode: exec.mode,
511 metrics: exec.metrics.clone(),
512 projection: exec.projection.clone(),
513 column_indices: exec.column_indices.clone(),
514 null_equality: exec.null_equality,
515 null_aware: exec.null_aware,
516 cache: Arc::clone(&exec.cache),
517 dynamic_filter: exec.dynamic_filter.clone(),
518 fetch: exec.fetch,
519 },
520 preserve_properties: true,
521 }
522 }
523}
524
#[expect(rustdoc::private_intra_doc_links)]
/// Hash join execution plan: joins `left` and `right` on the equi-join keys in
/// `on`, with an optional extra `filter`.
///
/// The left input is the build side and is collected (see `execute`) into a
/// shared [`JoinLeftData`]. The right input is the probe side
/// ([`HashJoinExec::probe_side`]) and is streamed.
///
/// Construct with [`HashJoinExec::try_new`] or a [`HashJoinExecBuilder`].
pub struct HashJoinExec {
    /// Build-side input.
    pub left: Arc<dyn ExecutionPlan>,
    /// Probe-side input.
    pub right: Arc<dyn ExecutionPlan>,
    /// Equi-join key pairs as `(left expression, right expression)`.
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional non-equi join filter.
    pub filter: Option<JoinFilter>,
    /// Join type.
    pub join_type: JoinType,
    /// Join output schema before `projection` is applied.
    join_schema: SchemaRef,
    /// Build-side future. In `CollectLeft` mode it is used via `try_once`, so
    /// the left input is collected once for all output partitions.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Seeded hash state used for join key hashing.
    random_state: SeededRandomState,
    /// How inputs are partitioned; `Auto` must be resolved before `execute`.
    pub mode: PartitionMode,
    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
    /// Optional indices into `join_schema` selecting the output columns.
    pub projection: Option<ProjectionRef>,
    /// For each `join_schema` column, which input side/column it comes from.
    column_indices: Vec<ColumnIndex>,
    /// Whether null join keys compare equal (`NullEqualsNull`) or never match.
    pub null_equality: NullEquality,
    /// Null-aware anti join mode; `HashJoinExecBuilder::build` only accepts it
    /// for single-key `LeftAnti` joins.
    pub null_aware: bool,
    /// Cached plan properties, produced by `compute_properties`.
    cache: Arc<PlanProperties>,
    /// Dynamic filter for the probe side, if one was installed.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
    /// Optional row limit, passed to every partition's `HashJoinStream`.
    fetch: Option<usize>,
}
761
/// Dynamic filter state attached to a [`HashJoinExec`].
#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// Filter expression whose children are the probe-side join keys (see
    /// `create_dynamic_filter` and `with_dynamic_filter_expr`).
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Accumulator created lazily by the first `execute` call that enables
    /// dynamic filtering, and reused by later partitions of the same plan.
    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
770
771impl fmt::Debug for HashJoinExec {
772 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773 f.debug_struct("HashJoinExec")
774 .field("left", &self.left)
775 .field("right", &self.right)
776 .field("on", &self.on)
777 .field("filter", &self.filter)
778 .field("join_type", &self.join_type)
779 .field("join_schema", &self.join_schema)
780 .field("left_fut", &self.left_fut)
781 .field("random_state", &self.random_state)
782 .field("mode", &self.mode)
783 .field("metrics", &self.metrics)
784 .field("projection", &self.projection)
785 .field("column_indices", &self.column_indices)
786 .field("null_equality", &self.null_equality)
787 .field("cache", &self.cache)
788 .finish()
790 }
791}
792
793impl EmbeddedProjection for HashJoinExec {
794 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
795 self.with_projection(projection)
796 }
797}
798
799impl HashJoinExec {
800 #[expect(clippy::too_many_arguments)]
805 pub fn try_new(
806 left: Arc<dyn ExecutionPlan>,
807 right: Arc<dyn ExecutionPlan>,
808 on: JoinOn,
809 filter: Option<JoinFilter>,
810 join_type: &JoinType,
811 projection: Option<Vec<usize>>,
812 partition_mode: PartitionMode,
813 null_equality: NullEquality,
814 null_aware: bool,
815 ) -> Result<Self> {
816 HashJoinExecBuilder::new(left, right, on, *join_type)
817 .with_filter(filter)
818 .with_projection(projection)
819 .with_partition_mode(partition_mode)
820 .with_null_equality(null_equality)
821 .with_null_aware(null_aware)
822 .build()
823 }
824
825 pub fn builder(&self) -> HashJoinExecBuilder {
831 self.into()
832 }
833
834 fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
835 let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
838 Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
840 }
841
842 fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
843 let (_, probe_preserved) = self.join_type.on_lr_is_preserved();
844 if !probe_preserved || !config.optimizer.enable_join_dynamic_filter_pushdown {
845 return false;
846 }
847
848 if config.optimizer.preserve_file_partitions > 0
855 && self.mode == PartitionMode::Partitioned
856 {
857 return false;
858 }
859
860 true
861 }
862
863 pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
865 &self.left
866 }
867
868 pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
870 &self.right
871 }
872
873 pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
875 &self.on
876 }
877
878 pub fn filter(&self) -> Option<&JoinFilter> {
880 self.filter.as_ref()
881 }
882
883 pub fn join_type(&self) -> &JoinType {
885 &self.join_type
886 }
887
888 pub fn join_schema(&self) -> &SchemaRef {
891 &self.join_schema
892 }
893
894 pub fn partition_mode(&self) -> &PartitionMode {
896 &self.mode
897 }
898
899 pub fn null_equality(&self) -> NullEquality {
901 self.null_equality
902 }
903
904 pub fn dynamic_filter_expr(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
907 self.dynamic_filter.as_ref().map(|df| &df.filter)
908 }
909
910 pub fn with_dynamic_filter_expr(
917 mut self,
918 filter: Arc<DynamicFilterPhysicalExpr>,
919 ) -> Result<Self> {
920 let probe_schema = self.right.schema();
921 for child in filter.children() {
922 child.data_type(&probe_schema)?;
923 }
924 self.dynamic_filter = Some(HashJoinExecDynamicFilter {
925 filter,
926 build_accumulator: OnceLock::new(),
929 });
930 Ok(self)
931 }
932
933 fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
935 vec![
936 false,
937 matches!(
938 join_type,
939 JoinType::Inner
940 | JoinType::Right
941 | JoinType::RightAnti
942 | JoinType::RightSemi
943 | JoinType::RightMark
944 ),
945 ]
946 }
947
948 pub fn probe_side() -> JoinSide {
950 JoinSide::Right
952 }
953
954 pub fn contains_projection(&self) -> bool {
956 self.projection.is_some()
957 }
958
959 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
961 let projection = projection.map(Into::into);
962 can_project(&self.schema(), projection.as_deref())?;
964 let projection =
965 combine_projections(projection.as_ref(), self.projection.as_ref())?;
966 self.builder().with_projection_ref(projection).build()
967 }
968
969 fn compute_properties(
971 left: &Arc<dyn ExecutionPlan>,
972 right: &Arc<dyn ExecutionPlan>,
973 schema: &SchemaRef,
974 join_type: JoinType,
975 on: JoinOnRef,
976 mode: PartitionMode,
977 projection: Option<&[usize]>,
978 ) -> Result<PlanProperties> {
979 let mut eq_properties = join_equivalence_properties(
981 left.equivalence_properties().clone(),
982 right.equivalence_properties().clone(),
983 &join_type,
984 Arc::clone(schema),
985 &Self::maintains_input_order(join_type),
986 Some(Self::probe_side()),
987 on,
988 )?;
989
990 let mut output_partitioning = match mode {
991 PartitionMode::CollectLeft => {
992 asymmetric_join_output_partitioning(left, right, &join_type)?
993 }
994 PartitionMode::Auto => Partitioning::UnknownPartitioning(
995 right.output_partitioning().partition_count(),
996 ),
997 PartitionMode::Partitioned => {
998 symmetric_join_output_partitioning(left, right, &join_type)?
999 }
1000 };
1001
1002 let emission_type = if left.boundedness().is_unbounded() {
1003 EmissionType::Final
1004 } else if right.pipeline_behavior() == EmissionType::Incremental {
1005 match join_type {
1006 JoinType::Inner
1009 | JoinType::LeftSemi
1010 | JoinType::RightSemi
1011 | JoinType::Right
1012 | JoinType::RightAnti
1013 | JoinType::RightMark => EmissionType::Incremental,
1014 JoinType::Left
1017 | JoinType::LeftAnti
1018 | JoinType::LeftMark
1019 | JoinType::Full => EmissionType::Both,
1020 }
1021 } else {
1022 right.pipeline_behavior()
1023 };
1024
1025 if let Some(projection) = projection {
1027 let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
1029 let out_schema = project_schema(schema, Some(&projection))?;
1030 output_partitioning =
1031 output_partitioning.project(&projection_mapping, &eq_properties);
1032 eq_properties = eq_properties.project(&projection_mapping, out_schema);
1033 }
1034
1035 Ok(PlanProperties::new(
1036 eq_properties,
1037 output_partitioning,
1038 emission_type,
1039 boundedness_from_children([left, right]),
1040 ))
1041 }
1042
1043 pub fn swap_inputs(
1067 &self,
1068 partition_mode: PartitionMode,
1069 ) -> Result<Arc<dyn ExecutionPlan>> {
1070 let left = self.left();
1071 let right = self.right();
1072 let new_join = self
1073 .builder()
1074 .with_type(self.join_type.swap())
1075 .with_new_children(vec![Arc::clone(right), Arc::clone(left)])?
1076 .with_on(
1077 self.on()
1078 .iter()
1079 .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
1080 .collect(),
1081 )
1082 .with_filter(self.filter().map(JoinFilter::swap))
1083 .with_projection(swap_join_projection(
1084 left.schema().fields().len(),
1085 right.schema().fields().len(),
1086 self.projection.as_deref(),
1087 self.join_type(),
1088 ))
1089 .with_partition_mode(partition_mode)
1090 .build()?;
1091 if matches!(
1093 self.join_type(),
1094 JoinType::LeftSemi
1095 | JoinType::RightSemi
1096 | JoinType::LeftAnti
1097 | JoinType::RightAnti
1098 | JoinType::LeftMark
1099 | JoinType::RightMark
1100 ) || self.projection.is_some()
1101 {
1102 Ok(Arc::new(new_join))
1103 } else {
1104 reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
1105 }
1106 }
1107}
1108
1109impl DisplayAs for HashJoinExec {
1110 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
1111 match t {
1112 DisplayFormatType::Default | DisplayFormatType::Verbose => {
1113 let display_filter = self.filter.as_ref().map_or_else(
1114 || "".to_string(),
1115 |f| format!(", filter={}", f.expression()),
1116 );
1117 let display_projections = if self.contains_projection() {
1118 format!(
1119 ", projection=[{}]",
1120 self.projection
1121 .as_ref()
1122 .unwrap()
1123 .iter()
1124 .map(|index| format!(
1125 "{}@{}",
1126 self.join_schema.fields().get(*index).unwrap().name(),
1127 index
1128 ))
1129 .collect::<Vec<_>>()
1130 .join(", ")
1131 )
1132 } else {
1133 "".to_string()
1134 };
1135 let display_null_equality =
1136 if self.null_equality() == NullEquality::NullEqualsNull {
1137 ", NullsEqual: true"
1138 } else {
1139 ""
1140 };
1141 let display_fetch = self
1142 .fetch
1143 .map_or_else(String::new, |f| format!(", fetch={f}"));
1144 let on = self
1145 .on
1146 .iter()
1147 .map(|(c1, c2)| format!("({c1}, {c2})"))
1148 .collect::<Vec<String>>()
1149 .join(", ");
1150 write!(
1151 f,
1152 "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
1153 self.mode,
1154 self.join_type,
1155 on,
1156 display_filter,
1157 display_projections,
1158 display_null_equality,
1159 display_fetch,
1160 )
1161 }
1162 DisplayFormatType::TreeRender => {
1163 let on = self
1164 .on
1165 .iter()
1166 .map(|(c1, c2)| {
1167 format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
1168 })
1169 .collect::<Vec<String>>()
1170 .join(", ");
1171
1172 if *self.join_type() != JoinType::Inner {
1173 writeln!(f, "join_type={:?}", self.join_type)?;
1174 }
1175
1176 writeln!(f, "on={on}")?;
1177
1178 if self.null_equality() == NullEquality::NullEqualsNull {
1179 writeln!(f, "NullsEqual: true")?;
1180 }
1181
1182 if let Some(filter) = self.filter.as_ref() {
1183 writeln!(f, "filter={filter}")?;
1184 }
1185
1186 if let Some(fetch) = self.fetch {
1187 writeln!(f, "fetch={fetch}")?;
1188 }
1189
1190 Ok(())
1191 }
1192 }
1193 }
1194}
1195
1196impl ExecutionPlan for HashJoinExec {
1197 fn name(&self) -> &'static str {
1198 "HashJoinExec"
1199 }
1200
1201 fn properties(&self) -> &Arc<PlanProperties> {
1202 &self.cache
1203 }
1204
1205 fn required_input_distribution(&self) -> Vec<Distribution> {
1206 match self.mode {
1207 PartitionMode::CollectLeft => vec![
1208 Distribution::SinglePartition,
1209 Distribution::UnspecifiedDistribution,
1210 ],
1211 PartitionMode::Partitioned => {
1212 let (left_expr, right_expr) = self
1213 .on
1214 .iter()
1215 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1216 .unzip();
1217 vec![
1218 Distribution::HashPartitioned(left_expr),
1219 Distribution::HashPartitioned(right_expr),
1220 ]
1221 }
1222 PartitionMode::Auto => vec![
1223 Distribution::UnspecifiedDistribution,
1224 Distribution::UnspecifiedDistribution,
1225 ],
1226 }
1227 }
1228
1229 fn maintains_input_order(&self) -> Vec<bool> {
1246 Self::maintains_input_order(self.join_type)
1247 }
1248
1249 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1250 vec![&self.left, &self.right]
1251 }
1252
1253 fn with_new_children(
1259 self: Arc<Self>,
1260 children: Vec<Arc<dyn ExecutionPlan>>,
1261 ) -> Result<Arc<dyn ExecutionPlan>> {
1262 self.builder().with_new_children(children)?.build_exec()
1263 }
1264
1265 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1266 self.builder().reset_state().build_exec()
1267 }
1268
1269 fn execute(
1270 &self,
1271 partition: usize,
1272 context: Arc<TaskContext>,
1273 ) -> Result<SendableRecordBatchStream> {
1274 let on_left = self
1275 .on
1276 .iter()
1277 .map(|on| Arc::clone(&on.0))
1278 .collect::<Vec<_>>();
1279 let left_partitions = self.left.output_partitioning().partition_count();
1280 let right_partitions = self.right.output_partitioning().partition_count();
1281
1282 assert_or_internal_err!(
1283 self.mode != PartitionMode::Partitioned
1284 || left_partitions == right_partitions,
1285 "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
1286 consider using RepartitionExec"
1287 );
1288
1289 assert_or_internal_err!(
1290 self.mode != PartitionMode::CollectLeft || left_partitions == 1,
1291 "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
1292 consider using CoalescePartitionsExec or the EnforceDistribution rule"
1293 );
1294
1295 let enable_dynamic_filter_pushdown = self
1301 .allow_join_dynamic_filter_pushdown(context.session_config().options())
1302 && self
1303 .dynamic_filter
1304 .as_ref()
1305 .map(|df| df.filter.is_used())
1306 .unwrap_or(false);
1307
1308 let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
1309
1310 let array_map_created_count = MetricBuilder::new(&self.metrics)
1311 .with_category(MetricCategory::Rows)
1312 .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);
1313
1314 let repartition_random_state = REPARTITION_RANDOM_STATE;
1317 let build_accumulator = enable_dynamic_filter_pushdown
1318 .then(|| {
1319 self.dynamic_filter.as_ref().map(|df| {
1320 let filter = Arc::clone(&df.filter);
1321 let on_right = self
1322 .on
1323 .iter()
1324 .map(|(_, right_expr)| Arc::clone(right_expr))
1325 .collect::<Vec<_>>();
1326 Some(Arc::clone(df.build_accumulator.get_or_init(|| {
1327 Arc::new(SharedBuildAccumulator::new_from_partition_mode(
1328 self.mode,
1329 self.left.as_ref(),
1330 self.right.as_ref(),
1331 filter,
1332 on_right,
1333 repartition_random_state,
1334 ))
1335 })))
1336 })
1337 })
1338 .flatten()
1339 .flatten();
1340
1341 let left_fut = match self.mode {
1342 PartitionMode::CollectLeft => self.left_fut.try_once(|| {
1343 let left_stream = self.left.execute(0, Arc::clone(&context))?;
1344
1345 let reservation =
1346 MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
1347
1348 Ok(collect_left_input(
1349 self.random_state.random_state().clone(),
1350 left_stream,
1351 on_left.clone(),
1352 join_metrics.clone(),
1353 reservation,
1354 need_produce_result_in_final(self.join_type),
1355 self.right().output_partitioning().partition_count(),
1356 enable_dynamic_filter_pushdown,
1357 Arc::clone(context.session_config().options()),
1358 self.null_equality,
1359 array_map_created_count,
1360 ))
1361 })?,
1362 PartitionMode::Partitioned => {
1363 let left_stream = self.left.execute(partition, Arc::clone(&context))?;
1364
1365 let reservation =
1366 MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
1367 .register(context.memory_pool());
1368 OnceFut::new(collect_left_input(
1369 self.random_state.random_state().clone(),
1370 left_stream,
1371 on_left.clone(),
1372 join_metrics.clone(),
1373 reservation,
1374 need_produce_result_in_final(self.join_type),
1375 1,
1376 enable_dynamic_filter_pushdown,
1377 Arc::clone(context.session_config().options()),
1378 self.null_equality,
1379 array_map_created_count,
1380 ))
1381 }
1382 PartitionMode::Auto => {
1383 return plan_err!(
1384 "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
1385 PartitionMode::Auto
1386 );
1387 }
1388 };
1389
1390 let batch_size = context.session_config().batch_size();
1391
1392 let right_stream = self.right.execute(partition, context)?;
1395
1396 let column_indices_after_projection = match self.projection.as_ref() {
1398 Some(projection) => projection
1399 .iter()
1400 .map(|i| self.column_indices[*i].clone())
1401 .collect(),
1402 None => self.column_indices.clone(),
1403 };
1404
1405 let on_right = self
1406 .on
1407 .iter()
1408 .map(|(_, right_expr)| Arc::clone(right_expr))
1409 .collect::<Vec<_>>();
1410
1411 Ok(Box::pin(HashJoinStream::new(
1412 partition,
1413 self.schema(),
1414 on_right,
1415 self.filter.clone(),
1416 self.join_type,
1417 right_stream,
1418 self.random_state.random_state().clone(),
1419 join_metrics,
1420 column_indices_after_projection,
1421 self.null_equality,
1422 HashJoinStreamState::WaitBuildSide,
1423 BuildSide::Initial(BuildSideInitialState { left_fut }),
1424 batch_size,
1425 vec![],
1426 self.right.output_ordering().is_some(),
1427 build_accumulator,
1428 self.mode,
1429 self.null_aware,
1430 self.fetch,
1431 )))
1432 }
1433
1434 fn metrics(&self) -> Option<MetricsSet> {
1435 Some(self.metrics.clone_inner())
1436 }
1437
1438 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
1439 let stats = match (partition, self.mode) {
1440 (Some(partition), PartitionMode::CollectLeft) => {
1444 let left_stats = self.left.partition_statistics(None)?;
1445 let right_stats = self.right.partition_statistics(Some(partition))?;
1446
1447 estimate_join_statistics(
1448 Arc::unwrap_or_clone(left_stats),
1449 Arc::unwrap_or_clone(right_stats),
1450 &self.on,
1451 &self.join_type,
1452 &self.join_schema,
1453 )?
1454 }
1455
1456 (Some(partition), PartitionMode::Partitioned) => {
1459 let left_stats = self.left.partition_statistics(Some(partition))?;
1460 let right_stats = self.right.partition_statistics(Some(partition))?;
1461
1462 estimate_join_statistics(
1463 Arc::unwrap_or_clone(left_stats),
1464 Arc::unwrap_or_clone(right_stats),
1465 &self.on,
1466 &self.join_type,
1467 &self.join_schema,
1468 )?
1469 }
1470
1471 (None, _) | (Some(_), PartitionMode::Auto) => {
1474 let left_stats = self.left.partition_statistics(None)?;
1478 let right_stats = self.right.partition_statistics(None)?;
1479 estimate_join_statistics(
1480 Arc::unwrap_or_clone(left_stats),
1481 Arc::unwrap_or_clone(right_stats),
1482 &self.on,
1483 &self.join_type,
1484 &self.join_schema,
1485 )?
1486 }
1487 };
1488 let stats = stats.project(self.projection.as_ref());
1490 Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
1492 }
1493
1494 fn try_swapping_with_projection(
1498 &self,
1499 projection: &ProjectionExec,
1500 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1501 if self.contains_projection() {
1503 return Ok(None);
1504 }
1505
1506 let schema = self.schema();
1507 if let Some(JoinData {
1508 projected_left_child,
1509 projected_right_child,
1510 join_filter,
1511 join_on,
1512 }) = try_pushdown_through_join(
1513 projection,
1514 self.left(),
1515 self.right(),
1516 self.on(),
1517 &schema,
1518 self.filter(),
1519 )? {
1520 self.builder()
1521 .with_new_children(vec![
1522 Arc::new(projected_left_child),
1523 Arc::new(projected_right_child),
1524 ])?
1525 .with_on(join_on)
1526 .with_filter(join_filter)
1527 .with_projection(None)
1529 .build_exec()
1530 .map(Some)
1531 } else {
1532 try_embed_projection(projection, self)
1533 }
1534 }
1535
1536 fn gather_filters_for_pushdown(
1537 &self,
1538 phase: FilterPushdownPhase,
1539 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1540 config: &ConfigOptions,
1541 ) -> Result<FilterDescription> {
1542 let (left_preserved, right_preserved) = lr_is_preserved(self.join_type);
1556
1557 let column_indices: Vec<ColumnIndex> = match self.projection.as_ref() {
1559 Some(projection) => projection
1560 .iter()
1561 .map(|i| self.column_indices[*i].clone())
1562 .collect(),
1563 None => self.column_indices.clone(),
1564 };
1565
1566 let (mut left_allowed, mut right_allowed) = (HashSet::new(), HashSet::new());
1567 column_indices
1568 .iter()
1569 .enumerate()
1570 .for_each(|(output_idx, ci)| {
1571 match ci.side {
1572 JoinSide::Left => left_allowed.insert(output_idx),
1573 JoinSide::Right => right_allowed.insert(output_idx),
1574 JoinSide::None => false,
1576 };
1577 });
1578
1579 match self.join_type {
1586 JoinType::LeftSemi | JoinType::LeftAnti => {
1587 let left_key_indices: HashSet<usize> = self
1588 .on
1589 .iter()
1590 .filter_map(|(left_key, _)| {
1591 left_key.downcast_ref::<Column>().map(|c| c.index())
1592 })
1593 .collect();
1594 for (output_idx, ci) in column_indices.iter().enumerate() {
1595 if ci.side == JoinSide::Left && left_key_indices.contains(&ci.index) {
1596 right_allowed.insert(output_idx);
1597 }
1598 }
1599 }
1600 JoinType::RightSemi | JoinType::RightAnti => {
1601 let right_key_indices: HashSet<usize> = self
1602 .on
1603 .iter()
1604 .filter_map(|(_, right_key)| {
1605 right_key.downcast_ref::<Column>().map(|c| c.index())
1606 })
1607 .collect();
1608 for (output_idx, ci) in column_indices.iter().enumerate() {
1609 if ci.side == JoinSide::Right && right_key_indices.contains(&ci.index)
1610 {
1611 left_allowed.insert(output_idx);
1612 }
1613 }
1614 }
1615 _ => {}
1616 }
1617
1618 let left_child = if left_preserved {
1619 ChildFilterDescription::from_child_with_allowed_indices(
1620 &parent_filters,
1621 left_allowed,
1622 self.left(),
1623 )?
1624 } else {
1625 ChildFilterDescription::all_unsupported(&parent_filters)
1626 };
1627
1628 let mut right_child = if right_preserved {
1629 ChildFilterDescription::from_child_with_allowed_indices(
1630 &parent_filters,
1631 right_allowed,
1632 self.right(),
1633 )?
1634 } else {
1635 ChildFilterDescription::all_unsupported(&parent_filters)
1636 };
1637
1638 if phase == FilterPushdownPhase::Post
1640 && self.allow_join_dynamic_filter_pushdown(config)
1641 {
1642 let dynamic_filter = Self::create_dynamic_filter(&self.on);
1644 right_child = right_child.with_self_filter(dynamic_filter);
1645 }
1646
1647 Ok(FilterDescription::new()
1648 .with_child(left_child)
1649 .with_child(right_child))
1650 }
1651
1652 fn handle_child_pushdown_result(
1653 &self,
1654 _phase: FilterPushdownPhase,
1655 child_pushdown_result: ChildPushdownResult,
1656 _config: &ConfigOptions,
1657 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1658 let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1659 assert_eq!(child_pushdown_result.self_filters.len(), 2); let right_child_self_filters = &child_pushdown_result.self_filters[1]; if let Some(filter) = right_child_self_filters.first() {
1663 let predicate = Arc::clone(&filter.predicate);
1666 if let Ok(dynamic_filter) =
1667 Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1668 {
1669 let new_node = self
1671 .builder()
1672 .with_dynamic_filter(Some(HashJoinExecDynamicFilter {
1673 filter: dynamic_filter,
1674 build_accumulator: OnceLock::new(),
1675 }))
1676 .build_exec()?;
1677 result = result.with_updated_node(new_node);
1678 }
1679 }
1680 Ok(result)
1681 }
1682
1683 fn supports_limit_pushdown(&self) -> bool {
1684 false
1688 }
1689
1690 fn fetch(&self) -> Option<usize> {
1691 self.fetch
1692 }
1693
1694 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1695 self.builder()
1696 .with_fetch(limit)
1697 .build()
1698 .ok()
1699 .map(|exec| Arc::new(exec) as _)
1700 }
1701}
1702
1703fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
1709 match join_type {
1710 JoinType::Inner => (true, true),
1711 JoinType::Left => (true, false),
1712 JoinType::Right => (false, true),
1713 JoinType::Full => (false, false),
1714 JoinType::LeftSemi | JoinType::LeftAnti => (true, true),
1718 JoinType::RightSemi | JoinType::RightAnti => (true, true),
1719 JoinType::LeftMark => (true, false),
1720 JoinType::RightMark => (false, true),
1721 }
1722}
1723
1724struct CollectLeftAccumulator {
1734 expr: Arc<dyn PhysicalExpr>,
1736 min: MinAccumulator,
1738 max: MaxAccumulator,
1740}
1741
1742impl CollectLeftAccumulator {
1743 fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
1752 fn dictionary_value_type(data_type: &DataType) -> DataType {
1754 match data_type {
1755 DataType::Dictionary(_, value_type) => {
1756 dictionary_value_type(value_type.as_ref())
1757 }
1758 _ => data_type.clone(),
1759 }
1760 }
1761
1762 let data_type = expr
1763 .data_type(schema)
1764 .map(|dt| dictionary_value_type(&dt))?;
1766 Ok(Self {
1767 expr,
1768 min: MinAccumulator::try_new(&data_type)?,
1769 max: MaxAccumulator::try_new(&data_type)?,
1770 })
1771 }
1772
1773 fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
1784 let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
1785 self.min.update_batch(std::slice::from_ref(&array))?;
1786 self.max.update_batch(std::slice::from_ref(&array))?;
1787 Ok(())
1788 }
1789
1790 fn evaluate(mut self) -> Result<ColumnBounds> {
1797 Ok(ColumnBounds::new(
1798 self.min.evaluate()?,
1799 self.max.evaluate()?,
1800 ))
1801 }
1802}
1803
1804struct BuildSideState {
1806 batches: Vec<RecordBatch>,
1807 num_rows: usize,
1808 metrics: BuildProbeJoinMetrics,
1809 reservation: MemoryReservation,
1810 bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
1811}
1812
1813impl BuildSideState {
1814 fn try_new(
1816 metrics: BuildProbeJoinMetrics,
1817 reservation: MemoryReservation,
1818 on_left: Vec<Arc<dyn PhysicalExpr>>,
1819 schema: &SchemaRef,
1820 should_compute_dynamic_filters: bool,
1821 ) -> Result<Self> {
1822 Ok(Self {
1823 batches: Vec::new(),
1824 num_rows: 0,
1825 metrics,
1826 reservation,
1827 bounds_accumulators: should_compute_dynamic_filters
1828 .then(|| {
1829 on_left
1830 .into_iter()
1831 .map(|expr| CollectLeftAccumulator::try_new(expr, schema))
1832 .collect::<Result<Vec<_>>>()
1833 })
1834 .transpose()?,
1835 })
1836 }
1837}
1838
1839fn should_collect_min_max_for_perfect_hash(
1840 on_left: &[PhysicalExprRef],
1841 schema: &SchemaRef,
1842) -> Result<bool> {
1843 if on_left.len() != 1 {
1844 return Ok(false);
1845 }
1846
1847 let expr = &on_left[0];
1848 let data_type = expr.data_type(schema)?;
1849 Ok(ArrayMap::is_supported_type(&data_type))
1850}
1851
/// Collects the whole build (left) input into memory and builds the lookup
/// structure that the probe side uses.
///
/// Steps:
/// 1. Drain `left_stream`. Every batch is buffered and accounted against
///    `reservation`, and per-key min/max accumulators are updated when enabled.
/// 2. Try to build an `ArrayMap` (perfect hash join) via `try_create_array_map`.
///    Otherwise, build a `JoinHashMapU32` or `JoinHashMapU64` (chosen by row
///    count) and concatenate the batches into one.
/// 3. Optionally allocate a visited-indices bitmap with one bit per build row.
/// 4. Pick the `PushdownStrategy` for the dynamic filter: `Empty`, an `InList`,
///    or a reference to the map.
///
/// # Errors
/// Fails if the input stream errors, a memory reservation cannot be grown, or
/// key evaluation / map construction fails.
#[expect(clippy::too_many_arguments)]
async fn collect_left_input(
    random_state: RandomState,
    left_stream: SendableRecordBatchStream,
    on_left: Vec<PhysicalExprRef>,
    metrics: BuildProbeJoinMetrics,
    reservation: MemoryReservation,
    with_visited_indices_bitmap: bool,
    probe_threads_count: usize,
    should_compute_dynamic_filters: bool,
    config: Arc<ConfigOptions>,
    null_equality: NullEquality,
    array_map_created_count: Count,
) -> Result<JoinLeftData> {
    let schema = left_stream.schema();

    // Bounds are also needed to decide whether an ArrayMap can be built, even
    // when dynamic filters are disabled.
    let should_collect_min_max_for_phj =
        should_collect_min_max_for_perfect_hash(&on_left, &schema)?;

    let initial = BuildSideState::try_new(
        metrics,
        reservation,
        on_left.clone(),
        &schema,
        should_compute_dynamic_filters || should_collect_min_max_for_phj,
    )?;

    let state = left_stream
        .try_fold(initial, |mut state, batch| async move {
            // Update key bounds before the batch is moved into `state.batches`.
            if let Some(ref mut accumulators) = state.bounds_accumulators {
                for accumulator in accumulators {
                    accumulator.update_batch(&batch)?;
                }
            }

            // Account for the batch's memory before retaining it.
            let batch_size = get_record_batch_memory_size(&batch);
            state.reservation.try_grow(batch_size)?;
            state.metrics.build_mem_used.add(batch_size);
            state.metrics.build_input_batches.add(1);
            state.metrics.build_input_rows.add(batch.num_rows());
            state.num_rows += batch.num_rows();
            state.batches.push(batch);
            Ok(state)
        })
        .await?;

    let BuildSideState {
        batches,
        num_rows,
        metrics,
        mut reservation,
        bounds_accumulators,
    } = state;

    // Bounds are only produced when at least one build row was seen.
    let mut bounds = match bounds_accumulators {
        Some(accumulators) if num_rows > 0 => {
            let bounds = accumulators
                .into_iter()
                .map(CollectLeftAccumulator::evaluate)
                .collect::<Result<Vec<_>>>()?;
            Some(PartitionBounds::new(bounds))
        }
        _ => None,
    };

    let (join_hash_map, batch, left_values) =
        if let Some((array_map, batch, left_value)) = try_create_array_map(
            &bounds,
            &schema,
            &batches,
            &on_left,
            &mut reservation,
            config.execution.perfect_hash_join_small_build_threshold,
            config.execution.perfect_hash_join_min_key_density,
            null_equality,
        )? {
            // Perfect hash join path.
            array_map_created_count.add(1);
            metrics.build_mem_used.add(array_map.size());

            (Map::ArrayMap(array_map), batch, left_value)
        } else {
            let fixed_size_u32 = size_of::<JoinHashMapU32>();
            let fixed_size_u64 = size_of::<JoinHashMapU64>();

            // Use 64-bit row indices only when the row count exceeds u32::MAX.
            // The estimated table size is reserved before allocation.
            let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
                let estimated_hashtable_size =
                    estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
                reservation.try_grow(estimated_hashtable_size)?;
                metrics.build_mem_used.add(estimated_hashtable_size);
                Box::new(JoinHashMapU64::with_capacity(num_rows))
            } else {
                let estimated_hashtable_size =
                    estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
                reservation.try_grow(estimated_hashtable_size)?;
                metrics.build_mem_used.add(estimated_hashtable_size);
                Box::new(JoinHashMapU32::with_capacity(num_rows))
            };

            let mut hashes_buffer = Vec::new();
            let mut offset = 0;

            // Batches are hashed in reverse order. `offset` is each batch's first
            // row within the concatenation below, which uses the same order.
            let batches_iter = batches.iter().rev();

            for batch in batches_iter.clone() {
                hashes_buffer.clear();
                hashes_buffer.resize(batch.num_rows(), 0);
                update_hash(
                    &on_left,
                    batch,
                    &mut *hashmap,
                    offset,
                    &random_state,
                    &mut hashes_buffer,
                    0,
                    true,
                )?;
                offset += batch.num_rows();
            }

            // Single batch in the same (reversed) order, so the offsets stored in
            // the map index directly into it.
            let batch = concat_batches(&schema, batches_iter.clone())?;

            let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;

            (Map::HashMap(hashmap), batch, left_values)
        };

    // One bit per build row, initially unset. NOTE(review): sized from
    // `batch.num_rows()` but filled with `num_rows` bits; these are assumed equal.
    let visited_indices_bitmap = if with_visited_indices_bitmap {
        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
        reservation.try_grow(bitmap_size)?;
        metrics.build_mem_used.add(bitmap_size);

        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
        bitmap_buffer.append_n(num_rows, false);
        bitmap_buffer
    } else {
        BooleanBufferBuilder::new(0)
    };

    let map = Arc::new(join_hash_map);

    // Dynamic-filter membership strategy. An empty build side needs no lookup.
    // Small key sets (within the configured size and distinct-value limits)
    // become an IN list when one can be built; otherwise the map is shared.
    let membership = if num_rows == 0 {
        PushdownStrategy::Empty
    } else {
        let estimated_size = left_values
            .iter()
            .map(|arr| arr.get_array_memory_size())
            .sum::<usize>();
        if left_values.is_empty()
            || left_values[0].is_empty()
            || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size
            || map.num_of_distinct_key()
                > config
                    .optimizer
                    .hash_join_inlist_pushdown_max_distinct_values
        {
            PushdownStrategy::Map(Arc::clone(&map))
        } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
            PushdownStrategy::InList(in_list_values)
        } else {
            PushdownStrategy::Map(Arc::clone(&map))
        }
    };

    // Bounds gathered only for the perfect-hash decision are not exposed when
    // dynamic filters are disabled.
    if should_collect_min_max_for_phj && !should_compute_dynamic_filters {
        bounds = None;
    }

    let data = JoinLeftData {
        map,
        batch,
        values: left_values,
        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
        probe_threads_counter: AtomicUsize::new(probe_threads_count),
        _reservation: reservation,
        bounds,
        membership,
        probe_side_non_empty: AtomicBool::new(false),
        probe_side_has_null: AtomicBool::new(false),
    };

    Ok(data)
}
2081
2082#[cfg(test)]
2083mod tests {
2084 use super::*;
2085
2086 fn assert_phj_used(metrics: &MetricsSet, use_phj: bool) {
2087 if use_phj {
2088 assert!(
2089 metrics
2090 .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
2091 .expect("should have array_map_created_count metrics")
2092 .as_usize()
2093 >= 1
2094 );
2095 } else {
2096 assert_eq!(
2097 metrics
2098 .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
2099 .map(|v| v.as_usize())
2100 .unwrap_or(0),
2101 0
2102 )
2103 }
2104 }
2105
2106 fn build_schema_and_on() -> Result<(SchemaRef, SchemaRef, JoinOn)> {
2107 let left_schema = Arc::new(Schema::new(vec![
2108 Field::new("a1", DataType::Int32, true),
2109 Field::new("b1", DataType::Int32, true),
2110 ]));
2111 let right_schema = Arc::new(Schema::new(vec![
2112 Field::new("a2", DataType::Int32, true),
2113 Field::new("b1", DataType::Int32, true),
2114 ]));
2115 let on = vec![(
2116 Arc::new(Column::new_with_schema("b1", &left_schema)?) as _,
2117 Arc::new(Column::new_with_schema("b1", &right_schema)?) as _,
2118 )];
2119 Ok((left_schema, right_schema, on))
2120 }
2121
2122 use crate::coalesce_partitions::CoalescePartitionsExec;
2123 use crate::joins::hash_join::stream::lookup_join_hashmap;
2124 use crate::test::{TestMemoryExec, assert_join_metrics};
2125 use crate::{
2126 common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
2127 test::exec::MockExec,
2128 };
2129
2130 use arrow::array::{
2131 Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, UInt64Array,
2132 };
2133 use arrow::buffer::NullBuffer;
2134 use arrow::datatypes::{DataType, Field};
2135 use datafusion_common::hash_utils::create_hashes;
2136 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
2137 use datafusion_common::{
2138 ScalarValue, assert_batches_eq, assert_batches_sorted_eq, assert_contains,
2139 exec_err, internal_err,
2140 };
2141 use datafusion_execution::config::SessionConfig;
2142 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
2143 use datafusion_expr::Operator;
2144 use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
2145 use hashbrown::HashTable;
2146 use insta::{allow_duplicates, assert_snapshot};
2147 use rstest::*;
2148 use rstest_reuse::*;
2149
2150 fn div_ceil(a: usize, b: usize) -> usize {
2151 a.div_ceil(b)
2152 }
2153
    // rstest template. Each test it is applied to runs once per combination of
    // output batch size and `use_perfect_hash_join_as_possible` (the tests pass
    // both values to `prepare_task_ctx`).
    #[template]
    #[rstest]
    fn hash_join_exec_configs(
        #[values(8192, 10, 5, 2, 1)] batch_size: usize,
        #[values(true, false)] use_perfect_hash_join_as_possible: bool,
    ) {
    }
2161
2162 fn prepare_task_ctx(
2163 batch_size: usize,
2164 use_perfect_hash_join_as_possible: bool,
2165 ) -> Arc<TaskContext> {
2166 let mut session_config = SessionConfig::default().with_batch_size(batch_size);
2167
2168 if use_perfect_hash_join_as_possible {
2169 session_config
2170 .options_mut()
2171 .execution
2172 .perfect_hash_join_small_build_threshold = 819200;
2173 session_config
2174 .options_mut()
2175 .execution
2176 .perfect_hash_join_min_key_density = 0.0;
2177 } else {
2178 session_config
2179 .options_mut()
2180 .execution
2181 .perfect_hash_join_small_build_threshold = 0;
2182 session_config
2183 .options_mut()
2184 .execution
2185 .perfect_hash_join_min_key_density = f64::INFINITY;
2186 }
2187 Arc::new(TaskContext::default().with_session_config(session_config))
2188 }
2189
2190 fn build_table(
2191 a: (&str, &Vec<i32>),
2192 b: (&str, &Vec<i32>),
2193 c: (&str, &Vec<i32>),
2194 ) -> Arc<dyn ExecutionPlan> {
2195 let batch = build_table_i32(a, b, c);
2196 let schema = batch.schema();
2197 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
2198 }
2199
2200 fn build_table_two_cols(
2202 a: (&str, &Vec<Option<i32>>),
2203 b: (&str, &Vec<Option<i32>>),
2204 ) -> Arc<dyn ExecutionPlan> {
2205 let schema = Arc::new(Schema::new(vec![
2206 Field::new(a.0, DataType::Int32, true),
2207 Field::new(b.0, DataType::Int32, true),
2208 ]));
2209 let batch = RecordBatch::try_new(
2210 Arc::clone(&schema),
2211 vec![
2212 Arc::new(Int32Array::from(a.1.clone())),
2213 Arc::new(Int32Array::from(b.1.clone())),
2214 ],
2215 )
2216 .unwrap();
2217 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
2218 }
2219
2220 fn join(
2221 left: Arc<dyn ExecutionPlan>,
2222 right: Arc<dyn ExecutionPlan>,
2223 on: JoinOn,
2224 join_type: &JoinType,
2225 null_equality: NullEquality,
2226 ) -> Result<HashJoinExec> {
2227 HashJoinExec::try_new(
2228 left,
2229 right,
2230 on,
2231 None,
2232 join_type,
2233 None,
2234 PartitionMode::CollectLeft,
2235 null_equality,
2236 false,
2237 )
2238 }
2239
2240 fn join_with_filter(
2241 left: Arc<dyn ExecutionPlan>,
2242 right: Arc<dyn ExecutionPlan>,
2243 on: JoinOn,
2244 filter: JoinFilter,
2245 join_type: &JoinType,
2246 null_equality: NullEquality,
2247 ) -> Result<HashJoinExec> {
2248 HashJoinExec::try_new(
2249 left,
2250 right,
2251 on,
2252 Some(filter),
2253 join_type,
2254 None,
2255 PartitionMode::CollectLeft,
2256 null_equality,
2257 false,
2258 )
2259 }
2260
2261 fn empty_build_with_probe_error_inputs()
2262 -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
2263 let left_batch =
2264 build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
2265 let left_schema = left_batch.schema();
2266 let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
2267 &[vec![left_batch]],
2268 Arc::clone(&left_schema),
2269 None,
2270 )
2271 .unwrap();
2272
2273 let err = exec_err!("bad data error");
2274 let right_batch =
2275 build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2276 let right_schema = right_batch.schema();
2277 let on = vec![(
2278 Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
2279 Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _,
2280 )];
2281 let right: Arc<dyn ExecutionPlan> = Arc::new(
2282 MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false),
2283 );
2284
2285 (left, right, on)
2286 }
2287
2288 async fn assert_empty_build_probe_behavior(
2289 join_types: &[JoinType],
2290 expect_probe_error: bool,
2291 with_filter: bool,
2292 ) {
2293 let (left, right, on) = empty_build_with_probe_error_inputs();
2294 let filter = prepare_join_filter();
2295
2296 for join_type in join_types {
2297 let join = if with_filter {
2298 join_with_filter(
2299 Arc::clone(&left),
2300 Arc::clone(&right),
2301 on.clone(),
2302 filter.clone(),
2303 join_type,
2304 NullEquality::NullEqualsNothing,
2305 )
2306 .unwrap()
2307 } else {
2308 join(
2309 Arc::clone(&left),
2310 Arc::clone(&right),
2311 on.clone(),
2312 join_type,
2313 NullEquality::NullEqualsNothing,
2314 )
2315 .unwrap()
2316 };
2317
2318 let result = common::collect(
2319 join.execute(0, Arc::new(TaskContext::default())).unwrap(),
2320 )
2321 .await;
2322
2323 if expect_probe_error {
2324 let result_string = result.unwrap_err().to_string();
2325 assert!(
2326 result_string.contains("bad data error"),
2327 "actual: {result_string}"
2328 );
2329 } else {
2330 let batches = result.unwrap();
2331 assert!(
2332 batches.is_empty(),
2333 "expected no output batches for {join_type}, got {batches:?}"
2334 );
2335 }
2336 }
2337 }
2338
2339 fn hash_join_with_dynamic_filter(
2340 left: Arc<dyn ExecutionPlan>,
2341 right: Arc<dyn ExecutionPlan>,
2342 on: JoinOn,
2343 join_type: JoinType,
2344 ) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2345 hash_join_with_dynamic_filter_and_mode(
2346 left,
2347 right,
2348 on,
2349 join_type,
2350 PartitionMode::CollectLeft,
2351 )
2352 }
2353
2354 fn hash_join_with_dynamic_filter_and_mode(
2355 left: Arc<dyn ExecutionPlan>,
2356 right: Arc<dyn ExecutionPlan>,
2357 on: JoinOn,
2358 join_type: JoinType,
2359 mode: PartitionMode,
2360 ) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2361 let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
2362 let mut join = HashJoinExec::try_new(
2363 left,
2364 right,
2365 on,
2366 None,
2367 &join_type,
2368 None,
2369 mode,
2370 NullEquality::NullEqualsNothing,
2371 false,
2372 )?;
2373 join.dynamic_filter = Some(HashJoinExecDynamicFilter {
2374 filter: Arc::clone(&dynamic_filter),
2375 build_accumulator: OnceLock::new(),
2376 });
2377
2378 Ok((join, dynamic_filter))
2379 }
2380
2381 async fn join_collect(
2382 left: Arc<dyn ExecutionPlan>,
2383 right: Arc<dyn ExecutionPlan>,
2384 on: JoinOn,
2385 join_type: &JoinType,
2386 null_equality: NullEquality,
2387 context: Arc<TaskContext>,
2388 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2389 let join = join(left, right, on, join_type, null_equality)?;
2390 let columns_header = columns(&join.schema());
2391
2392 let stream = join.execute(0, context)?;
2393 let batches = common::collect(stream).await?;
2394 let metrics = join.metrics().unwrap();
2395
2396 Ok((columns_header, batches, metrics))
2397 }
2398
2399 async fn partitioned_join_collect(
2400 left: Arc<dyn ExecutionPlan>,
2401 right: Arc<dyn ExecutionPlan>,
2402 on: JoinOn,
2403 join_type: &JoinType,
2404 null_equality: NullEquality,
2405 context: Arc<TaskContext>,
2406 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2407 join_collect_with_partition_mode(
2408 left,
2409 right,
2410 on,
2411 join_type,
2412 PartitionMode::Partitioned,
2413 null_equality,
2414 context,
2415 )
2416 .await
2417 }
2418
2419 async fn join_collect_with_partition_mode(
2420 left: Arc<dyn ExecutionPlan>,
2421 right: Arc<dyn ExecutionPlan>,
2422 on: JoinOn,
2423 join_type: &JoinType,
2424 partition_mode: PartitionMode,
2425 null_equality: NullEquality,
2426 context: Arc<TaskContext>,
2427 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
2428 let partition_count = 4;
2429
2430 let (left_expr, right_expr) = on
2431 .iter()
2432 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
2433 .unzip();
2434
2435 let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
2436 PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
2437 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
2438 left,
2439 Partitioning::Hash(left_expr, partition_count),
2440 )?),
2441 PartitionMode::Auto => {
2442 return internal_err!("Unexpected PartitionMode::Auto in join tests");
2443 }
2444 };
2445
2446 let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
2447 PartitionMode::CollectLeft => {
2448 let partition_column_name = right.schema().field(0).name().clone();
2449 let partition_expr = vec![Arc::new(Column::new_with_schema(
2450 &partition_column_name,
2451 &right.schema(),
2452 )?) as _];
2453 Arc::new(RepartitionExec::try_new(
2454 right,
2455 Partitioning::Hash(partition_expr, partition_count),
2456 )?) as _
2457 }
2458 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
2459 right,
2460 Partitioning::Hash(right_expr, partition_count),
2461 )?),
2462 PartitionMode::Auto => {
2463 return internal_err!("Unexpected PartitionMode::Auto in join tests");
2464 }
2465 };
2466
2467 let join = HashJoinExec::try_new(
2468 left_repartitioned,
2469 right_repartitioned,
2470 on,
2471 None,
2472 join_type,
2473 None,
2474 partition_mode,
2475 null_equality,
2476 false,
2477 )?;
2478
2479 let columns = columns(&join.schema());
2480
2481 let mut batches = vec![];
2482 for i in 0..partition_count {
2483 let stream = join.execute(i, Arc::clone(&context))?;
2484 let more_batches = common::collect(stream).await?;
2485 batches.extend(
2486 more_batches
2487 .into_iter()
2488 .filter(|b| b.num_rows() > 0)
2489 .collect::<Vec<_>>(),
2490 );
2491 }
2492 let metrics = join.metrics().unwrap();
2493
2494 Ok((columns, batches, metrics))
2495 }
2496
    // Inner join on the shared `b1` column. Left keys (4, 5, 5) meet right keys
    // (4, 5, 6): three output rows, and the right row with b1 = 6 is unmatched.
    // The test runs under every `hash_join_exec_configs` combination and checks
    // that the ArrayMap metric agrees with the requested configuration.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        // Both inputs' columns appear, including the duplicated key name `b1`.
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2550
    // Same data and expectations as `join_inner_one`, but run in
    // `PartitionMode::Partitioned` across 4 partitions. Row order across
    // partitions is not fixed, so the snapshot uses the sorted rendering.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);

        Ok(())
    }
2602
    // Inner join where the key columns have different names on each side
    // (`b1` on the left, `b2` on the right). Default task context.
    #[tokio::test]
    async fn join_inner_one_no_shared_column_names() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2650
    // Inner join with unsorted inputs on both sides. The snapshot pins the exact
    // (unsorted) output row order produced for this input.
    #[tokio::test]
    async fn join_inner_one_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![0, 3, 2, 1]),
            ("b1", &vec![4, 5, 5, 4]),
            ("c1", &vec![6, 9, 8, 7]),
        );
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3  | 5  | 9  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 0  | 4  | 6  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
2699
    // Inner join on two key columns (`a1`, `b2`). The test also checks an upper
    // bound on the number of output batches for the configured batch size.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_two(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 2]),
            ("b2", &vec![1, 2, 2]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        // Normal build: 3 output rows split by batch_size, plus one extra batch
        // when batch_size == 1. With `force_hash_collisions` the bound is based on
        // 9 rows (presumably every left/right pair becomes a candidate, 3 x 3).
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(9, batch_size)
        };

        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 1  | 7  | 1  | 1  | 70 |
            | 2  | 2  | 8  | 2  | 2  | 80 |
            | 2  | 2  | 9  | 2  | 2  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2778
    // Same two-key inner join as `join_inner_two`, but the build (left) input
    // arrives as two partitions merged by `CoalescePartitionsExec`.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one_two_parts_left(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let batch1 = build_table_i32(
            ("a1", &vec![1, 2]),
            ("b2", &vec![1, 2]),
            ("c1", &vec![7, 8]),
        );
        let batch2 =
            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
        let schema = batch1.schema();
        // One batch per partition, then coalesced into a single stream.
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));

        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        // Same batch-count bound as in `join_inner_two`.
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(9, batch_size)
        };

        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 1  | 7  | 1  | 1  | 70 |
            | 2  | 2  | 8  | 2  | 2  | 80 |
            | 2  | 2  | 9  | 2  | 2  | 80 |
            +----+----+----+----+----+----+
            ");
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2866
2867 #[tokio::test]
2868 async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
2869 let task_ctx = Arc::new(TaskContext::default());
2870 let batch1 = build_table_i32(
2871 ("a1", &vec![0, 3]),
2872 ("b1", &vec![4, 5]),
2873 ("c1", &vec![6, 9]),
2874 );
2875 let batch2 = build_table_i32(
2876 ("a1", &vec![2, 1]),
2877 ("b1", &vec![5, 4]),
2878 ("c1", &vec![8, 7]),
2879 );
2880 let schema = batch1.schema();
2881
2882 let left =
2883 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2884 .unwrap();
2885 let left = Arc::new(CoalescePartitionsExec::new(left));
2886 let right = build_table(
2887 ("a2", &vec![20, 30, 10]),
2888 ("b2", &vec![5, 6, 4]),
2889 ("c2", &vec![80, 90, 70]),
2890 );
2891 let on = vec![(
2892 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2893 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2894 )];
2895
2896 let (columns, batches, metrics) = join_collect(
2897 left,
2898 right,
2899 on,
2900 &JoinType::Inner,
2901 NullEquality::NullEqualsNothing,
2902 task_ctx,
2903 )
2904 .await?;
2905
2906 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
2907
2908 allow_duplicates! {
2910 assert_snapshot!(batches_to_string(&batches), @r"
2911 +----+----+----+----+----+----+
2912 | a1 | b1 | c1 | a2 | b2 | c2 |
2913 +----+----+----+----+----+----+
2914 | 3 | 5 | 9 | 20 | 5 | 80 |
2915 | 2 | 5 | 8 | 20 | 5 | 80 |
2916 | 0 | 4 | 6 | 10 | 4 | 70 |
2917 | 1 | 4 | 7 | 10 | 4 | 70 |
2918 +----+----+----+----+----+----+
2919 ");
2920 }
2921
2922 assert_join_metrics!(metrics, 4);
2923
2924 Ok(())
2925 }
2926
2927 #[apply(hash_join_exec_configs)]
2929 #[tokio::test]
2930 async fn join_inner_one_two_parts_right(
2931 batch_size: usize,
2932 use_perfect_hash_join_as_possible: bool,
2933 ) -> Result<()> {
2934 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
2935 let left = build_table(
2936 ("a1", &vec![1, 2, 3]),
2937 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
2939 );
2940
2941 let batch1 = build_table_i32(
2942 ("a2", &vec![10, 20]),
2943 ("b1", &vec![4, 6]),
2944 ("c2", &vec![70, 80]),
2945 );
2946 let batch2 =
2947 build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
2948 let schema = batch1.schema();
2949 let right =
2950 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2951 .unwrap();
2952
2953 let on = vec![(
2954 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2955 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2956 )];
2957
2958 let join = join(
2959 left,
2960 right,
2961 on,
2962 &JoinType::Inner,
2963 NullEquality::NullEqualsNothing,
2964 )?;
2965
2966 let columns = columns(&join.schema());
2967 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2968
2969 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2971 let batches = common::collect(stream).await?;
2972
2973 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2974 let mut expected_batch_count = div_ceil(1, batch_size);
2977 if batch_size == 1 {
2978 expected_batch_count += 1;
2979 }
2980 expected_batch_count
2981 } else {
2982 div_ceil(6, batch_size)
2985 };
2986 assert!(
2988 batches.len() <= expected_batch_count,
2989 "expected at most {expected_batch_count} batches, got {}",
2990 batches.len()
2991 );
2992
2993 allow_duplicates! {
2995 assert_snapshot!(batches_to_string(&batches), @r"
2996 +----+----+----+----+----+----+
2997 | a1 | b1 | c1 | a2 | b1 | c2 |
2998 +----+----+----+----+----+----+
2999 | 1 | 4 | 7 | 10 | 4 | 70 |
3000 +----+----+----+----+----+----+
3001 ");
3002 }
3003
3004 let stream = join.execute(1, Arc::clone(&task_ctx))?;
3006 let batches = common::collect(stream).await?;
3007
3008 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
3009 div_ceil(2, batch_size)
3011 } else {
3012 div_ceil(3, batch_size)
3015 };
3016 assert!(
3018 batches.len() <= expected_batch_count,
3019 "expected at most {expected_batch_count} batches, got {}",
3020 batches.len()
3021 );
3022
3023 allow_duplicates! {
3025 assert_snapshot!(batches_to_string(&batches), @r"
3026 +----+----+----+----+----+----+
3027 | a1 | b1 | c1 | a2 | b1 | c2 |
3028 +----+----+----+----+----+----+
3029 | 2 | 5 | 8 | 30 | 5 | 90 |
3030 | 3 | 5 | 9 | 30 | 5 | 90 |
3031 +----+----+----+----+----+----+
3032 ");
3033 }
3034
3035 let metrics = join.metrics().unwrap();
3036 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3037
3038 Ok(())
3039 }
3040
3041 fn build_table_two_batches(
3042 a: (&str, &Vec<i32>),
3043 b: (&str, &Vec<i32>),
3044 c: (&str, &Vec<i32>),
3045 ) -> Arc<dyn ExecutionPlan> {
3046 let batch = build_table_i32(a, b, c);
3047 let schema = batch.schema();
3048 TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
3049 }
3050
3051 #[apply(hash_join_exec_configs)]
3052 #[tokio::test]
3053 async fn join_left_multi_batch(
3054 batch_size: usize,
3055 use_perfect_hash_join_as_possible: bool,
3056 ) -> Result<()> {
3057 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3058 let left = build_table(
3059 ("a1", &vec![1, 2, 3]),
3060 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3062 );
3063 let right = build_table_two_batches(
3064 ("a2", &vec![10, 20, 30]),
3065 ("b1", &vec![4, 5, 6]),
3066 ("c2", &vec![70, 80, 90]),
3067 );
3068 let on = vec![(
3069 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3070 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
3071 )];
3072
3073 let join = join(
3074 Arc::clone(&left),
3075 Arc::clone(&right),
3076 on.clone(),
3077 &JoinType::Left,
3078 NullEquality::NullEqualsNothing,
3079 )
3080 .unwrap();
3081
3082 let columns = columns(&join.schema());
3083 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3084
3085 let (_, batches, metrics) = join_collect(
3086 Arc::clone(&left),
3087 Arc::clone(&right),
3088 on.clone(),
3089 &JoinType::Left,
3090 NullEquality::NullEqualsNothing,
3091 task_ctx,
3092 )
3093 .await?;
3094
3095 allow_duplicates! {
3096 assert_snapshot!(batches_to_sort_string(&batches), @r"
3097 +----+----+----+----+----+----+
3098 | a1 | b1 | c1 | a2 | b1 | c2 |
3099 +----+----+----+----+----+----+
3100 | 1 | 4 | 7 | 10 | 4 | 70 |
3101 | 1 | 4 | 7 | 10 | 4 | 70 |
3102 | 2 | 5 | 8 | 20 | 5 | 80 |
3103 | 2 | 5 | 8 | 20 | 5 | 80 |
3104 | 3 | 7 | 9 | | | |
3105 +----+----+----+----+----+----+
3106 ");
3107 }
3108
3109 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3110 return Ok(());
3111 }
3112
3113 #[apply(hash_join_exec_configs)]
3114 #[tokio::test]
3115 async fn join_full_multi_batch(
3116 batch_size: usize,
3117 use_perfect_hash_join_as_possible: bool,
3118 ) {
3119 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3120 let left = build_table(
3121 ("a1", &vec![1, 2, 3]),
3122 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3124 );
3125 let right = build_table_two_batches(
3127 ("a2", &vec![10, 20, 30]),
3128 ("b2", &vec![4, 5, 6]),
3129 ("c2", &vec![70, 80, 90]),
3130 );
3131 let on = vec![(
3132 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3133 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3134 )];
3135
3136 let join = join(
3137 left,
3138 right,
3139 on,
3140 &JoinType::Full,
3141 NullEquality::NullEqualsNothing,
3142 )
3143 .unwrap();
3144
3145 let columns = columns(&join.schema());
3146 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3147
3148 let stream = join.execute(0, task_ctx).unwrap();
3149 let batches = common::collect(stream).await.unwrap();
3150 let metrics = join.metrics().unwrap();
3151
3152 allow_duplicates! {
3153 assert_snapshot!(batches_to_sort_string(&batches), @r"
3154 +----+----+----+----+----+----+
3155 | a1 | b1 | c1 | a2 | b2 | c2 |
3156 +----+----+----+----+----+----+
3157 | | | | 30 | 6 | 90 |
3158 | | | | 30 | 6 | 90 |
3159 | 1 | 4 | 7 | 10 | 4 | 70 |
3160 | 1 | 4 | 7 | 10 | 4 | 70 |
3161 | 2 | 5 | 8 | 20 | 5 | 80 |
3162 | 2 | 5 | 8 | 20 | 5 | 80 |
3163 | 3 | 7 | 9 | | | |
3164 +----+----+----+----+----+----+
3165 ");
3166 }
3167
3168 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3169 }
3170
3171 #[apply(hash_join_exec_configs)]
3172 #[tokio::test]
3173 async fn join_left_empty_right(
3174 batch_size: usize,
3175 use_perfect_hash_join_as_possible: bool,
3176 ) {
3177 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3178 let left = build_table(
3179 ("a1", &vec![1, 2, 3]),
3180 ("b1", &vec![4, 5, 7]),
3181 ("c1", &vec![7, 8, 9]),
3182 );
3183 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
3184 let on = vec![(
3185 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3186 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
3187 )];
3188 let schema = right.schema();
3189 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
3190 let join = join(
3191 left,
3192 right,
3193 on,
3194 &JoinType::Left,
3195 NullEquality::NullEqualsNothing,
3196 )
3197 .unwrap();
3198
3199 let columns = columns(&join.schema());
3200 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3201
3202 let stream = join.execute(0, task_ctx).unwrap();
3203 let batches = common::collect(stream).await.unwrap();
3204 let metrics = join.metrics().unwrap();
3205
3206 allow_duplicates! {
3207 assert_snapshot!(batches_to_sort_string(&batches), @r"
3208 +----+----+----+----+----+----+
3209 | a1 | b1 | c1 | a2 | b1 | c2 |
3210 +----+----+----+----+----+----+
3211 | 1 | 4 | 7 | | | |
3212 | 2 | 5 | 8 | | | |
3213 | 3 | 7 | 9 | | | |
3214 +----+----+----+----+----+----+
3215 ");
3216 }
3217
3218 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3219 }
3220
3221 #[apply(hash_join_exec_configs)]
3222 #[tokio::test]
3223 async fn join_full_empty_right(
3224 batch_size: usize,
3225 use_perfect_hash_join_as_possible: bool,
3226 ) {
3227 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3228 let left = build_table(
3229 ("a1", &vec![1, 2, 3]),
3230 ("b1", &vec![4, 5, 7]),
3231 ("c1", &vec![7, 8, 9]),
3232 );
3233 let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
3234 let on = vec![(
3235 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3236 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3237 )];
3238 let schema = right.schema();
3239 let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
3240 let join = join(
3241 left,
3242 right,
3243 on,
3244 &JoinType::Full,
3245 NullEquality::NullEqualsNothing,
3246 )
3247 .unwrap();
3248
3249 let columns = columns(&join.schema());
3250 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3251
3252 let stream = join.execute(0, task_ctx).unwrap();
3253 let batches = common::collect(stream).await.unwrap();
3254 let metrics = join.metrics().unwrap();
3255
3256 allow_duplicates! {
3257 assert_snapshot!(batches_to_sort_string(&batches), @r"
3258 +----+----+----+----+----+----+
3259 | a1 | b1 | c1 | a2 | b2 | c2 |
3260 +----+----+----+----+----+----+
3261 | 1 | 4 | 7 | | | |
3262 | 2 | 5 | 8 | | | |
3263 | 3 | 7 | 9 | | | |
3264 +----+----+----+----+----+----+
3265 ");
3266 }
3267
3268 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3269 }
3270
3271 #[apply(hash_join_exec_configs)]
3272 #[tokio::test]
3273 async fn join_left_one(
3274 batch_size: usize,
3275 use_perfect_hash_join_as_possible: bool,
3276 ) -> Result<()> {
3277 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3278 let left = build_table(
3279 ("a1", &vec![1, 2, 3]),
3280 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3282 );
3283 let right = build_table(
3284 ("a2", &vec![10, 20, 30]),
3285 ("b1", &vec![4, 5, 6]),
3286 ("c2", &vec![70, 80, 90]),
3287 );
3288 let on = vec![(
3289 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3290 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3291 )];
3292
3293 let (columns, batches, metrics) = join_collect(
3294 Arc::clone(&left),
3295 Arc::clone(&right),
3296 on.clone(),
3297 &JoinType::Left,
3298 NullEquality::NullEqualsNothing,
3299 task_ctx,
3300 )
3301 .await?;
3302
3303 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3304
3305 allow_duplicates! {
3306 assert_snapshot!(batches_to_sort_string(&batches), @r"
3307 +----+----+----+----+----+----+
3308 | a1 | b1 | c1 | a2 | b1 | c2 |
3309 +----+----+----+----+----+----+
3310 | 1 | 4 | 7 | 10 | 4 | 70 |
3311 | 2 | 5 | 8 | 20 | 5 | 80 |
3312 | 3 | 7 | 9 | | | |
3313 +----+----+----+----+----+----+
3314 ");
3315 }
3316
3317 assert_join_metrics!(metrics, 3);
3318 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3319
3320 Ok(())
3321 }
3322
3323 #[apply(hash_join_exec_configs)]
3324 #[tokio::test]
3325 async fn partitioned_join_left_one(
3326 batch_size: usize,
3327 use_perfect_hash_join_as_possible: bool,
3328 ) -> Result<()> {
3329 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3330 let left = build_table(
3331 ("a1", &vec![1, 2, 3]),
3332 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3334 );
3335 let right = build_table(
3336 ("a2", &vec![10, 20, 30]),
3337 ("b1", &vec![4, 5, 6]),
3338 ("c2", &vec![70, 80, 90]),
3339 );
3340 let on = vec![(
3341 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3342 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3343 )];
3344
3345 let (columns, batches, metrics) = partitioned_join_collect(
3346 Arc::clone(&left),
3347 Arc::clone(&right),
3348 on.clone(),
3349 &JoinType::Left,
3350 NullEquality::NullEqualsNothing,
3351 task_ctx,
3352 )
3353 .await?;
3354
3355 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3356
3357 allow_duplicates! {
3358 assert_snapshot!(batches_to_sort_string(&batches), @r"
3359 +----+----+----+----+----+----+
3360 | a1 | b1 | c1 | a2 | b1 | c2 |
3361 +----+----+----+----+----+----+
3362 | 1 | 4 | 7 | 10 | 4 | 70 |
3363 | 2 | 5 | 8 | 20 | 5 | 80 |
3364 | 3 | 7 | 9 | | | |
3365 +----+----+----+----+----+----+
3366 ");
3367 }
3368
3369 assert_join_metrics!(metrics, 3);
3370 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3371
3372 Ok(())
3373 }
3374
3375 fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
3376 build_table(
3379 ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
3380 ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
3381 ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
3382 )
3383 }
3384
3385 fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
3386 build_table(
3389 ("a2", &vec![8, 12, 6, 2, 10, 4]),
3390 ("b2", &vec![8, 10, 6, 2, 10, 4]),
3391 ("c2", &vec![20, 40, 60, 80, 100, 120]),
3392 )
3393 }
3394
3395 #[apply(hash_join_exec_configs)]
3396 #[tokio::test]
3397 async fn join_left_semi(
3398 batch_size: usize,
3399 use_perfect_hash_join_as_possible: bool,
3400 ) -> Result<()> {
3401 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3402 let left = build_semi_anti_left_table();
3403 let right = build_semi_anti_right_table();
3404 let on = vec![(
3406 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3407 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3408 )];
3409
3410 let join = join(
3411 left,
3412 right,
3413 on,
3414 &JoinType::LeftSemi,
3415 NullEquality::NullEqualsNothing,
3416 )?;
3417
3418 let columns = columns(&join.schema());
3419 assert_eq!(columns, vec!["a1", "b1", "c1"]);
3420
3421 let stream = join.execute(0, task_ctx)?;
3422 let batches = common::collect(stream).await?;
3423
3424 allow_duplicates! {
3426 assert_snapshot!(batches_to_sort_string(&batches), @r"
3427 +----+----+-----+
3428 | a1 | b1 | c1 |
3429 +----+----+-----+
3430 | 11 | 8 | 110 |
3431 | 13 | 10 | 130 |
3432 | 9 | 8 | 90 |
3433 +----+----+-----+
3434 ");
3435 }
3436
3437 let metrics = join.metrics().unwrap();
3438 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3439
3440 Ok(())
3441 }
3442
3443 #[apply(hash_join_exec_configs)]
3444 #[tokio::test]
3445 async fn join_left_semi_with_filter(
3446 batch_size: usize,
3447 use_perfect_hash_join_as_possible: bool,
3448 ) -> Result<()> {
3449 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3450 let left = build_semi_anti_left_table();
3451 let right = build_semi_anti_right_table();
3452
3453 let on = vec![(
3455 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3456 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3457 )];
3458
3459 let column_indices = vec![ColumnIndex {
3460 index: 0,
3461 side: JoinSide::Right,
3462 }];
3463 let intermediate_schema =
3464 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3465
3466 let filter_expression = Arc::new(BinaryExpr::new(
3467 Arc::new(Column::new("x", 0)),
3468 Operator::NotEq,
3469 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
3470 )) as Arc<dyn PhysicalExpr>;
3471
3472 let filter = JoinFilter::new(
3473 filter_expression,
3474 column_indices.clone(),
3475 Arc::new(intermediate_schema.clone()),
3476 );
3477
3478 let join = join_with_filter(
3479 Arc::clone(&left),
3480 Arc::clone(&right),
3481 on.clone(),
3482 filter,
3483 &JoinType::LeftSemi,
3484 NullEquality::NullEqualsNothing,
3485 )?;
3486
3487 let columns_header = columns(&join.schema());
3488 assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
3489
3490 let stream = join.execute(0, Arc::clone(&task_ctx))?;
3491 let batches = common::collect(stream).await?;
3492
3493 allow_duplicates! {
3494 assert_snapshot!(batches_to_sort_string(&batches), @r"
3495 +----+----+-----+
3496 | a1 | b1 | c1 |
3497 +----+----+-----+
3498 | 11 | 8 | 110 |
3499 | 13 | 10 | 130 |
3500 | 9 | 8 | 90 |
3501 +----+----+-----+
3502 ");
3503 }
3504
3505 let metrics = join.metrics().unwrap();
3506 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3507
3508 let filter_expression = Arc::new(BinaryExpr::new(
3510 Arc::new(Column::new("x", 0)),
3511 Operator::Gt,
3512 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
3513 )) as Arc<dyn PhysicalExpr>;
3514 let filter = JoinFilter::new(
3515 filter_expression,
3516 column_indices,
3517 Arc::new(intermediate_schema),
3518 );
3519
3520 let join = join_with_filter(
3521 left,
3522 right,
3523 on,
3524 filter,
3525 &JoinType::LeftSemi,
3526 NullEquality::NullEqualsNothing,
3527 )?;
3528
3529 let columns_header = columns(&join.schema());
3530 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
3531
3532 let stream = join.execute(0, task_ctx)?;
3533 let batches = common::collect(stream).await?;
3534
3535 allow_duplicates! {
3536 assert_snapshot!(batches_to_sort_string(&batches), @r"
3537 +----+----+-----+
3538 | a1 | b1 | c1 |
3539 +----+----+-----+
3540 | 13 | 10 | 130 |
3541 +----+----+-----+
3542 ");
3543 }
3544
3545 let metrics = join.metrics().unwrap();
3546 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3547
3548 Ok(())
3549 }
3550
3551 #[apply(hash_join_exec_configs)]
3552 #[tokio::test]
3553 async fn join_right_semi(
3554 batch_size: usize,
3555 use_perfect_hash_join_as_possible: bool,
3556 ) -> Result<()> {
3557 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3558 let left = build_semi_anti_left_table();
3559 let right = build_semi_anti_right_table();
3560
3561 let on = vec![(
3563 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3564 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3565 )];
3566
3567 let join = join(
3568 left,
3569 right,
3570 on,
3571 &JoinType::RightSemi,
3572 NullEquality::NullEqualsNothing,
3573 )?;
3574
3575 let columns = columns(&join.schema());
3576 assert_eq!(columns, vec!["a2", "b2", "c2"]);
3577
3578 let stream = join.execute(0, task_ctx)?;
3579 let batches = common::collect(stream).await?;
3580
3581 allow_duplicates! {
3583 assert_snapshot!(batches_to_string(&batches), @r"
3584 +----+----+-----+
3585 | a2 | b2 | c2 |
3586 +----+----+-----+
3587 | 8 | 8 | 20 |
3588 | 12 | 10 | 40 |
3589 | 10 | 10 | 100 |
3590 +----+----+-----+
3591 ");
3592 }
3593
3594 let metrics = join.metrics().unwrap();
3595 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3596
3597 Ok(())
3598 }
3599
3600 #[apply(hash_join_exec_configs)]
3601 #[tokio::test]
3602 async fn join_right_semi_with_filter(
3603 batch_size: usize,
3604 use_perfect_hash_join_as_possible: bool,
3605 ) -> Result<()> {
3606 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3607 let left = build_semi_anti_left_table();
3608 let right = build_semi_anti_right_table();
3609
3610 let on = vec![(
3612 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3613 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3614 )];
3615
3616 let column_indices = vec![ColumnIndex {
3617 index: 0,
3618 side: JoinSide::Left,
3619 }];
3620 let intermediate_schema =
3621 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3622
3623 let filter_expression = Arc::new(BinaryExpr::new(
3624 Arc::new(Column::new("x", 0)),
3625 Operator::NotEq,
3626 Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
3627 )) as Arc<dyn PhysicalExpr>;
3628
3629 let filter = JoinFilter::new(
3630 filter_expression,
3631 column_indices.clone(),
3632 Arc::new(intermediate_schema.clone()),
3633 );
3634
3635 let join = join_with_filter(
3636 Arc::clone(&left),
3637 Arc::clone(&right),
3638 on.clone(),
3639 filter,
3640 &JoinType::RightSemi,
3641 NullEquality::NullEqualsNothing,
3642 )?;
3643
3644 let columns = columns(&join.schema());
3645 assert_eq!(columns, vec!["a2", "b2", "c2"]);
3646
3647 let stream = join.execute(0, Arc::clone(&task_ctx))?;
3648 let batches = common::collect(stream).await?;
3649
3650 allow_duplicates! {
3652 assert_snapshot!(batches_to_string(&batches), @r"
3653 +----+----+-----+
3654 | a2 | b2 | c2 |
3655 +----+----+-----+
3656 | 8 | 8 | 20 |
3657 | 12 | 10 | 40 |
3658 | 10 | 10 | 100 |
3659 +----+----+-----+
3660 ");
3661 }
3662
3663 let metrics = join.metrics().unwrap();
3664 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3665
3666 let filter_expression = Arc::new(BinaryExpr::new(
3668 Arc::new(Column::new("x", 0)),
3669 Operator::Gt,
3670 Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
3671 )) as Arc<dyn PhysicalExpr>;
3672
3673 let filter = JoinFilter::new(
3674 filter_expression,
3675 column_indices,
3676 Arc::new(intermediate_schema.clone()),
3677 );
3678
3679 let join = join_with_filter(
3680 left,
3681 right,
3682 on,
3683 filter,
3684 &JoinType::RightSemi,
3685 NullEquality::NullEqualsNothing,
3686 )?;
3687 let stream = join.execute(0, task_ctx)?;
3688 let batches = common::collect(stream).await?;
3689
3690 allow_duplicates! {
3692 assert_snapshot!(batches_to_string(&batches), @r"
3693 +----+----+-----+
3694 | a2 | b2 | c2 |
3695 +----+----+-----+
3696 | 12 | 10 | 40 |
3697 | 10 | 10 | 100 |
3698 +----+----+-----+
3699 ");
3700 }
3701
3702 let metrics = join.metrics().unwrap();
3703 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3704
3705 Ok(())
3706 }
3707
3708 #[apply(hash_join_exec_configs)]
3709 #[tokio::test]
3710 async fn join_left_anti(
3711 batch_size: usize,
3712 use_perfect_hash_join_as_possible: bool,
3713 ) -> Result<()> {
3714 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3715 let left = build_semi_anti_left_table();
3716 let right = build_semi_anti_right_table();
3717 let on = vec![(
3719 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3720 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3721 )];
3722
3723 let join = join(
3724 left,
3725 right,
3726 on,
3727 &JoinType::LeftAnti,
3728 NullEquality::NullEqualsNothing,
3729 )?;
3730
3731 let columns = columns(&join.schema());
3732 assert_eq!(columns, vec!["a1", "b1", "c1"]);
3733
3734 let stream = join.execute(0, task_ctx)?;
3735 let batches = common::collect(stream).await?;
3736
3737 allow_duplicates! {
3738 assert_snapshot!(batches_to_sort_string(&batches), @r"
3739 +----+----+----+
3740 | a1 | b1 | c1 |
3741 +----+----+----+
3742 | 1 | 1 | 10 |
3743 | 3 | 3 | 30 |
3744 | 5 | 5 | 50 |
3745 | 7 | 7 | 70 |
3746 +----+----+----+
3747 ");
3748 }
3749
3750 let metrics = join.metrics().unwrap();
3751 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3752
3753 Ok(())
3754 }
3755
3756 #[apply(hash_join_exec_configs)]
3757 #[tokio::test]
3758 async fn join_left_anti_with_filter(
3759 batch_size: usize,
3760 use_perfect_hash_join_as_possible: bool,
3761 ) -> Result<()> {
3762 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3763 let left = build_semi_anti_left_table();
3764 let right = build_semi_anti_right_table();
3765 let on = vec![(
3767 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3768 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3769 )];
3770
3771 let column_indices = vec![ColumnIndex {
3772 index: 0,
3773 side: JoinSide::Right,
3774 }];
3775 let intermediate_schema =
3776 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3777 let filter_expression = Arc::new(BinaryExpr::new(
3778 Arc::new(Column::new("x", 0)),
3779 Operator::NotEq,
3780 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3781 )) as Arc<dyn PhysicalExpr>;
3782
3783 let filter = JoinFilter::new(
3784 filter_expression,
3785 column_indices.clone(),
3786 Arc::new(intermediate_schema.clone()),
3787 );
3788
3789 let join = join_with_filter(
3790 Arc::clone(&left),
3791 Arc::clone(&right),
3792 on.clone(),
3793 filter,
3794 &JoinType::LeftAnti,
3795 NullEquality::NullEqualsNothing,
3796 )?;
3797
3798 let columns_header = columns(&join.schema());
3799 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
3800
3801 let stream = join.execute(0, Arc::clone(&task_ctx))?;
3802 let batches = common::collect(stream).await?;
3803
3804 allow_duplicates! {
3805 assert_snapshot!(batches_to_sort_string(&batches), @r"
3806 +----+----+-----+
3807 | a1 | b1 | c1 |
3808 +----+----+-----+
3809 | 1 | 1 | 10 |
3810 | 11 | 8 | 110 |
3811 | 3 | 3 | 30 |
3812 | 5 | 5 | 50 |
3813 | 7 | 7 | 70 |
3814 | 9 | 8 | 90 |
3815 +----+----+-----+
3816 ");
3817 }
3818
3819 let metrics = join.metrics().unwrap();
3820 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3821
3822 let filter_expression = Arc::new(BinaryExpr::new(
3824 Arc::new(Column::new("x", 0)),
3825 Operator::NotEq,
3826 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3827 )) as Arc<dyn PhysicalExpr>;
3828
3829 let filter = JoinFilter::new(
3830 filter_expression,
3831 column_indices,
3832 Arc::new(intermediate_schema),
3833 );
3834
3835 let join = join_with_filter(
3836 left,
3837 right,
3838 on,
3839 filter,
3840 &JoinType::LeftAnti,
3841 NullEquality::NullEqualsNothing,
3842 )?;
3843
3844 let columns_header = columns(&join.schema());
3845 assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
3846
3847 let stream = join.execute(0, task_ctx)?;
3848 let batches = common::collect(stream).await?;
3849
3850 allow_duplicates! {
3851 assert_snapshot!(batches_to_sort_string(&batches), @r"
3852 +----+----+-----+
3853 | a1 | b1 | c1 |
3854 +----+----+-----+
3855 | 1 | 1 | 10 |
3856 | 11 | 8 | 110 |
3857 | 3 | 3 | 30 |
3858 | 5 | 5 | 50 |
3859 | 7 | 7 | 70 |
3860 | 9 | 8 | 90 |
3861 +----+----+-----+
3862 ");
3863 }
3864
3865 let metrics = join.metrics().unwrap();
3866 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3867
3868 Ok(())
3869 }
3870
3871 #[apply(hash_join_exec_configs)]
3872 #[tokio::test]
3873 async fn join_right_anti(
3874 batch_size: usize,
3875 use_perfect_hash_join_as_possible: bool,
3876 ) -> Result<()> {
3877 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3878 let left = build_semi_anti_left_table();
3879 let right = build_semi_anti_right_table();
3880 let on = vec![(
3881 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3882 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3883 )];
3884
3885 let join = join(
3886 left,
3887 right,
3888 on,
3889 &JoinType::RightAnti,
3890 NullEquality::NullEqualsNothing,
3891 )?;
3892
3893 let columns = columns(&join.schema());
3894 assert_eq!(columns, vec!["a2", "b2", "c2"]);
3895
3896 let stream = join.execute(0, task_ctx)?;
3897 let batches = common::collect(stream).await?;
3898
3899 allow_duplicates! {
3901 assert_snapshot!(batches_to_string(&batches), @r"
3902 +----+----+-----+
3903 | a2 | b2 | c2 |
3904 +----+----+-----+
3905 | 6 | 6 | 60 |
3906 | 2 | 2 | 80 |
3907 | 4 | 4 | 120 |
3908 +----+----+-----+
3909 ");
3910 }
3911
3912 let metrics = join.metrics().unwrap();
3913 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3914
3915 Ok(())
3916 }
3917
3918 #[apply(hash_join_exec_configs)]
3919 #[tokio::test]
3920 async fn join_right_anti_with_filter(
3921 batch_size: usize,
3922 use_perfect_hash_join_as_possible: bool,
3923 ) -> Result<()> {
3924 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
3925 let left = build_semi_anti_left_table();
3926 let right = build_semi_anti_right_table();
3927 let on = vec![(
3929 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3930 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
3931 )];
3932
3933 let column_indices = vec![ColumnIndex {
3934 index: 0,
3935 side: JoinSide::Left,
3936 }];
3937 let intermediate_schema =
3938 Schema::new(vec![Field::new("x", DataType::Int32, true)]);
3939
3940 let filter_expression = Arc::new(BinaryExpr::new(
3941 Arc::new(Column::new("x", 0)),
3942 Operator::NotEq,
3943 Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
3944 )) as Arc<dyn PhysicalExpr>;
3945
3946 let filter = JoinFilter::new(
3947 filter_expression,
3948 column_indices,
3949 Arc::new(intermediate_schema.clone()),
3950 );
3951
3952 let join = join_with_filter(
3953 Arc::clone(&left),
3954 Arc::clone(&right),
3955 on.clone(),
3956 filter,
3957 &JoinType::RightAnti,
3958 NullEquality::NullEqualsNothing,
3959 )?;
3960
3961 let columns_header = columns(&join.schema());
3962 assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
3963
3964 let stream = join.execute(0, Arc::clone(&task_ctx))?;
3965 let batches = common::collect(stream).await?;
3966
3967 allow_duplicates! {
3969 assert_snapshot!(batches_to_string(&batches), @r"
3970 +----+----+-----+
3971 | a2 | b2 | c2 |
3972 +----+----+-----+
3973 | 12 | 10 | 40 |
3974 | 6 | 6 | 60 |
3975 | 2 | 2 | 80 |
3976 | 10 | 10 | 100 |
3977 | 4 | 4 | 120 |
3978 +----+----+-----+
3979 ");
3980 }
3981
3982 let metrics = join.metrics().unwrap();
3983 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
3984
3985 let column_indices = vec![ColumnIndex {
3987 index: 1,
3988 side: JoinSide::Right,
3989 }];
3990 let filter_expression = Arc::new(BinaryExpr::new(
3991 Arc::new(Column::new("x", 0)),
3992 Operator::NotEq,
3993 Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
3994 )) as Arc<dyn PhysicalExpr>;
3995
3996 let filter = JoinFilter::new(
3997 filter_expression,
3998 column_indices,
3999 Arc::new(intermediate_schema),
4000 );
4001
4002 let join = join_with_filter(
4003 left,
4004 right,
4005 on,
4006 filter,
4007 &JoinType::RightAnti,
4008 NullEquality::NullEqualsNothing,
4009 )?;
4010
4011 let columns_header = columns(&join.schema());
4012 assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
4013
4014 let stream = join.execute(0, task_ctx)?;
4015 let batches = common::collect(stream).await?;
4016
4017 allow_duplicates! {
4019 assert_snapshot!(batches_to_string(&batches), @r"
4020 +----+----+-----+
4021 | a2 | b2 | c2 |
4022 +----+----+-----+
4023 | 8 | 8 | 20 |
4024 | 6 | 6 | 60 |
4025 | 2 | 2 | 80 |
4026 | 4 | 4 | 120 |
4027 +----+----+-----+
4028 ");
4029 }
4030
4031 let metrics = join.metrics().unwrap();
4032 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4033
4034 Ok(())
4035 }
4036
4037 #[apply(hash_join_exec_configs)]
4038 #[tokio::test]
4039 async fn join_right_one(
4040 batch_size: usize,
4041 use_perfect_hash_join_as_possible: bool,
4042 ) -> Result<()> {
4043 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4044 let left = build_table(
4045 ("a1", &vec![1, 2, 3]),
4046 ("b1", &vec![4, 5, 7]),
4047 ("c1", &vec![7, 8, 9]),
4048 );
4049 let right = build_table(
4050 ("a2", &vec![10, 20, 30]),
4051 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
4053 );
4054 let on = vec![(
4055 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4056 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4057 )];
4058
4059 let (columns, batches, metrics) = join_collect(
4060 left,
4061 right,
4062 on,
4063 &JoinType::Right,
4064 NullEquality::NullEqualsNothing,
4065 task_ctx,
4066 )
4067 .await?;
4068
4069 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
4070
4071 allow_duplicates! {
4072 assert_snapshot!(batches_to_sort_string(&batches), @r"
4073 +----+----+----+----+----+----+
4074 | a1 | b1 | c1 | a2 | b1 | c2 |
4075 +----+----+----+----+----+----+
4076 | | | | 30 | 6 | 90 |
4077 | 1 | 4 | 7 | 10 | 4 | 70 |
4078 | 2 | 5 | 8 | 20 | 5 | 80 |
4079 +----+----+----+----+----+----+
4080 ");
4081 }
4082
4083 assert_join_metrics!(metrics, 3);
4084 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4085
4086 Ok(())
4087 }
4088
4089 #[apply(hash_join_exec_configs)]
4090 #[tokio::test]
4091 async fn partitioned_join_right_one(
4092 batch_size: usize,
4093 use_perfect_hash_join_as_possible: bool,
4094 ) -> Result<()> {
4095 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4096 let left = build_table(
4097 ("a1", &vec![1, 2, 3]),
4098 ("b1", &vec![4, 5, 7]),
4099 ("c1", &vec![7, 8, 9]),
4100 );
4101 let right = build_table(
4102 ("a2", &vec![10, 20, 30]),
4103 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
4105 );
4106 let on = vec![(
4107 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4108 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4109 )];
4110
4111 let (columns, batches, metrics) = partitioned_join_collect(
4112 left,
4113 right,
4114 on,
4115 &JoinType::Right,
4116 NullEquality::NullEqualsNothing,
4117 task_ctx,
4118 )
4119 .await?;
4120
4121 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
4122
4123 allow_duplicates! {
4124 assert_snapshot!(batches_to_sort_string(&batches), @r"
4125 +----+----+----+----+----+----+
4126 | a1 | b1 | c1 | a2 | b1 | c2 |
4127 +----+----+----+----+----+----+
4128 | | | | 30 | 6 | 90 |
4129 | 1 | 4 | 7 | 10 | 4 | 70 |
4130 | 2 | 5 | 8 | 20 | 5 | 80 |
4131 +----+----+----+----+----+----+
4132 ");
4133 }
4134
4135 assert_join_metrics!(metrics, 3);
4136 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4137
4138 Ok(())
4139 }
4140
4141 #[apply(hash_join_exec_configs)]
4142 #[tokio::test]
4143 async fn join_full_one(
4144 batch_size: usize,
4145 use_perfect_hash_join_as_possible: bool,
4146 ) -> Result<()> {
4147 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4148 let left = build_table(
4149 ("a1", &vec![1, 2, 3]),
4150 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4152 );
4153 let right = build_table(
4154 ("a2", &vec![10, 20, 30]),
4155 ("b2", &vec![4, 5, 6]),
4156 ("c2", &vec![70, 80, 90]),
4157 );
4158 let on = vec![(
4159 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4160 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4161 )];
4162
4163 let join = join(
4164 left,
4165 right,
4166 on,
4167 &JoinType::Full,
4168 NullEquality::NullEqualsNothing,
4169 )?;
4170
4171 let columns = columns(&join.schema());
4172 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
4173
4174 let stream = join.execute(0, task_ctx)?;
4175 let batches = common::collect(stream).await?;
4176
4177 allow_duplicates! {
4178 assert_snapshot!(batches_to_sort_string(&batches), @r"
4179 +----+----+----+----+----+----+
4180 | a1 | b1 | c1 | a2 | b2 | c2 |
4181 +----+----+----+----+----+----+
4182 | | | | 30 | 6 | 90 |
4183 | 1 | 4 | 7 | 10 | 4 | 70 |
4184 | 2 | 5 | 8 | 20 | 5 | 80 |
4185 | 3 | 7 | 9 | | | |
4186 +----+----+----+----+----+----+
4187 ");
4188 }
4189
4190 let metrics = join.metrics().unwrap();
4191 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4192
4193 Ok(())
4194 }
4195
4196 #[apply(hash_join_exec_configs)]
4197 #[tokio::test]
4198 async fn join_left_mark(
4199 batch_size: usize,
4200 use_perfect_hash_join_as_possible: bool,
4201 ) -> Result<()> {
4202 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4203 let left = build_table(
4204 ("a1", &vec![1, 2, 3]),
4205 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4207 );
4208 let right = build_table(
4209 ("a2", &vec![10, 20, 30]),
4210 ("b1", &vec![4, 5, 6]),
4211 ("c2", &vec![70, 80, 90]),
4212 );
4213 let on = vec![(
4214 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4215 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4216 )];
4217
4218 let (columns, batches, metrics) = join_collect(
4219 Arc::clone(&left),
4220 Arc::clone(&right),
4221 on.clone(),
4222 &JoinType::LeftMark,
4223 NullEquality::NullEqualsNothing,
4224 task_ctx,
4225 )
4226 .await?;
4227
4228 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
4229
4230 allow_duplicates! {
4231 assert_snapshot!(batches_to_sort_string(&batches), @r"
4232 +----+----+----+-------+
4233 | a1 | b1 | c1 | mark |
4234 +----+----+----+-------+
4235 | 1 | 4 | 7 | true |
4236 | 2 | 5 | 8 | true |
4237 | 3 | 7 | 9 | false |
4238 +----+----+----+-------+
4239 ");
4240 }
4241
4242 assert_join_metrics!(metrics, 3);
4243 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4244
4245 Ok(())
4246 }
4247
4248 #[apply(hash_join_exec_configs)]
4249 #[tokio::test]
4250 async fn partitioned_join_left_mark(
4251 batch_size: usize,
4252 use_perfect_hash_join_as_possible: bool,
4253 ) -> Result<()> {
4254 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4255 let left = build_table(
4256 ("a1", &vec![1, 2, 3]),
4257 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4259 );
4260 let right = build_table(
4261 ("a2", &vec![10, 20, 30, 40]),
4262 ("b1", &vec![4, 4, 5, 6]),
4263 ("c2", &vec![60, 70, 80, 90]),
4264 );
4265 let on = vec![(
4266 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4267 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4268 )];
4269
4270 let (columns, batches, metrics) = partitioned_join_collect(
4271 Arc::clone(&left),
4272 Arc::clone(&right),
4273 on.clone(),
4274 &JoinType::LeftMark,
4275 NullEquality::NullEqualsNothing,
4276 task_ctx,
4277 )
4278 .await?;
4279
4280 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
4281
4282 allow_duplicates! {
4283 assert_snapshot!(batches_to_sort_string(&batches), @r"
4284 +----+----+----+-------+
4285 | a1 | b1 | c1 | mark |
4286 +----+----+----+-------+
4287 | 1 | 4 | 7 | true |
4288 | 2 | 5 | 8 | true |
4289 | 3 | 7 | 9 | false |
4290 +----+----+----+-------+
4291 ");
4292 }
4293
4294 assert_join_metrics!(metrics, 3);
4295 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4296
4297 Ok(())
4298 }
4299
4300 #[apply(hash_join_exec_configs)]
4301 #[tokio::test]
4302 async fn join_right_mark(
4303 batch_size: usize,
4304 use_perfect_hash_join_as_possible: bool,
4305 ) -> Result<()> {
4306 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4307 let left = build_table(
4308 ("a1", &vec![1, 2, 3]),
4309 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4311 );
4312 let right = build_table(
4313 ("a2", &vec![10, 20, 30]),
4314 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
4316 );
4317 let on = vec![(
4318 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4319 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4320 )];
4321
4322 let (columns, batches, metrics) = join_collect(
4323 Arc::clone(&left),
4324 Arc::clone(&right),
4325 on.clone(),
4326 &JoinType::RightMark,
4327 NullEquality::NullEqualsNothing,
4328 task_ctx,
4329 )
4330 .await?;
4331
4332 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
4333
4334 let expected = [
4335 "+----+----+----+-------+",
4336 "| a2 | b1 | c2 | mark |",
4337 "+----+----+----+-------+",
4338 "| 10 | 4 | 70 | true |",
4339 "| 20 | 5 | 80 | true |",
4340 "| 30 | 6 | 90 | false |",
4341 "+----+----+----+-------+",
4342 ];
4343 assert_batches_sorted_eq!(expected, &batches);
4344
4345 assert_join_metrics!(metrics, 3);
4346 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4347
4348 Ok(())
4349 }
4350
4351 #[apply(hash_join_exec_configs)]
4352 #[tokio::test]
4353 async fn partitioned_join_right_mark(
4354 batch_size: usize,
4355 use_perfect_hash_join_as_possible: bool,
4356 ) -> Result<()> {
4357 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4358 let left = build_table(
4359 ("a1", &vec![1, 2, 3]),
4360 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
4362 );
4363 let right = build_table(
4364 ("a2", &vec![10, 20, 30, 40]),
4365 ("b1", &vec![4, 4, 5, 6]), ("c2", &vec![60, 70, 80, 90]),
4367 );
4368 let on = vec![(
4369 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4370 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4371 )];
4372
4373 let (columns, batches, metrics) = partitioned_join_collect(
4374 Arc::clone(&left),
4375 Arc::clone(&right),
4376 on.clone(),
4377 &JoinType::RightMark,
4378 NullEquality::NullEqualsNothing,
4379 task_ctx,
4380 )
4381 .await?;
4382
4383 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
4384
4385 let expected = [
4386 "+----+----+----+-------+",
4387 "| a2 | b1 | c2 | mark |",
4388 "+----+----+----+-------+",
4389 "| 10 | 4 | 60 | true |",
4390 "| 20 | 4 | 70 | true |",
4391 "| 30 | 5 | 80 | true |",
4392 "| 40 | 6 | 90 | false |",
4393 "+----+----+----+-------+",
4394 ];
4395 assert_batches_sorted_eq!(expected, &batches);
4396
4397 assert_join_metrics!(metrics, 4);
4398 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4399
4400 Ok(())
4401 }
4402
// Simulates hash collisions in a `JoinHashMapU64`: each build-side key hash is
// registered against *both* stored row values (1 and 2), so only the key
// equality check can pair probe row 0 with build row 0 and probe row 1 with
// build row 1.
#[test]
fn join_with_hash_collisions_64() -> Result<()> {
    let left = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );
    let right = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    let random_state = RandomState::with_seed(0);
    let mut left_hash_buf = vec![0; left.num_rows()];
    let left_hashes =
        create_hashes([&left.columns()[0]], &random_state, &mut left_hash_buf)?;

    // Insertion order matters for the table layout and is kept as
    // hashes[0]→1, hashes[0]→2, hashes[1]→1, hashes[1]→2.
    // NOTE(review): the stored values look 1-based (left row i stored as i + 1),
    // with `next` chaining them — confirm against the JoinHashMap docs.
    let mut table = HashTable::with_capacity(4);
    for &hash in left_hashes.iter() {
        for stored in [1, 2] {
            table.insert_unique(hash, (hash, stored), |(h, _)| *h);
        }
    }
    let join_hash_map = JoinHashMapU64::new(table, vec![2, 0]);

    let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
    let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
    let right_keys_values =
        key_column.evaluate(&right)?.into_array(right.num_rows())?;

    let mut probe_hashes = vec![0; right.num_rows()];
    create_hashes([&right_keys_values], &random_state, &mut probe_hashes)?;

    let mut probe_indices_buffer = Vec::new();
    let mut build_indices_buffer = Vec::new();
    let (build_ids, probe_ids, _) = lookup_join_hashmap(
        &join_hash_map,
        &[left_keys_values],
        &[right_keys_values],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
        &mut probe_indices_buffer,
        &mut build_indices_buffer,
    )?;

    assert_eq!(UInt64Array::from(vec![0, 1]), build_ids);
    assert_eq!(UInt32Array::from(vec![0, 1]), probe_ids);

    Ok(())
}
4469
// Same collision scenario as the u64 variant, against `JoinHashMapU32`: every
// key hash maps to both stored row values, and equality on the key column must
// resolve probe row i to build row i.
#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
    let left = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );
    let right = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    let random_state = RandomState::with_seed(0);
    let mut left_hash_buf = vec![0; left.num_rows()];
    let left_hashes =
        create_hashes([&left.columns()[0]], &random_state, &mut left_hash_buf)?;

    // Same insertion order as before: hashes[0]→1, hashes[0]→2, hashes[1]→1,
    // hashes[1]→2.
    let mut table = HashTable::with_capacity(4);
    for &hash in left_hashes.iter() {
        for stored in [1u32, 2u32] {
            table.insert_unique(hash, (hash, stored), |(h, _)| *h);
        }
    }
    let chain: Vec<u32> = vec![2, 0];
    let join_hash_map = JoinHashMapU32::new(table, chain);

    let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
    let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
    let right_keys_values =
        key_column.evaluate(&right)?.into_array(right.num_rows())?;

    let mut probe_hashes = vec![0; right.num_rows()];
    create_hashes([&right_keys_values], &random_state, &mut probe_hashes)?;

    let mut probe_indices_buffer = Vec::new();
    let mut build_indices_buffer = Vec::new();
    let (build_ids, probe_ids, _) = lookup_join_hashmap(
        &join_hash_map,
        &[left_keys_values],
        &[right_keys_values],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
        &mut probe_indices_buffer,
        &mut build_indices_buffer,
    )?;

    assert_eq!(UInt64Array::from(vec![0, 1]), build_ids);
    assert_eq!(UInt32Array::from(vec![0, 1]), probe_ids);

    Ok(())
}
4529
4530 #[tokio::test]
4531 async fn join_with_duplicated_column_names() -> Result<()> {
4532 let task_ctx = Arc::new(TaskContext::default());
4533 let left = build_table(
4534 ("a", &vec![1, 2, 3]),
4535 ("b", &vec![4, 5, 7]),
4536 ("c", &vec![7, 8, 9]),
4537 );
4538 let right = build_table(
4539 ("a", &vec![10, 20, 30]),
4540 ("b", &vec![1, 2, 7]),
4541 ("c", &vec![70, 80, 90]),
4542 );
4543 let on = vec![(
4544 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4546 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4547 )];
4548
4549 let join = join(
4550 left,
4551 right,
4552 on,
4553 &JoinType::Inner,
4554 NullEquality::NullEqualsNothing,
4555 )?;
4556
4557 let columns = columns(&join.schema());
4558 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4559
4560 let stream = join.execute(0, task_ctx)?;
4561 let batches = common::collect(stream).await?;
4562
4563 allow_duplicates! {
4564 assert_snapshot!(batches_to_sort_string(&batches), @r"
4565 +---+---+---+----+---+----+
4566 | a | b | c | a | b | c |
4567 +---+---+---+----+---+----+
4568 | 1 | 4 | 7 | 10 | 1 | 70 |
4569 | 2 | 5 | 8 | 20 | 2 | 80 |
4570 +---+---+---+----+---+----+
4571 ");
4572 }
4573
4574 Ok(())
4575 }
4576
4577 fn prepare_join_filter() -> JoinFilter {
4578 let column_indices = vec![
4579 ColumnIndex {
4580 index: 2,
4581 side: JoinSide::Left,
4582 },
4583 ColumnIndex {
4584 index: 2,
4585 side: JoinSide::Right,
4586 },
4587 ];
4588 let intermediate_schema = Schema::new(vec![
4589 Field::new("c", DataType::Int32, true),
4590 Field::new("c", DataType::Int32, true),
4591 ]);
4592 let filter_expression = Arc::new(BinaryExpr::new(
4593 Arc::new(Column::new("c", 0)),
4594 Operator::Gt,
4595 Arc::new(Column::new("c", 1)),
4596 )) as Arc<dyn PhysicalExpr>;
4597
4598 JoinFilter::new(
4599 filter_expression,
4600 column_indices,
4601 Arc::new(intermediate_schema),
4602 )
4603 }
4604
4605 #[apply(hash_join_exec_configs)]
4606 #[tokio::test]
4607 async fn join_inner_with_filter(
4608 batch_size: usize,
4609 use_perfect_hash_join_as_possible: bool,
4610 ) -> Result<()> {
4611 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4612 let left = build_table(
4613 ("a", &vec![0, 1, 2, 2]),
4614 ("b", &vec![4, 5, 7, 8]),
4615 ("c", &vec![7, 8, 9, 1]),
4616 );
4617 let right = build_table(
4618 ("a", &vec![10, 20, 30, 40]),
4619 ("b", &vec![2, 2, 3, 4]),
4620 ("c", &vec![7, 5, 6, 4]),
4621 );
4622 let on = vec![(
4623 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4624 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4625 )];
4626 let filter = prepare_join_filter();
4627
4628 let join = join_with_filter(
4629 left,
4630 right,
4631 on,
4632 filter,
4633 &JoinType::Inner,
4634 NullEquality::NullEqualsNothing,
4635 )?;
4636
4637 let columns = columns(&join.schema());
4638 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4639
4640 let stream = join.execute(0, task_ctx)?;
4641 let batches = common::collect(stream).await?;
4642
4643 allow_duplicates! {
4644 assert_snapshot!(batches_to_sort_string(&batches), @r"
4645 +---+---+---+----+---+---+
4646 | a | b | c | a | b | c |
4647 +---+---+---+----+---+---+
4648 | 2 | 7 | 9 | 10 | 2 | 7 |
4649 | 2 | 7 | 9 | 20 | 2 | 5 |
4650 +---+---+---+----+---+---+
4651 ");
4652 }
4653
4654 let metrics = join.metrics().unwrap();
4655 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4656
4657 Ok(())
4658 }
4659
4660 #[apply(hash_join_exec_configs)]
4661 #[tokio::test]
4662 async fn join_left_with_filter(
4663 batch_size: usize,
4664 use_perfect_hash_join_as_possible: bool,
4665 ) -> Result<()> {
4666 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4667 let left = build_table(
4668 ("a", &vec![0, 1, 2, 2]),
4669 ("b", &vec![4, 5, 7, 8]),
4670 ("c", &vec![7, 8, 9, 1]),
4671 );
4672 let right = build_table(
4673 ("a", &vec![10, 20, 30, 40]),
4674 ("b", &vec![2, 2, 3, 4]),
4675 ("c", &vec![7, 5, 6, 4]),
4676 );
4677 let on = vec![(
4678 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4679 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4680 )];
4681 let filter = prepare_join_filter();
4682
4683 let join = join_with_filter(
4684 left,
4685 right,
4686 on,
4687 filter,
4688 &JoinType::Left,
4689 NullEquality::NullEqualsNothing,
4690 )?;
4691
4692 let columns = columns(&join.schema());
4693 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4694
4695 let stream = join.execute(0, task_ctx)?;
4696 let batches = common::collect(stream).await?;
4697
4698 allow_duplicates! {
4699 assert_snapshot!(batches_to_sort_string(&batches), @r"
4700 +---+---+---+----+---+---+
4701 | a | b | c | a | b | c |
4702 +---+---+---+----+---+---+
4703 | 0 | 4 | 7 | | | |
4704 | 1 | 5 | 8 | | | |
4705 | 2 | 7 | 9 | 10 | 2 | 7 |
4706 | 2 | 7 | 9 | 20 | 2 | 5 |
4707 | 2 | 8 | 1 | | | |
4708 +---+---+---+----+---+---+
4709 ");
4710 }
4711
4712 let metrics = join.metrics().unwrap();
4713 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4714
4715 Ok(())
4716 }
4717
4718 #[apply(hash_join_exec_configs)]
4719 #[tokio::test]
4720 async fn join_right_with_filter(
4721 batch_size: usize,
4722 use_perfect_hash_join_as_possible: bool,
4723 ) -> Result<()> {
4724 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4725 let left = build_table(
4726 ("a", &vec![0, 1, 2, 2]),
4727 ("b", &vec![4, 5, 7, 8]),
4728 ("c", &vec![7, 8, 9, 1]),
4729 );
4730 let right = build_table(
4731 ("a", &vec![10, 20, 30, 40]),
4732 ("b", &vec![2, 2, 3, 4]),
4733 ("c", &vec![7, 5, 6, 4]),
4734 );
4735 let on = vec![(
4736 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4737 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4738 )];
4739 let filter = prepare_join_filter();
4740
4741 let join = join_with_filter(
4742 left,
4743 right,
4744 on,
4745 filter,
4746 &JoinType::Right,
4747 NullEquality::NullEqualsNothing,
4748 )?;
4749
4750 let columns = columns(&join.schema());
4751 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4752
4753 let stream = join.execute(0, task_ctx)?;
4754 let batches = common::collect(stream).await?;
4755
4756 allow_duplicates! {
4757 assert_snapshot!(batches_to_sort_string(&batches), @r"
4758 +---+---+---+----+---+---+
4759 | a | b | c | a | b | c |
4760 +---+---+---+----+---+---+
4761 | | | | 30 | 3 | 6 |
4762 | | | | 40 | 4 | 4 |
4763 | 2 | 7 | 9 | 10 | 2 | 7 |
4764 | 2 | 7 | 9 | 20 | 2 | 5 |
4765 +---+---+---+----+---+---+
4766 ");
4767 }
4768
4769 let metrics = join.metrics().unwrap();
4770 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4771
4772 Ok(())
4773 }
4774
4775 #[apply(hash_join_exec_configs)]
4776 #[tokio::test]
4777 async fn join_full_with_filter(
4778 batch_size: usize,
4779 use_perfect_hash_join_as_possible: bool,
4780 ) -> Result<()> {
4781 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
4782 let left = build_table(
4783 ("a", &vec![0, 1, 2, 2]),
4784 ("b", &vec![4, 5, 7, 8]),
4785 ("c", &vec![7, 8, 9, 1]),
4786 );
4787 let right = build_table(
4788 ("a", &vec![10, 20, 30, 40]),
4789 ("b", &vec![2, 2, 3, 4]),
4790 ("c", &vec![7, 5, 6, 4]),
4791 );
4792 let on = vec![(
4793 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
4794 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
4795 )];
4796 let filter = prepare_join_filter();
4797
4798 let join = join_with_filter(
4799 left,
4800 right,
4801 on,
4802 filter,
4803 &JoinType::Full,
4804 NullEquality::NullEqualsNothing,
4805 )?;
4806
4807 let columns = columns(&join.schema());
4808 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
4809
4810 let stream = join.execute(0, task_ctx)?;
4811 let batches = common::collect(stream).await?;
4812
4813 let expected = [
4814 "+---+---+---+----+---+---+",
4815 "| a | b | c | a | b | c |",
4816 "+---+---+---+----+---+---+",
4817 "| | | | 30 | 3 | 6 |",
4818 "| | | | 40 | 4 | 4 |",
4819 "| 2 | 7 | 9 | 10 | 2 | 7 |",
4820 "| 2 | 7 | 9 | 20 | 2 | 5 |",
4821 "| 0 | 4 | 7 | | | |",
4822 "| 1 | 5 | 8 | | | |",
4823 "| 2 | 8 | 1 | | | |",
4824 "+---+---+---+----+---+---+",
4825 ];
4826 assert_batches_sorted_eq!(expected, &batches);
4827
4828 let metrics = join.metrics().unwrap();
4829 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
4830
4831 Ok(())
4849 }
4850
4851 #[tokio::test]
4853 async fn test_collect_left_multiple_partitions_join() -> Result<()> {
4854 let task_ctx = Arc::new(TaskContext::default());
4855 let left = build_table(
4856 ("a1", &vec![1, 2, 3]),
4857 ("b1", &vec![4, 5, 7]),
4858 ("c1", &vec![7, 8, 9]),
4859 );
4860 let right = build_table(
4861 ("a2", &vec![10, 20, 30]),
4862 ("b2", &vec![4, 5, 6]),
4863 ("c2", &vec![70, 80, 90]),
4864 );
4865 let on = vec![(
4866 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4867 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4868 )];
4869
4870 let expected_inner = vec![
4871 "+----+----+----+----+----+----+",
4872 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4873 "+----+----+----+----+----+----+",
4874 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4875 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4876 "+----+----+----+----+----+----+",
4877 ];
4878 let expected_left = vec![
4879 "+----+----+----+----+----+----+",
4880 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4881 "+----+----+----+----+----+----+",
4882 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4883 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4884 "| 3 | 7 | 9 | | | |",
4885 "+----+----+----+----+----+----+",
4886 ];
4887 let expected_right = vec![
4888 "+----+----+----+----+----+----+",
4889 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4890 "+----+----+----+----+----+----+",
4891 "| | | | 30 | 6 | 90 |",
4892 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4893 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4894 "+----+----+----+----+----+----+",
4895 ];
4896 let expected_full = vec![
4897 "+----+----+----+----+----+----+",
4898 "| a1 | b1 | c1 | a2 | b2 | c2 |",
4899 "+----+----+----+----+----+----+",
4900 "| | | | 30 | 6 | 90 |",
4901 "| 1 | 4 | 7 | 10 | 4 | 70 |",
4902 "| 2 | 5 | 8 | 20 | 5 | 80 |",
4903 "| 3 | 7 | 9 | | | |",
4904 "+----+----+----+----+----+----+",
4905 ];
4906 let expected_left_semi = vec![
4907 "+----+----+----+",
4908 "| a1 | b1 | c1 |",
4909 "+----+----+----+",
4910 "| 1 | 4 | 7 |",
4911 "| 2 | 5 | 8 |",
4912 "+----+----+----+",
4913 ];
4914 let expected_left_anti = vec![
4915 "+----+----+----+",
4916 "| a1 | b1 | c1 |",
4917 "+----+----+----+",
4918 "| 3 | 7 | 9 |",
4919 "+----+----+----+",
4920 ];
4921 let expected_right_semi = vec![
4922 "+----+----+----+",
4923 "| a2 | b2 | c2 |",
4924 "+----+----+----+",
4925 "| 10 | 4 | 70 |",
4926 "| 20 | 5 | 80 |",
4927 "+----+----+----+",
4928 ];
4929 let expected_right_anti = vec![
4930 "+----+----+----+",
4931 "| a2 | b2 | c2 |",
4932 "+----+----+----+",
4933 "| 30 | 6 | 90 |",
4934 "+----+----+----+",
4935 ];
4936 let expected_left_mark = vec![
4937 "+----+----+----+-------+",
4938 "| a1 | b1 | c1 | mark |",
4939 "+----+----+----+-------+",
4940 "| 1 | 4 | 7 | true |",
4941 "| 2 | 5 | 8 | true |",
4942 "| 3 | 7 | 9 | false |",
4943 "+----+----+----+-------+",
4944 ];
4945 let expected_right_mark = vec![
4946 "+----+----+----+-------+",
4947 "| a2 | b2 | c2 | mark |",
4948 "+----+----+----+-------+",
4949 "| 10 | 4 | 70 | true |",
4950 "| 20 | 5 | 80 | true |",
4951 "| 30 | 6 | 90 | false |",
4952 "+----+----+----+-------+",
4953 ];
4954
4955 let test_cases = vec![
4956 (JoinType::Inner, expected_inner),
4957 (JoinType::Left, expected_left),
4958 (JoinType::Right, expected_right),
4959 (JoinType::Full, expected_full),
4960 (JoinType::LeftSemi, expected_left_semi),
4961 (JoinType::LeftAnti, expected_left_anti),
4962 (JoinType::RightSemi, expected_right_semi),
4963 (JoinType::RightAnti, expected_right_anti),
4964 (JoinType::LeftMark, expected_left_mark),
4965 (JoinType::RightMark, expected_right_mark),
4966 ];
4967
4968 for (join_type, expected) in test_cases {
4969 let (_, batches, metrics) = join_collect_with_partition_mode(
4970 Arc::clone(&left),
4971 Arc::clone(&right),
4972 on.clone(),
4973 &join_type,
4974 PartitionMode::CollectLeft,
4975 NullEquality::NullEqualsNothing,
4976 Arc::clone(&task_ctx),
4977 )
4978 .await?;
4979 assert_batches_sorted_eq!(expected, &batches);
4980 assert_join_metrics!(metrics, expected.len() - 4);
4981 }
4982
4983 Ok(())
4984 }
4985
4986 #[tokio::test]
4987 async fn join_date32() -> Result<()> {
4988 let schema = Arc::new(Schema::new(vec![
4989 Field::new("date", DataType::Date32, false),
4990 Field::new("n", DataType::Int32, false),
4991 ]));
4992
4993 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
4994 let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
4995 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
4996 let left =
4997 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
4998 .unwrap();
4999 let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
5000 let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
5001 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
5002 let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
5003 let on = vec![(
5004 Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
5005 Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
5006 )];
5007
5008 let join = join(
5009 left,
5010 right,
5011 on,
5012 &JoinType::Inner,
5013 NullEquality::NullEqualsNothing,
5014 )?;
5015
5016 let task_ctx = Arc::new(TaskContext::default());
5017 let stream = join.execute(0, task_ctx)?;
5018 let batches = common::collect(stream).await?;
5019
5020 allow_duplicates! {
5021 assert_snapshot!(batches_to_sort_string(&batches), @r"
5022 +------------+---+------------+---+
5023 | date | n | date | n |
5024 +------------+---+------------+---+
5025 | 2022-04-26 | 2 | 2022-04-26 | 4 |
5026 | 2022-04-26 | 2 | 2022-04-26 | 5 |
5027 | 2022-04-27 | 3 | 2022-04-27 | 6 |
5028 +------------+---+------------+---+
5029 ");
5030 }
5031
5032 Ok(())
5033 }
5034
5035 #[tokio::test]
5036 async fn join_with_error_right() {
5037 let left = build_table(
5038 ("a1", &vec![1, 2, 3]),
5039 ("b1", &vec![4, 5, 7]),
5040 ("c1", &vec![7, 8, 9]),
5041 );
5042
5043 let err = exec_err!("bad data error");
5046 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
5047
5048 let on = vec![(
5049 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
5050 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
5051 )];
5052 let schema = right.schema();
5053 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
5054 let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
5055
5056 let join_types = vec![
5057 JoinType::Inner,
5058 JoinType::Left,
5059 JoinType::Right,
5060 JoinType::Full,
5061 JoinType::LeftSemi,
5062 JoinType::LeftAnti,
5063 JoinType::RightSemi,
5064 JoinType::RightAnti,
5065 ];
5066
5067 for join_type in join_types {
5068 let join = join(
5069 Arc::clone(&left),
5070 Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
5071 on.clone(),
5072 &join_type,
5073 NullEquality::NullEqualsNothing,
5074 )
5075 .unwrap();
5076 let task_ctx = Arc::new(TaskContext::default());
5077
5078 let stream = join.execute(0, task_ctx).unwrap();
5079
5080 let result_string = common::collect(stream).await.unwrap_err().to_string();
5082 assert!(
5083 result_string.contains("bad data error"),
5084 "actual: {result_string}"
5085 );
5086 }
5087 }
5088
5089 #[tokio::test]
5090 async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
5091 assert_empty_build_probe_behavior(
5092 &[
5093 JoinType::Inner,
5094 JoinType::Left,
5095 JoinType::LeftSemi,
5096 JoinType::LeftAnti,
5097 JoinType::LeftMark,
5098 JoinType::RightSemi,
5099 ],
5100 false,
5101 false,
5102 )
5103 .await;
5104 }
5105
5106 #[tokio::test]
5107 async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() {
5108 assert_empty_build_probe_behavior(
5109 &[
5110 JoinType::Inner,
5111 JoinType::Left,
5112 JoinType::LeftSemi,
5113 JoinType::LeftAnti,
5114 JoinType::LeftMark,
5115 JoinType::RightSemi,
5116 ],
5117 false,
5118 true,
5119 )
5120 .await;
5121 }
5122
5123 #[tokio::test]
5124 async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
5125 assert_empty_build_probe_behavior(
5126 &[
5127 JoinType::Right,
5128 JoinType::Full,
5129 JoinType::RightAnti,
5130 JoinType::RightMark,
5131 ],
5132 true,
5133 false,
5134 )
5135 .await;
5136 }
5137
5138 #[tokio::test]
5139 async fn join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() {
5140 assert_empty_build_probe_behavior(
5141 &[
5142 JoinType::Right,
5143 JoinType::Full,
5144 JoinType::RightAnti,
5145 JoinType::RightMark,
5146 ],
5147 true,
5148 true,
5149 )
5150 .await;
5151 }
5152
5153 #[tokio::test]
5154 async fn join_split_batch() {
5155 let left = build_table(
5156 ("a1", &vec![1, 2, 3, 4]),
5157 ("b1", &vec![1, 1, 1, 1]),
5158 ("c1", &vec![0, 0, 0, 0]),
5159 );
5160 let right = build_table(
5161 ("a2", &vec![10, 20, 30, 40, 50]),
5162 ("b2", &vec![1, 1, 1, 1, 1]),
5163 ("c2", &vec![0, 0, 0, 0, 0]),
5164 );
5165 let on = vec![(
5166 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
5167 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
5168 )];
5169
5170 let join_types = vec![
5171 JoinType::Inner,
5172 JoinType::Left,
5173 JoinType::Right,
5174 JoinType::Full,
5175 JoinType::RightSemi,
5176 JoinType::RightAnti,
5177 JoinType::LeftSemi,
5178 JoinType::LeftAnti,
5179 ];
5180 let expected_resultset_records = 20;
5181 let common_result = [
5182 "+----+----+----+----+----+----+",
5183 "| a1 | b1 | c1 | a2 | b2 | c2 |",
5184 "+----+----+----+----+----+----+",
5185 "| 1 | 1 | 0 | 10 | 1 | 0 |",
5186 "| 2 | 1 | 0 | 10 | 1 | 0 |",
5187 "| 3 | 1 | 0 | 10 | 1 | 0 |",
5188 "| 4 | 1 | 0 | 10 | 1 | 0 |",
5189 "| 1 | 1 | 0 | 20 | 1 | 0 |",
5190 "| 2 | 1 | 0 | 20 | 1 | 0 |",
5191 "| 3 | 1 | 0 | 20 | 1 | 0 |",
5192 "| 4 | 1 | 0 | 20 | 1 | 0 |",
5193 "| 1 | 1 | 0 | 30 | 1 | 0 |",
5194 "| 2 | 1 | 0 | 30 | 1 | 0 |",
5195 "| 3 | 1 | 0 | 30 | 1 | 0 |",
5196 "| 4 | 1 | 0 | 30 | 1 | 0 |",
5197 "| 1 | 1 | 0 | 40 | 1 | 0 |",
5198 "| 2 | 1 | 0 | 40 | 1 | 0 |",
5199 "| 3 | 1 | 0 | 40 | 1 | 0 |",
5200 "| 4 | 1 | 0 | 40 | 1 | 0 |",
5201 "| 1 | 1 | 0 | 50 | 1 | 0 |",
5202 "| 2 | 1 | 0 | 50 | 1 | 0 |",
5203 "| 3 | 1 | 0 | 50 | 1 | 0 |",
5204 "| 4 | 1 | 0 | 50 | 1 | 0 |",
5205 "+----+----+----+----+----+----+",
5206 ];
5207 let left_batch = [
5208 "+----+----+----+",
5209 "| a1 | b1 | c1 |",
5210 "+----+----+----+",
5211 "| 1 | 1 | 0 |",
5212 "| 2 | 1 | 0 |",
5213 "| 3 | 1 | 0 |",
5214 "| 4 | 1 | 0 |",
5215 "+----+----+----+",
5216 ];
5217 let right_batch = [
5218 "+----+----+----+",
5219 "| a2 | b2 | c2 |",
5220 "+----+----+----+",
5221 "| 10 | 1 | 0 |",
5222 "| 20 | 1 | 0 |",
5223 "| 30 | 1 | 0 |",
5224 "| 40 | 1 | 0 |",
5225 "| 50 | 1 | 0 |",
5226 "+----+----+----+",
5227 ];
5228 let right_empty = [
5229 "+----+----+----+",
5230 "| a2 | b2 | c2 |",
5231 "+----+----+----+",
5232 "+----+----+----+",
5233 ];
5234 let left_empty = [
5235 "+----+----+----+",
5236 "| a1 | b1 | c1 |",
5237 "+----+----+----+",
5238 "+----+----+----+",
5239 ];
5240
5241 for join_type in join_types {
5243 for batch_size in (1..21).rev() {
5244 let task_ctx = prepare_task_ctx(batch_size, true);
5245
5246 let join = join(
5247 Arc::clone(&left),
5248 Arc::clone(&right),
5249 on.clone(),
5250 &join_type,
5251 NullEquality::NullEqualsNothing,
5252 )
5253 .unwrap();
5254
5255 let stream = join.execute(0, task_ctx).unwrap();
5256 let batches = common::collect(stream).await.unwrap();
5257
5258 let expected_batch_count = match join_type {
5263 JoinType::Inner
5264 | JoinType::Right
5265 | JoinType::RightSemi
5266 | JoinType::RightAnti => {
5267 div_ceil(expected_resultset_records, batch_size)
5268 }
5269 _ => div_ceil(expected_resultset_records, batch_size) + 1,
5270 };
5271 assert!(
5273 batches.len() <= expected_batch_count,
5274 "expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
5275 batches.len()
5276 );
5277
5278 let expected = match join_type {
5279 JoinType::RightSemi => right_batch.to_vec(),
5280 JoinType::RightAnti => right_empty.to_vec(),
5281 JoinType::LeftSemi => left_batch.to_vec(),
5282 JoinType::LeftAnti => left_empty.to_vec(),
5283 _ => common_result.to_vec(),
5284 };
5285 if batches.is_empty() {
5288 assert!(
5290 matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
5291 "Unexpected empty result for {join_type} join"
5292 );
5293 } else {
5294 assert_batches_eq!(expected, &batches);
5295 }
5296 }
5297 }
5298 }
5299
5300 #[tokio::test]
5301 async fn single_partition_join_overallocation() -> Result<()> {
5302 let left = build_table(
5303 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5304 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5305 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5306 );
5307 let right = build_table(
5308 ("a2", &vec![10, 11]),
5309 ("b2", &vec![12, 13]),
5310 ("c2", &vec![14, 15]),
5311 );
5312 let on = vec![(
5313 Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
5314 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
5315 )];
5316
5317 let join_types = vec![
5318 JoinType::Inner,
5319 JoinType::Left,
5320 JoinType::Right,
5321 JoinType::Full,
5322 JoinType::LeftSemi,
5323 JoinType::LeftAnti,
5324 JoinType::RightSemi,
5325 JoinType::RightAnti,
5326 JoinType::LeftMark,
5327 JoinType::RightMark,
5328 ];
5329
5330 for join_type in join_types {
5331 let runtime = RuntimeEnvBuilder::new()
5332 .with_memory_limit(100, 1.0)
5333 .build_arc()?;
5334 let task_ctx = TaskContext::default().with_runtime(runtime);
5335 let task_ctx = Arc::new(task_ctx);
5336
5337 let join = join(
5338 Arc::clone(&left),
5339 Arc::clone(&right),
5340 on.clone(),
5341 &join_type,
5342 NullEquality::NullEqualsNothing,
5343 )?;
5344
5345 let stream = join.execute(0, task_ctx)?;
5346 let err = common::collect(stream).await.unwrap_err();
5347
5348 assert_contains!(
5350 err.to_string(),
5351 "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput"
5352 );
5353
5354 assert_contains!(
5355 err.to_string(),
5356 "Failed to allocate additional 120.0 B for HashJoinInput"
5357 );
5358 }
5359
5360 Ok(())
5361 }
5362
5363 #[tokio::test]
5364 async fn partitioned_join_overallocation() -> Result<()> {
5365 let left_batch = build_table_i32(
5368 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5369 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5370 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
5371 );
5372 let left = TestMemoryExec::try_new_exec(
5373 &[vec![left_batch.clone()], vec![left_batch.clone()]],
5374 left_batch.schema(),
5375 None,
5376 )
5377 .unwrap();
5378 let right_batch = build_table_i32(
5379 ("a2", &vec![10, 11]),
5380 ("b2", &vec![12, 13]),
5381 ("c2", &vec![14, 15]),
5382 );
5383 let right = TestMemoryExec::try_new_exec(
5384 &[vec![right_batch.clone()], vec![right_batch.clone()]],
5385 right_batch.schema(),
5386 None,
5387 )
5388 .unwrap();
5389 let on = vec![(
5390 Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
5391 Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
5392 )];
5393
5394 let join_types = vec![
5395 JoinType::Inner,
5396 JoinType::Left,
5397 JoinType::Right,
5398 JoinType::Full,
5399 JoinType::LeftSemi,
5400 JoinType::LeftAnti,
5401 JoinType::RightSemi,
5402 JoinType::RightAnti,
5403 ];
5404
5405 for join_type in join_types {
5406 let runtime = RuntimeEnvBuilder::new()
5407 .with_memory_limit(100, 1.0)
5408 .build_arc()?;
5409 let session_config = SessionConfig::default().with_batch_size(50);
5410 let task_ctx = TaskContext::default()
5411 .with_session_config(session_config)
5412 .with_runtime(runtime);
5413 let task_ctx = Arc::new(task_ctx);
5414
5415 let join = HashJoinExec::try_new(
5416 Arc::clone(&left) as Arc<dyn ExecutionPlan>,
5417 Arc::clone(&right) as Arc<dyn ExecutionPlan>,
5418 on.clone(),
5419 None,
5420 &join_type,
5421 None,
5422 PartitionMode::Partitioned,
5423 NullEquality::NullEqualsNothing,
5424 false,
5425 )?;
5426
5427 let stream = join.execute(1, task_ctx)?;
5428 let err = common::collect(stream).await.unwrap_err();
5429
5430 assert_contains!(
5432 err.to_string(),
5433 "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]"
5434 );
5435
5436 assert_contains!(
5437 err.to_string(),
5438 "Failed to allocate additional 120.0 B for HashJoinInput[1]"
5439 );
5440 }
5441
5442 Ok(())
5443 }
5444
5445 fn build_table_struct(
5446 struct_name: &str,
5447 field_name_and_values: (&str, &Vec<Option<i32>>),
5448 nulls: Option<NullBuffer>,
5449 ) -> Arc<dyn ExecutionPlan> {
5450 let (field_name, values) = field_name_and_values;
5451 let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
5452 let schema = Schema::new(vec![Field::new(
5453 struct_name,
5454 DataType::Struct(inner_fields.clone().into()),
5455 nulls.is_some(),
5456 )]);
5457
5458 let batch = RecordBatch::try_new(
5459 Arc::new(schema),
5460 vec![Arc::new(StructArray::new(
5461 inner_fields.into(),
5462 vec![Arc::new(Int32Array::from(values.clone()))],
5463 nulls,
5464 ))],
5465 )
5466 .unwrap();
5467 let schema_ref = batch.schema();
5468 TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
5469 }
5470
5471 #[tokio::test]
5472 async fn join_on_struct() -> Result<()> {
5473 let task_ctx = Arc::new(TaskContext::default());
5474 let left =
5475 build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
5476 let right =
5477 build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
5478 let on = vec![(
5479 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
5480 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
5481 )];
5482
5483 let (columns, batches, metrics) = join_collect(
5484 left,
5485 right,
5486 on,
5487 &JoinType::Inner,
5488 NullEquality::NullEqualsNothing,
5489 task_ctx,
5490 )
5491 .await?;
5492
5493 assert_eq!(columns, vec!["n1", "n2"]);
5494
5495 allow_duplicates! {
5496 assert_snapshot!(batches_to_string(&batches), @r"
5497 +--------+--------+
5498 | n1 | n2 |
5499 +--------+--------+
5500 | {a: } | {a: } |
5501 | {a: 1} | {a: 1} |
5502 | {a: 2} | {a: 2} |
5503 +--------+--------+
5504 ");
5505 }
5506
5507 assert_join_metrics!(metrics, 3);
5508
5509 Ok(())
5510 }
5511
5512 #[tokio::test]
5513 async fn join_on_struct_with_nulls() -> Result<()> {
5514 let task_ctx = Arc::new(TaskContext::default());
5515 let left =
5516 build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
5517 let right =
5518 build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
5519 let on = vec![(
5520 Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
5521 Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
5522 )];
5523
5524 let (_, batches_null_eq, metrics) = join_collect(
5525 Arc::clone(&left),
5526 Arc::clone(&right),
5527 on.clone(),
5528 &JoinType::Inner,
5529 NullEquality::NullEqualsNull,
5530 Arc::clone(&task_ctx),
5531 )
5532 .await?;
5533
5534 allow_duplicates! {
5535 assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r"
5536 +----+----+
5537 | n1 | n2 |
5538 +----+----+
5539 | | |
5540 +----+----+
5541 ");
5542 }
5543
5544 assert_join_metrics!(metrics, 1);
5545
5546 let (_, batches_null_neq, metrics) = join_collect(
5547 left,
5548 right,
5549 on,
5550 &JoinType::Inner,
5551 NullEquality::NullEqualsNothing,
5552 task_ctx,
5553 )
5554 .await?;
5555
5556 assert_join_metrics!(metrics, 0);
5557
5558 if batches_null_neq.is_empty() {
5561 } else {
5563 let expected_null_neq =
5564 ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
5565 assert_batches_eq!(expected_null_neq, &batches_null_neq);
5566 }
5567
5568 Ok(())
5569 }
5570
5571 fn columns(schema: &Schema) -> Vec<String> {
5573 schema.fields().iter().map(|f| f.name().clone()).collect()
5574 }
5575
5576 #[tokio::test]
5578 async fn test_hash_join_marks_filter_complete() -> Result<()> {
5579 let task_ctx = Arc::new(TaskContext::default());
5580 let left = build_table(
5581 ("a1", &vec![1, 2, 3]),
5582 ("b1", &vec![4, 5, 6]),
5583 ("c1", &vec![7, 8, 9]),
5584 );
5585 let right = build_table(
5586 ("a2", &vec![10, 20, 30]),
5587 ("b1", &vec![4, 5, 6]),
5588 ("c2", &vec![70, 80, 90]),
5589 );
5590
5591 let on = vec![(
5592 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
5593 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
5594 )];
5595
5596 let (join, dynamic_filter) =
5597 hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5598
5599 let stream = join.execute(0, task_ctx)?;
5601 let _batches = common::collect(stream).await?;
5602
5603 dynamic_filter.wait_complete().await;
5606
5607 Ok(())
5608 }
5609
5610 #[tokio::test]
5612 async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
5613 let task_ctx = Arc::new(TaskContext::default());
5614 let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
5616 let right = build_table(
5617 ("a2", &vec![10, 20, 30]),
5618 ("b1", &vec![4, 5, 6]),
5619 ("c2", &vec![70, 80, 90]),
5620 );
5621
5622 let on = vec![(
5623 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
5624 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
5625 )];
5626
5627 let (join, dynamic_filter) =
5628 hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5629
5630 let stream = join.execute(0, task_ctx)?;
5632 let _batches = common::collect(stream).await?;
5633
5634 dynamic_filter.wait_complete().await;
5637
5638 Ok(())
5639 }
5640
5641 #[tokio::test]
5642 async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()
5643 -> Result<()> {
5644 let mut session_config = SessionConfig::default();
5645 session_config
5646 .options_mut()
5647 .optimizer
5648 .enable_dynamic_filter_pushdown = true;
5649 let task_ctx =
5650 Arc::new(TaskContext::default().with_session_config(session_config));
5651
5652 let child_left_schema = Arc::new(Schema::new(vec![
5653 Field::new("child_left_payload", DataType::Int32, false),
5654 Field::new("child_key", DataType::Int32, false),
5655 Field::new("child_left_extra", DataType::Int32, false),
5656 ]));
5657 let child_right_schema = Arc::new(Schema::new(vec![
5658 Field::new("child_right_payload", DataType::Int32, false),
5659 Field::new("child_right_key", DataType::Int32, false),
5660 Field::new("child_right_extra", DataType::Int32, false),
5661 ]));
5662 let parent_left_schema = Arc::new(Schema::new(vec![
5663 Field::new("parent_payload", DataType::Int32, false),
5664 Field::new("parent_key", DataType::Int32, false),
5665 Field::new("parent_extra", DataType::Int32, false),
5666 ]));
5667
5668 let child_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5669 &[
5670 vec![build_table_i32(
5671 ("child_left_payload", &vec![10]),
5672 ("child_key", &vec![0]),
5673 ("child_left_extra", &vec![100]),
5674 )],
5675 vec![build_table_i32(
5676 ("child_left_payload", &vec![11]),
5677 ("child_key", &vec![1]),
5678 ("child_left_extra", &vec![101]),
5679 )],
5680 vec![build_table_i32(
5681 ("child_left_payload", &vec![12]),
5682 ("child_key", &vec![2]),
5683 ("child_left_extra", &vec![102]),
5684 )],
5685 vec![build_table_i32(
5686 ("child_left_payload", &vec![13]),
5687 ("child_key", &vec![3]),
5688 ("child_left_extra", &vec![103]),
5689 )],
5690 ],
5691 Arc::clone(&child_left_schema),
5692 None,
5693 )?;
5694 let child_right: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5695 &[
5696 vec![build_table_i32(
5697 ("child_right_payload", &vec![20]),
5698 ("child_right_key", &vec![0]),
5699 ("child_right_extra", &vec![200]),
5700 )],
5701 vec![build_table_i32(
5702 ("child_right_payload", &vec![21]),
5703 ("child_right_key", &vec![1]),
5704 ("child_right_extra", &vec![201]),
5705 )],
5706 vec![build_table_i32(
5707 ("child_right_payload", &vec![22]),
5708 ("child_right_key", &vec![2]),
5709 ("child_right_extra", &vec![202]),
5710 )],
5711 vec![build_table_i32(
5712 ("child_right_payload", &vec![23]),
5713 ("child_right_key", &vec![3]),
5714 ("child_right_extra", &vec![203]),
5715 )],
5716 ],
5717 Arc::clone(&child_right_schema),
5718 None,
5719 )?;
5720 let parent_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5721 &[
5722 vec![build_table_i32(
5723 ("parent_payload", &vec![30]),
5724 ("parent_key", &vec![0]),
5725 ("parent_extra", &vec![300]),
5726 )],
5727 vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
5728 vec![build_table_i32(
5729 ("parent_payload", &vec![32]),
5730 ("parent_key", &vec![2]),
5731 ("parent_extra", &vec![302]),
5732 )],
5733 vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
5734 ],
5735 Arc::clone(&parent_left_schema),
5736 None,
5737 )?;
5738
5739 let child_on = vec![(
5740 Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _,
5741 Arc::new(Column::new_with_schema(
5742 "child_right_key",
5743 &child_right_schema,
5744 )?) as _,
5745 )];
5746 let (child_join, _child_dynamic_filter) = hash_join_with_dynamic_filter_and_mode(
5747 child_left,
5748 child_right,
5749 child_on,
5750 JoinType::Inner,
5751 PartitionMode::Partitioned,
5752 )?;
5753 let child_join: Arc<dyn ExecutionPlan> = Arc::new(child_join);
5754
5755 let parent_on = vec![(
5756 Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _,
5757 Arc::new(Column::new_with_schema("child_key", &child_join.schema())?) as _,
5758 )];
5759 let parent_join = HashJoinExec::try_new(
5760 parent_left,
5761 child_join,
5762 parent_on,
5763 None,
5764 &JoinType::RightSemi,
5765 None,
5766 PartitionMode::Partitioned,
5767 NullEquality::NullEqualsNothing,
5768 false,
5769 )?;
5770
5771 let batches = tokio::time::timeout(
5772 std::time::Duration::from_secs(5),
5773 crate::execution_plan::collect(Arc::new(parent_join), task_ctx),
5774 )
5775 .await
5776 .expect("partitioned right-semi join should not hang")?;
5777
5778 assert_batches_sorted_eq!(
5779 [
5780 "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5781 "| child_left_payload | child_key | child_left_extra | child_right_payload | child_right_key | child_right_extra |",
5782 "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5783 "| 10 | 0 | 100 | 20 | 0 | 200 |",
5784 "| 12 | 2 | 102 | 22 | 2 | 202 |",
5785 "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5786 ],
5787 &batches
5788 );
5789
5790 Ok(())
5791 }
5792
5793 #[tokio::test]
5794 async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
5795 -> Result<()> {
5796 let task_ctx = Arc::new(TaskContext::default());
5797 let (left, right, on) = empty_build_with_probe_error_inputs();
5798
5799 let (join, dynamic_filter) =
5803 hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5804
5805 let stream = join.execute(0, task_ctx)?;
5806 let batches = common::collect(stream).await?;
5807 assert!(batches.is_empty());
5808
5809 dynamic_filter.wait_complete().await;
5810
5811 Ok(())
5812 }
5813
5814 #[tokio::test]
5815 async fn test_perfect_hash_join_with_negative_numbers() -> Result<()> {
5816 let task_ctx = prepare_task_ctx(8192, true);
5817 let (left_schema, right_schema, on) = build_schema_and_on()?;
5818
5819 let left_batch = RecordBatch::try_new(
5820 Arc::clone(&left_schema),
5821 vec![
5822 Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
5823 Arc::new(Int32Array::from(vec![-1, 0, 1])) as ArrayRef,
5824 ],
5825 )?;
5826 let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
5827
5828 let right_batch = RecordBatch::try_new(
5829 Arc::clone(&right_schema),
5830 vec![
5831 Arc::new(Int32Array::from(vec![10, 20, 30, 40])) as ArrayRef,
5832 Arc::new(Int32Array::from(vec![1, -1, 0, 2])) as ArrayRef,
5833 ],
5834 )?;
5835 let right =
5836 TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
5837
5838 let (columns, batches, metrics) = join_collect(
5839 left,
5840 right,
5841 on,
5842 &JoinType::Inner,
5843 NullEquality::NullEqualsNothing,
5844 task_ctx,
5845 )
5846 .await?;
5847
5848 assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
5849
5850 assert_batches_sorted_eq!(
5851 [
5852 "+----+----+----+----+",
5853 "| a1 | b1 | a2 | b1 |",
5854 "+----+----+----+----+",
5855 "| 1 | -1 | 20 | -1 |",
5856 "| 2 | 0 | 30 | 0 |",
5857 "| 3 | 1 | 10 | 1 |",
5858 "+----+----+----+----+",
5859 ],
5860 &batches
5861 );
5862
5863 assert_phj_used(&metrics, true);
5864
5865 Ok(())
5866 }
5867
5868 #[tokio::test]
5869 async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> {
5870 let task_ctx = prepare_task_ctx(8192, true);
5871 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
5872 let batch = RecordBatch::try_new(
5873 Arc::clone(&schema),
5874 vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))],
5875 )?;
5876 let left = TestMemoryExec::try_new_exec(
5877 &[vec![batch.clone()]],
5878 Arc::clone(&schema),
5879 None,
5880 )?;
5881 let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?;
5882 let on: JoinOn = vec![(
5883 Arc::new(Column::new_with_schema("a", &left.schema())?) as _,
5884 Arc::new(Column::new_with_schema("a", &right.schema())?) as _,
5885 )];
5886 let (_columns, batches, _metrics) = join_collect(
5887 left,
5888 right,
5889 on,
5890 &JoinType::Inner,
5891 NullEquality::NullEqualsNothing,
5892 task_ctx,
5893 )
5894 .await?;
5895 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
5896 assert_eq!(total_rows, 2);
5897 Ok(())
5898 }
5899
5900 #[apply(hash_join_exec_configs)]
5901 #[tokio::test]
5902 async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(
5903 batch_size: usize,
5904 use_perfect_hash_join_as_possible: bool,
5905 ) -> Result<()> {
5906 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
5907 let (left_schema, right_schema, on) = build_schema_and_on()?;
5908
5909 let left_batch = RecordBatch::try_new(
5910 Arc::clone(&left_schema),
5911 vec![
5912 Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
5913 Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef,
5914 ],
5915 )?;
5916 let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
5917
5918 let right_batch = RecordBatch::try_new(
5919 Arc::clone(&right_schema),
5920 vec![
5921 Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef,
5922 Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
5923 ],
5924 )?;
5925 let right =
5926 TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
5927
5928 let (columns, batches, metrics) = join_collect(
5929 left,
5930 right,
5931 on,
5932 &JoinType::Inner,
5933 NullEquality::NullEqualsNull,
5934 task_ctx,
5935 )
5936 .await?;
5937
5938 assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
5939 assert_batches_sorted_eq!(
5940 [
5941 "+----+----+----+----+",
5942 "| a1 | b1 | a2 | b1 |",
5943 "+----+----+----+----+",
5944 "| 1 | 10 | 3 | 10 |",
5945 "+----+----+----+----+",
5946 ],
5947 &batches
5948 );
5949
5950 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
5951
5952 Ok(())
5953 }
5954
5955 #[apply(hash_join_exec_configs)]
5956 #[tokio::test]
5957 async fn test_phj_null_equals_nothing_build_probe_all_have_nulls(
5958 batch_size: usize,
5959 use_perfect_hash_join_as_possible: bool,
5960 ) -> Result<()> {
5961 let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
5962 let (left_schema, right_schema, on) = build_schema_and_on()?;
5963
5964 let left_batch = RecordBatch::try_new(
5965 Arc::clone(&left_schema),
5966 vec![
5967 Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef,
5968 Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
5969 ],
5970 )?;
5971 let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
5972
5973 let right_batch = RecordBatch::try_new(
5974 Arc::clone(&right_schema),
5975 vec![
5976 Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
5977 Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
5978 ],
5979 )?;
5980 let right =
5981 TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
5982
5983 let (columns, batches, metrics) = join_collect(
5984 left,
5985 right,
5986 on,
5987 &JoinType::Inner,
5988 NullEquality::NullEqualsNothing,
5989 task_ctx,
5990 )
5991 .await?;
5992
5993 assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
5994 assert_batches_sorted_eq!(
5995 [
5996 "+----+----+----+----+",
5997 "| a1 | b1 | a2 | b1 |",
5998 "+----+----+----+----+",
5999 "| 1 | 10 | 3 | 10 |",
6000 "+----+----+----+----+",
6001 ],
6002 &batches
6003 );
6004
6005 assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
6006
6007 Ok(())
6008 }
6009
6010 #[tokio::test]
6011 async fn test_phj_null_equals_null_build_have_nulls() -> Result<()> {
6012 let task_ctx = prepare_task_ctx(8192, true);
6013 let (left_schema, right_schema, on) = build_schema_and_on()?;
6014
6015 let left_batch = RecordBatch::try_new(
6016 Arc::clone(&left_schema),
6017 vec![
6018 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef,
6019 Arc::new(Int32Array::from(vec![Some(10), Some(20), None])) as ArrayRef,
6020 ],
6021 )?;
6022 let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
6023
6024 let right_batch = RecordBatch::try_new(
6025 Arc::clone(&right_schema),
6026 vec![
6027 Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
6028 Arc::new(Int32Array::from(vec![Some(10), Some(30)])) as ArrayRef,
6029 ],
6030 )?;
6031 let right =
6032 TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
6033
6034 let (columns, batches, metrics) = join_collect(
6035 left,
6036 right,
6037 on,
6038 &JoinType::Inner,
6039 NullEquality::NullEqualsNull,
6040 task_ctx,
6041 )
6042 .await?;
6043
6044 assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
6045 assert_batches_sorted_eq!(
6046 [
6047 "+----+----+----+----+",
6048 "| a1 | b1 | a2 | b1 |",
6049 "+----+----+----+----+",
6050 "| 1 | 10 | 3 | 10 |",
6051 "+----+----+----+----+",
6052 ],
6053 &batches
6054 );
6055
6056 assert_phj_used(&metrics, false);
6057
6058 Ok(())
6059 }
6060
6061 #[apply(hash_join_exec_configs)]
6064 #[tokio::test]
6065 async fn test_null_aware_anti_join_probe_null(batch_size: usize) -> Result<()> {
6066 let task_ctx = prepare_task_ctx(batch_size, false);
6067
6068 let left = build_table_two_cols(
6070 ("c1", &vec![Some(1), Some(2), Some(3), Some(4)]),
6071 ("dummy", &vec![Some(10), Some(20), Some(30), Some(40)]),
6072 );
6073
6074 let right = build_table_two_cols(
6076 ("c2", &vec![Some(1), Some(2), Some(3), None]),
6077 ("dummy", &vec![Some(100), Some(200), Some(300), Some(400)]),
6078 );
6079
6080 let on = vec![(
6081 Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
6082 Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
6083 )];
6084
6085 let join = HashJoinExec::try_new(
6087 left,
6088 right,
6089 on,
6090 None,
6091 &JoinType::LeftAnti,
6092 None,
6093 PartitionMode::CollectLeft,
6094 NullEquality::NullEqualsNothing,
6095 true, )?;
6097
6098 let stream = join.execute(0, task_ctx)?;
6099 let batches = common::collect(stream).await?;
6100
6101 allow_duplicates! {
6103 assert_snapshot!(batches_to_sort_string(&batches), @r"
6104 ++
6105 ++
6106 ");
6107 }
6108 Ok(())
6109 }
6110
6111 #[apply(hash_join_exec_configs)]
6114 #[tokio::test]
6115 async fn test_null_aware_anti_join_build_null(batch_size: usize) -> Result<()> {
6116 let task_ctx = prepare_task_ctx(batch_size, false);
6117
6118 let left = build_table_two_cols(
6120 ("c1", &vec![Some(1), Some(4), None]),
6121 ("dummy", &vec![Some(10), Some(40), Some(0)]),
6122 );
6123
6124 let right = build_table_two_cols(
6126 ("c2", &vec![Some(1), Some(2), Some(3)]),
6127 ("dummy", &vec![Some(100), Some(200), Some(300)]),
6128 );
6129
6130 let on = vec![(
6131 Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
6132 Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
6133 )];
6134
6135 let join = HashJoinExec::try_new(
6137 left,
6138 right,
6139 on,
6140 None,
6141 &JoinType::LeftAnti,
6142 None,
6143 PartitionMode::CollectLeft,
6144 NullEquality::NullEqualsNothing,
6145 true, )?;
6147
6148 let stream = join.execute(0, task_ctx)?;
6149 let batches = common::collect(stream).await?;
6150
6151 allow_duplicates! {
6153 assert_snapshot!(batches_to_sort_string(&batches), @r"
6154 +----+-------+
6155 | c1 | dummy |
6156 +----+-------+
6157 | 4 | 40 |
6158 +----+-------+
6159 ");
6160 }
6161 Ok(())
6162 }
6163
6164 #[apply(hash_join_exec_configs)]
6166 #[tokio::test]
6167 async fn test_null_aware_anti_join_no_nulls(batch_size: usize) -> Result<()> {
6168 let task_ctx = prepare_task_ctx(batch_size, false);
6169
6170 let left = build_table_two_cols(
6172 ("c1", &vec![Some(1), Some(2), Some(4), Some(5)]),
6173 ("dummy", &vec![Some(10), Some(20), Some(40), Some(50)]),
6174 );
6175
6176 let right = build_table_two_cols(
6178 ("c2", &vec![Some(1), Some(2), Some(3)]),
6179 ("dummy", &vec![Some(100), Some(200), Some(300)]),
6180 );
6181
6182 let on = vec![(
6183 Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
6184 Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
6185 )];
6186
6187 let join = HashJoinExec::try_new(
6189 left,
6190 right,
6191 on,
6192 None,
6193 &JoinType::LeftAnti,
6194 None,
6195 PartitionMode::CollectLeft,
6196 NullEquality::NullEqualsNothing,
6197 true, )?;
6199
6200 let stream = join.execute(0, task_ctx)?;
6201 let batches = common::collect(stream).await?;
6202
6203 allow_duplicates! {
6205 assert_snapshot!(batches_to_sort_string(&batches), @r"
6206 +----+-------+
6207 | c1 | dummy |
6208 +----+-------+
6209 | 4 | 40 |
6210 | 5 | 50 |
6211 +----+-------+
6212 ");
6213 }
6214 Ok(())
6215 }
6216
6217 #[tokio::test]
6219 async fn test_null_aware_validation_wrong_join_type() {
6220 let left =
6221 build_table_two_cols(("c1", &vec![Some(1)]), ("dummy", &vec![Some(10)]));
6222 let right =
6223 build_table_two_cols(("c2", &vec![Some(1)]), ("dummy", &vec![Some(100)]));
6224
6225 let on = vec![(
6226 Arc::new(Column::new_with_schema("c1", &left.schema()).unwrap()) as _,
6227 Arc::new(Column::new_with_schema("c2", &right.schema()).unwrap()) as _,
6228 )];
6229
6230 let result = HashJoinExec::try_new(
6232 left,
6233 right,
6234 on,
6235 None,
6236 &JoinType::Inner,
6237 None,
6238 PartitionMode::CollectLeft,
6239 NullEquality::NullEqualsNothing,
6240 true, );
6242
6243 assert!(result.is_err());
6244 assert!(
6245 result
6246 .unwrap_err()
6247 .to_string()
6248 .contains("null_aware can only be true for LeftAnti joins")
6249 );
6250 }
6251
6252 #[tokio::test]
6254 async fn test_null_aware_validation_multi_column() {
6255 let left = build_table(("a", &vec![1]), ("b", &vec![2]), ("c", &vec![3]));
6256 let right = build_table(("x", &vec![1]), ("y", &vec![2]), ("z", &vec![3]));
6257
6258 let on = vec![
6260 (
6261 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
6262 Arc::new(Column::new_with_schema("x", &right.schema()).unwrap()) as _,
6263 ),
6264 (
6265 Arc::new(Column::new_with_schema("b", &left.schema()).unwrap()) as _,
6266 Arc::new(Column::new_with_schema("y", &right.schema()).unwrap()) as _,
6267 ),
6268 ];
6269
6270 let result = HashJoinExec::try_new(
6272 left,
6273 right,
6274 on,
6275 None,
6276 &JoinType::LeftAnti,
6277 None,
6278 PartitionMode::CollectLeft,
6279 NullEquality::NullEqualsNothing,
6280 true, );
6282
6283 assert!(result.is_err());
6284 assert!(
6285 result
6286 .unwrap_err()
6287 .to_string()
6288 .contains("null_aware anti join only supports single column join key")
6289 );
6290 }
6291
#[test]
fn test_lr_is_preserved() {
    // (join type, expected (left preserved, right preserved))
    let cases = [
        (JoinType::Inner, (true, true)),
        (JoinType::Left, (true, false)),
        (JoinType::Right, (false, true)),
        (JoinType::Full, (false, false)),
        (JoinType::LeftSemi, (true, true)),
        (JoinType::LeftAnti, (true, true)),
        (JoinType::LeftMark, (true, false)),
        (JoinType::RightSemi, (true, true)),
        (JoinType::RightAnti, (true, true)),
        (JoinType::RightMark, (false, true)),
    ];
    for (join_type, expected) in cases {
        assert_eq!(lr_is_preserved(join_type), expected, "join type: {join_type}");
    }
}
6305
6306 #[test]
6307 fn test_with_dynamic_filter() -> Result<()> {
6308 let (_, _, on) = build_schema_and_on()?;
6309 let left = build_table(("a1", &vec![1]), ("b1", &vec![1]), ("c1", &vec![1]));
6310 let right = build_table(("a2", &vec![1]), ("b1", &vec![1]), ("c2", &vec![1]));
6311
6312 let join = HashJoinExec::try_new(
6313 left,
6314 right,
6315 on,
6316 None,
6317 &JoinType::Inner,
6318 None,
6319 PartitionMode::CollectLeft,
6320 NullEquality::NullEqualsNothing,
6321 false,
6322 )?;
6323 assert!(join.dynamic_filter_expr().is_none());
6324
6325 let df = Arc::new(DynamicFilterPhysicalExpr::new(
6326 vec![Arc::new(Column::new("b1", 1)) as _],
6327 lit(true),
6328 ));
6329 let join = join.with_dynamic_filter_expr(Arc::clone(&df))?;
6330
6331 let restored = join
6332 .dynamic_filter_expr()
6333 .expect("should have dynamic filter");
6334 assert_eq!(
6335 restored
6336 .expression_id()
6337 .expect("DynamicFilterPhysicalExpr always has an expression_id"),
6338 df.expression_id()
6339 .expect("DynamicFilterPhysicalExpr always has an expression_id"),
6340 );
6341 Ok(())
6342 }
6343
6344 #[test]
6345 fn test_with_dynamic_filter_rejects_invalid_columns() -> Result<()> {
6346 let (_, _, on) = build_schema_and_on()?;
6347 let left = build_table(("a1", &vec![1]), ("b1", &vec![1]), ("c1", &vec![1]));
6348 let right = build_table(("a2", &vec![1]), ("b1", &vec![1]), ("c2", &vec![1]));
6349
6350 let join = HashJoinExec::try_new(
6351 left,
6352 right,
6353 on,
6354 None,
6355 &JoinType::Inner,
6356 None,
6357 PartitionMode::CollectLeft,
6358 NullEquality::NullEqualsNothing,
6359 false,
6360 )?;
6361
6362 let df = Arc::new(DynamicFilterPhysicalExpr::new(
6364 vec![Arc::new(Column::new("bad", 99)) as _],
6365 lit(true),
6366 ));
6367 assert!(join.with_dynamic_filter_expr(df).is_err());
6368 Ok(())
6369 }
6370}