use std::collections::HashSet;
use std::fmt;
use std::mem::size_of;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::{any::Any, vec};
use crate::ExecutionPlanProperties;
use crate::execution_plan::{
EmissionType, boundedness_from_children, has_same_children_properties,
stub_properties,
};
use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
use crate::joins::Map;
use crate::joins::array_map::ArrayMap;
use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
use crate::joins::hash_join::shared_bounds::{
ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
};
use crate::joins::hash_join::stream::{
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
use crate::joins::utils::{
OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap,
swap_join_projection, update_hash,
};
use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
use crate::metrics::{Count, MetricBuilder};
use crate::projection::{
EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
try_pushdown_through_join,
};
use crate::repartition::REPARTITION_RANDOM_STATE;
use crate::spill::get_record_batch_memory_size;
use crate::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
common::can_project,
joins::utils::{
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
build_join_schema, check_join_is_valid, estimate_join_statistics,
need_produce_result_in_final, symmetric_join_output_partitioning,
},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
};
use arrow::array::{ArrayRef, BooleanBufferBuilder};
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_schema::{DataType, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err,
plan_err, project_schema,
};
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::equivalence::{
ProjectionMapping, join_equivalence_properties,
};
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
use ahash::RandomState;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::TryStreamExt;
use parking_lot::Mutex;
use super::partitioned_hash_eval::SeededRandomState;
/// Fixed hash seeds ('J','O','I','N') so build and probe sides hash join keys
/// identically, independent of process or partition.
pub(crate) const HASH_JOIN_SEED: SeededRandomState =
    SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
/// Metric name counting how often a dense `ArrayMap` build side was created
/// instead of a general-purpose hash table.
const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count";
#[expect(clippy::too_many_arguments)]
/// Attempts to build a dense `ArrayMap` ("perfect hash") build side instead of
/// a hash table.
///
/// Succeeds only when all of the following hold:
/// - there is exactly one join key;
/// - under `NullEqualsNull` semantics, no build-side key is null
///   (presumably the `ArrayMap` cannot represent null-matches-null — confirm
///   in `ArrayMap`);
/// - min/max `bounds` for the key were collected, are non-null, and both
///   convert to `u64` via `ArrayMap::key_to_u64`;
/// - the build side has fewer than `u32::MAX` rows;
/// - the key range is small (`< perfect_hash_join_small_build_threshold`) or
///   dense enough (`> perfect_hash_join_min_key_density`).
///
/// On success returns the map, the concatenated build batch, and the evaluated
/// key arrays; returns `Ok(None)` when a regular hash table should be used.
/// The map's estimated memory is charged to `reservation` before allocation.
fn try_create_array_map(
    bounds: &Option<PartitionBounds>,
    schema: &SchemaRef,
    batches: &[RecordBatch],
    on_left: &[PhysicalExprRef],
    reservation: &mut MemoryReservation,
    perfect_hash_join_small_build_threshold: usize,
    perfect_hash_join_min_key_density: f64,
    null_equality: NullEquality,
) -> Result<Option<(ArrayMap, RecordBatch, Vec<ArrayRef>)>> {
    // Only single-column equi-joins are supported by the dense map.
    if on_left.len() != 1 {
        return Ok(None);
    }
    if null_equality == NullEquality::NullEqualsNull {
        // Nulls would have to match each other; bail out if any key is null.
        for batch in batches.iter() {
            let arrays = evaluate_expressions_to_arrays(on_left, batch)?;
            if arrays[0].null_count() > 0 {
                return Ok(None);
            }
        }
    }
    let (min_val, max_val) = if let Some(bounds) = bounds {
        let (min_val, max_val) = if let Some(cb) = bounds.get_column_bounds(0) {
            (cb.min.clone(), cb.max.clone())
        } else {
            return Ok(None);
        };
        if min_val.is_null() || max_val.is_null() {
            return Ok(None);
        }
        // Accumulators guarantee min <= max; anything else is a logic bug.
        if min_val > max_val {
            return internal_err!("min_val>max_val");
        }
        // Both endpoints must be representable as u64 keys.
        if let Some((mi, ma)) =
            ArrayMap::key_to_u64(&min_val).zip(ArrayMap::key_to_u64(&max_val))
        {
            (mi, ma)
        } else {
            return Ok(None);
        }
    } else {
        return Ok(None);
    };
    let range = ArrayMap::calculate_range(min_val, max_val);
    let num_row: usize = batches.iter().map(|x| x.num_rows()).sum();
    // Row indices must fit in u32 (presumably the ArrayMap stores u32
    // offsets — confirm in `ArrayMap`).
    if num_row >= u32::MAX as usize {
        return Ok(None);
    }
    // NOTE(review): usize::MAX appears to be a sentinel from
    // `calculate_range` for an unrepresentable range — confirm; also avoids
    // the `range + 1` overflow below.
    if range == usize::MAX as u64 {
        return Ok(None);
    }
    let dense_ratio = (num_row as f64) / ((range + 1) as f64);
    // Build the dense map only for small ranges, or larger ranges whose key
    // density justifies the O(range) memory.
    if range >= perfect_hash_join_small_build_threshold as u64
        && dense_ratio <= perfect_hash_join_min_key_density
    {
        return Ok(None);
    }
    // Account for the map's memory before allocating it.
    let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row);
    reservation.try_grow(mem_size)?;
    let batch = concat_batches(schema, batches)?;
    let left_values = evaluate_expressions_to_arrays(on_left, &batch)?;
    let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?;
    Ok(Some((array_map, batch, left_values)))
}
/// Build-side ("left") data shared by all probe partitions of a hash join.
pub(super) struct JoinLeftData {
    /// Hash table (or dense `ArrayMap`) from join-key values to build rows.
    pub(super) map: Arc<Map>,
    /// All build-side input concatenated into one batch.
    batch: RecordBatch,
    /// Join-key columns evaluated against `batch`.
    values: Vec<ArrayRef>,
    /// Shared bitmap of build rows that matched; consulted when the join type
    /// must also emit unmatched build rows.
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Probe threads still running; the last to finish (see
    /// `report_probe_completed`) handles final output.
    probe_threads_counter: AtomicUsize,
    /// Held to keep the build side's memory accounted for its lifetime.
    _reservation: MemoryReservation,
    /// Per-key min/max bounds of the build side, when collected.
    pub(super) bounds: Option<PartitionBounds>,
    /// Strategy for pushing build-side membership down to the probe side.
    pub(super) membership: PushdownStrategy,
    /// Set once any probe-side row has been observed (presumably for
    /// null-aware anti-join semantics — confirm in the stream).
    pub(super) probe_side_non_empty: AtomicBool,
    /// Set once a null probe-side join key has been observed (presumably for
    /// null-aware anti-join semantics — confirm in the stream).
    pub(super) probe_side_has_null: AtomicBool,
}
impl JoinLeftData {
    /// Returns the join-key lookup map.
    pub(super) fn map(&self) -> &Map {
        &self.map
    }
    /// Returns the concatenated build-side batch.
    pub(super) fn batch(&self) -> &RecordBatch {
        &self.batch
    }
    /// Returns the evaluated join-key arrays.
    pub(super) fn values(&self) -> &[ArrayRef] {
        &self.values
    }
    /// Returns the shared matched-row bitmap.
    pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
        &self.visited_indices_bitmap
    }
    /// Returns the membership pushdown strategy.
    pub(super) fn membership(&self) -> &PushdownStrategy {
        &self.membership
    }
    /// Decrements the probe-thread counter; returns `true` for the thread
    /// that finished last (fetch_sub yields the previous value).
    pub(super) fn report_probe_completed(&self) -> bool {
        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
    }
}
/// Builder for [`HashJoinExec`].
pub struct HashJoinExecBuilder {
    /// Partially-configured plan; finalized by `build`.
    exec: HashJoinExec,
    /// When true, `build` reuses `exec`'s existing schema/properties instead
    /// of recomputing them (set when derived from an existing exec and no
    /// property-affecting field changed since).
    preserve_properties: bool,
}
impl HashJoinExecBuilder {
    /// Creates a builder from the required inputs and equi-join keys.
    ///
    /// Defaults: no filter/projection/fetch, `PartitionMode::Auto`,
    /// `NullEqualsNothing`, not null-aware, fresh metrics, and stub plan
    /// properties (real ones are computed in [`Self::build`]).
    pub fn new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
        join_type: JoinType,
    ) -> Self {
        Self {
            exec: HashJoinExec {
                left,
                right,
                on,
                filter: None,
                join_type,
                left_fut: Default::default(),
                random_state: HASH_JOIN_SEED,
                mode: PartitionMode::Auto,
                fetch: None,
                metrics: ExecutionPlanMetricsSet::new(),
                projection: None,
                column_indices: vec![],
                null_equality: NullEquality::NullEqualsNothing,
                null_aware: false,
                dynamic_filter: None,
                cache: stub_properties(),
                join_schema: Arc::new(Schema::empty()),
            },
            preserve_properties: false,
        }
    }
    /// Sets the join type; forces property recomputation on build.
    pub fn with_type(mut self, join_type: JoinType) -> Self {
        self.exec.join_type = join_type;
        self.preserve_properties = false;
        self
    }
    /// Sets the output projection from plain indices.
    pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self {
        self.with_projection_ref(projection.map(Into::into))
    }
    /// Sets the output projection; forces property recomputation on build.
    pub fn with_projection_ref(mut self, projection: Option<ProjectionRef>) -> Self {
        self.exec.projection = projection;
        self.preserve_properties = false;
        self
    }
    /// Sets the non-equi join filter (does not affect plan properties).
    pub fn with_filter(mut self, filter: Option<JoinFilter>) -> Self {
        self.exec.filter = filter;
        self
    }
    /// Replaces the equi-join keys; forces property recomputation on build.
    pub fn with_on(mut self, on: Vec<(PhysicalExprRef, PhysicalExprRef)>) -> Self {
        self.exec.on = on;
        self.preserve_properties = false;
        self
    }
    /// Sets the partition mode; forces property recomputation on build.
    pub fn with_partition_mode(mut self, mode: PartitionMode) -> Self {
        self.exec.mode = mode;
        self.preserve_properties = false;
        self
    }
    /// Sets the null-equality semantics for the join keys.
    pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self {
        self.exec.null_equality = null_equality;
        self
    }
    /// Marks the join as null-aware (validated in `build`: LeftAnti with a
    /// single join key only).
    pub fn with_null_aware(mut self, null_aware: bool) -> Self {
        self.exec.null_aware = null_aware;
        self
    }
    /// Sets the row-limit pushed into the join.
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.exec.fetch = fetch;
        self
    }
    /// Forces `build` to recompute schema and plan properties.
    pub fn recompute_properties(mut self) -> Self {
        self.preserve_properties = false;
        self
    }
    /// Replaces both children; properties are preserved only if the new
    /// children's properties match the old ones.
    pub fn with_new_children(
        mut self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Self> {
        assert_or_internal_err!(
            children.len() == 2,
            "wrong number of children passed into `HashJoinExecBuilder`"
        );
        self.preserve_properties &= has_same_children_properties(&self.exec, &children)?;
        // Remove right (index 1) first so index 0 is still the left child.
        self.exec.right = children.swap_remove(1);
        self.exec.left = children.swap_remove(0);
        Ok(self)
    }
    /// Clears execution-scoped state (build-side future, dynamic filter,
    /// metrics) so the plan can be re-executed from scratch.
    pub fn reset_state(mut self) -> Self {
        self.exec.left_fut = Default::default();
        self.exec.dynamic_filter = None;
        self.exec.metrics = ExecutionPlanMetricsSet::new();
        self
    }
    /// Builds and wraps the plan in an `Arc<dyn ExecutionPlan>`.
    pub fn build_exec(self) -> Result<Arc<dyn ExecutionPlan>> {
        self.build().map(|p| Arc::new(p) as _)
    }
    /// Validates the configuration and produces the final [`HashJoinExec`],
    /// recomputing the join schema, column indices, and plan properties
    /// unless `preserve_properties` still holds.
    ///
    /// # Errors
    /// - null-aware set on a non-LeftAnti join or with more than one key;
    /// - empty `on` clause;
    /// - invalid join keys for the child schemas;
    /// - projection indices out of range.
    pub fn build(self) -> Result<HashJoinExec> {
        let Self {
            exec,
            preserve_properties,
        } = self;
        if exec.null_aware {
            let join_type = exec.join_type();
            if !matches!(join_type, JoinType::LeftAnti) {
                return plan_err!(
                    "null_aware can only be true for LeftAnti joins, got {join_type}"
                );
            }
            let on = exec.on();
            if on.len() != 1 {
                return plan_err!(
                    "null_aware anti join only supports single column join key, got {} columns",
                    on.len()
                );
            }
        }
        // Fast path: nothing affecting properties changed.
        if preserve_properties {
            return Ok(exec);
        }
        // Destructure so schema/indices/cache are provably rebuilt below.
        let HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type,
            left_fut,
            random_state,
            mode,
            metrics,
            projection,
            null_equality,
            null_aware,
            dynamic_filter,
            fetch,
            join_schema: _,
            column_indices: _,
            cache: _,
        } = exec;
        let left_schema = left.schema();
        let right_schema = right.schema();
        if on.is_empty() {
            return plan_err!("On constraints in HashJoinExec should be non-empty");
        }
        check_join_is_valid(&left_schema, &right_schema, &on)?;
        let (join_schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, &join_type);
        let join_schema = Arc::new(join_schema);
        can_project(&join_schema, projection.as_deref())?;
        let cache = HashJoinExec::compute_properties(
            &left,
            &right,
            &join_schema,
            join_type,
            &on,
            mode,
            projection.as_deref(),
        )?;
        Ok(HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type,
            join_schema,
            left_fut,
            random_state,
            mode,
            metrics,
            projection,
            column_indices,
            null_equality,
            null_aware,
            cache: Arc::new(cache),
            dynamic_filter,
            fetch,
        })
    }
    /// Installs a dynamic filter (internal; used by filter-pushdown handling).
    fn with_dynamic_filter(mut self, filter: Option<HashJoinExecDynamicFilter>) -> Self {
        self.exec.dynamic_filter = filter;
        self
    }
}
/// Derives a builder from an existing exec, sharing its execution state
/// (`left_fut`, metrics, dynamic filter) and keeping its cached properties
/// valid until a property-affecting setter is called.
impl From<&HashJoinExec> for HashJoinExecBuilder {
    fn from(exec: &HashJoinExec) -> Self {
        Self {
            exec: HashJoinExec {
                left: Arc::clone(exec.left()),
                right: Arc::clone(exec.right()),
                on: exec.on.clone(),
                filter: exec.filter.clone(),
                join_type: exec.join_type,
                join_schema: Arc::clone(&exec.join_schema),
                // Shared, not reset: the rebuilt node reuses the original's
                // build-side future unless `reset_state` is called.
                left_fut: Arc::clone(&exec.left_fut),
                random_state: exec.random_state.clone(),
                mode: exec.mode,
                metrics: exec.metrics.clone(),
                projection: exec.projection.clone(),
                column_indices: exec.column_indices.clone(),
                null_equality: exec.null_equality,
                null_aware: exec.null_aware,
                cache: Arc::clone(&exec.cache),
                dynamic_filter: exec.dynamic_filter.clone(),
                fetch: exec.fetch,
            },
            // Copied fields are consistent, so `build` may skip recomputation.
            preserve_properties: true,
        }
    }
}
#[expect(rustdoc::private_intra_doc_links)]
/// Hash-join execution plan: collects the left ("build") side into an
/// in-memory map and streams the right ("probe") side against it.
pub struct HashJoinExec {
    /// Build-side input.
    pub left: Arc<dyn ExecutionPlan>,
    /// Probe-side input.
    pub right: Arc<dyn ExecutionPlan>,
    /// Equi-join key pairs as (left expression, right expression).
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional non-equi filter applied to joined rows.
    pub filter: Option<JoinFilter>,
    /// The join type (Inner, Left, ...).
    pub join_type: JoinType,
    /// Full (pre-projection) output schema of the join.
    join_schema: SchemaRef,
    /// Shared once-built left side for `CollectLeft` mode.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Seeded hash state so both sides hash identically.
    random_state: SeededRandomState,
    /// Partitioning strategy (CollectLeft / Partitioned / Auto).
    pub mode: PartitionMode,
    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
    /// Optional projection over `join_schema` column indices.
    pub projection: Option<ProjectionRef>,
    /// Source side/index for each output column of `join_schema`.
    column_indices: Vec<ColumnIndex>,
    /// Whether NULL join keys compare equal.
    pub null_equality: NullEquality,
    /// Null-aware anti-join semantics (LeftAnti, single key only).
    pub null_aware: bool,
    /// Cached plan properties (schema, partitioning, ordering, boundedness).
    cache: Arc<PlanProperties>,
    /// Dynamic filter pushed to the probe side, if pushdown is enabled.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
    /// Optional row limit on the join output.
    fetch: Option<usize>,
}
/// Dynamic filter state for pushing build-side key information into the
/// probe-side scan.
#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// The placeholder predicate (initially `true`) updated at runtime.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Lazily-initialized accumulator that merges build-side bounds across
    /// partitions before updating `filter`.
    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
/// Manual Debug: `dynamic_filter` does not implement `Debug`, so the struct
/// cannot simply derive it.
impl fmt::Debug for HashJoinExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("HashJoinExec")
            .field("left", &self.left)
            .field("right", &self.right)
            .field("on", &self.on)
            .field("filter", &self.filter)
            .field("join_type", &self.join_type)
            .field("join_schema", &self.join_schema)
            .field("left_fut", &self.left_fut)
            .field("random_state", &self.random_state)
            .field("mode", &self.mode)
            .field("metrics", &self.metrics)
            .field("projection", &self.projection)
            .field("column_indices", &self.column_indices)
            .field("null_equality", &self.null_equality)
            // Previously omitted, but both alter planning/execution.
            .field("null_aware", &self.null_aware)
            .field("fetch", &self.fetch)
            .field("cache", &self.cache)
            // `dynamic_filter` is intentionally skipped (not Debug); signal
            // the omission instead of pretending the output is exhaustive.
            .finish_non_exhaustive()
    }
}
impl EmbeddedProjection for HashJoinExec {
    /// Delegates to the inherent `HashJoinExec::with_projection`; inherent
    /// methods take precedence over trait methods, so this does not recurse.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
impl HashJoinExec {
    #[expect(clippy::too_many_arguments)]
    /// Creates a validated `HashJoinExec` (convenience wrapper over
    /// [`HashJoinExecBuilder`]).
    ///
    /// # Errors
    /// Propagates all builder validation errors (empty `on`, invalid keys,
    /// bad projection, invalid null-aware configuration).
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        projection: Option<Vec<usize>>,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
        null_aware: bool,
    ) -> Result<Self> {
        HashJoinExecBuilder::new(left, right, on, *join_type)
            .with_filter(filter)
            .with_projection(projection)
            .with_partition_mode(partition_mode)
            .with_null_equality(null_equality)
            .with_null_aware(null_aware)
            .build()
    }
    /// Returns a builder pre-populated from this exec.
    pub fn builder(&self) -> HashJoinExecBuilder {
        self.into()
    }
    /// Creates a placeholder dynamic filter (`true`) over the probe-side
    /// join keys; it is tightened at runtime from build-side data.
    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
    }
    /// Whether dynamic filter pushdown is permitted: Inner joins only, the
    /// config flag must be on, and partitioned mode is excluded when file
    /// partitions are preserved.
    fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
        if self.join_type != JoinType::Inner
            || !config.optimizer.enable_join_dynamic_filter_pushdown
        {
            return false;
        }
        if config.optimizer.preserve_file_partitions > 0
            && self.mode == PartitionMode::Partitioned
        {
            return false;
        }
        true
    }
    /// Build-side input.
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }
    /// Probe-side input.
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }
    /// Equi-join key pairs.
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }
    /// Optional non-equi join filter.
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }
    /// The join type.
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }
    /// Full pre-projection join output schema.
    pub fn join_schema(&self) -> &SchemaRef {
        &self.join_schema
    }
    /// The configured partition mode.
    pub fn partition_mode(&self) -> &PartitionMode {
        &self.mode
    }
    /// The configured null-equality semantics.
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }
    #[doc(hidden)]
    /// Test-only access to the installed dynamic filter.
    pub fn dynamic_filter_for_test(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
        self.dynamic_filter.as_ref().map(|df| &df.filter)
    }
    /// Per-child order preservation: the build side never preserves order;
    /// the probe side does for join types that stream probe rows through.
    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
        vec![
            false,
            matches!(
                join_type,
                JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightAnti
                    | JoinType::RightSemi
                    | JoinType::RightMark
            ),
        ]
    }
    /// The probe side of this join is always the right input.
    pub fn probe_side() -> JoinSide {
        JoinSide::Right
    }
    /// Whether an output projection is embedded in this node.
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }
    /// Returns a copy of this node with `projection` composed on top of any
    /// existing embedded projection.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        let projection = projection.map(Into::into);
        // Validate against the *current* output schema before composing.
        can_project(&self.schema(), projection.as_deref())?;
        let projection =
            combine_projections(projection.as_ref(), self.projection.as_ref())?;
        self.builder().with_projection_ref(projection).build()
    }
    /// Computes equivalence properties, output partitioning, emission type,
    /// and boundedness for the join, then projects them if an embedded
    /// projection is present.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
        join_type: JoinType,
        on: JoinOnRef,
        mode: PartitionMode,
        projection: Option<&[usize]>,
    ) -> Result<PlanProperties> {
        let mut eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            Arc::clone(schema),
            &Self::maintains_input_order(join_type),
            Some(Self::probe_side()),
            on,
        )?;
        let mut output_partitioning = match mode {
            PartitionMode::CollectLeft => {
                asymmetric_join_output_partitioning(left, right, &join_type)?
            }
            // Auto: partitioning undecided until a mode is chosen.
            PartitionMode::Auto => Partitioning::UnknownPartitioning(
                right.output_partitioning().partition_count(),
            ),
            PartitionMode::Partitioned => {
                symmetric_join_output_partitioning(left, right, &join_type)?
            }
        };
        // An unbounded build side can never finish, so output is Final;
        // otherwise emission depends on the probe side and join type.
        let emission_type = if left.boundedness().is_unbounded() {
            EmissionType::Final
        } else if right.pipeline_behavior() == EmissionType::Incremental {
            match join_type {
                // Probe-driven outputs stream incrementally.
                JoinType::Inner
                | JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::Right
                | JoinType::RightAnti
                | JoinType::RightMark => EmissionType::Incremental,
                // These must also emit unmatched build rows at the end.
                JoinType::Left
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::Full => EmissionType::Both,
            }
        } else {
            right.pipeline_behavior()
        };
        if let Some(projection) = projection {
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(&projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }
        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness_from_children([left, right]),
        ))
    }
    /// Returns an equivalent join with build and probe sides swapped
    /// (join type, keys, filter, and projection all swapped accordingly).
    /// For joins that emit one side only (semi/anti/mark) or that already
    /// project, the swapped node is returned directly; otherwise a
    /// projection is added to restore the original column order.
    pub fn swap_inputs(
        &self,
        partition_mode: PartitionMode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = self.left();
        let right = self.right();
        let new_join = self
            .builder()
            .with_type(self.join_type.swap())
            .with_new_children(vec![Arc::clone(right), Arc::clone(left)])?
            .with_on(
                self.on()
                    .iter()
                    .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
                    .collect(),
            )
            .with_filter(self.filter().map(JoinFilter::swap))
            .with_projection(swap_join_projection(
                left.schema().fields().len(),
                right.schema().fields().len(),
                self.projection.as_deref(),
                self.join_type(),
            ))
            .with_partition_mode(partition_mode)
            .build()?;
        if matches!(
            self.join_type(),
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ) || self.projection.is_some()
        {
            Ok(Arc::new(new_join))
        } else {
            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
        }
    }
}
impl DisplayAs for HashJoinExec {
    /// Formats the node for EXPLAIN output. Default/Verbose produce a single
    /// line; TreeRender emits one `key=value` line per non-default property.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let display_filter = self.filter.as_ref().map_or_else(
                    || "".to_string(),
                    |f| format!(", filter={}", f.expression()),
                );
                // Projection columns are shown as `name@index`.
                let display_projections = if self.contains_projection() {
                    format!(
                        ", projection=[{}]",
                        self.projection
                            .as_ref()
                            .unwrap()
                            .iter()
                            .map(|index| format!(
                                "{}@{}",
                                self.join_schema.fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                let display_null_equality =
                    if self.null_equality() == NullEquality::NullEqualsNull {
                        ", NullsEqual: true"
                    } else {
                        ""
                    };
                let display_fetch = self
                    .fetch
                    .map_or_else(String::new, |f| format!(", fetch={f}"));
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| format!("({c1}, {c2})"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(
                    f,
                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
                    self.mode,
                    self.join_type,
                    on,
                    display_filter,
                    display_projections,
                    display_null_equality,
                    display_fetch,
                )
            }
            DisplayFormatType::TreeRender => {
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| {
                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
                    })
                    .collect::<Vec<String>>()
                    .join(", ");
                // Inner is the default join type, so it is not printed.
                if *self.join_type() != JoinType::Inner {
                    writeln!(f, "join_type={:?}", self.join_type)?;
                }
                writeln!(f, "on={on}")?;
                if self.null_equality() == NullEquality::NullEqualsNull {
                    writeln!(f, "NullsEqual: true")?;
                }
                if let Some(filter) = self.filter.as_ref() {
                    writeln!(f, "filter={filter}")?;
                }
                if let Some(fetch) = self.fetch {
                    writeln!(f, "fetch={fetch}")?;
                }
                Ok(())
            }
        }
    }
}
impl ExecutionPlan for HashJoinExec {
    fn name(&self) -> &'static str {
        "HashJoinExec"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
    /// Required input distributions: CollectLeft needs a single build
    /// partition; Partitioned hash-partitions both sides on the join keys.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
            PartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
            PartitionMode::Auto => vec![
                Distribution::UnspecifiedDistribution,
                Distribution::UnspecifiedDistribution,
            ],
        }
    }
    fn maintains_input_order(&self) -> Vec<bool> {
        Self::maintains_input_order(self.join_type)
    }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.builder().with_new_children(children)?.build_exec()
    }
    /// Drops execution-scoped state (build future, dynamic filter, metrics).
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        self.builder().reset_state().build_exec()
    }
    /// Starts the join for one probe partition: kicks off (or joins) the
    /// build-side collection, then returns a stream that probes it with the
    /// right input.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let on_left = self
            .on
            .iter()
            .map(|on| Arc::clone(&on.0))
            .collect::<Vec<_>>();
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();
        assert_or_internal_err!(
            self.mode != PartitionMode::Partitioned
                || left_partitions == right_partitions,
            "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
             consider using RepartitionExec"
        );
        assert_or_internal_err!(
            self.mode != PartitionMode::CollectLeft || left_partitions == 1,
            "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
             consider using CoalescePartitionsExec or the EnforceDistribution rule"
        );
        // Dynamic filtering is only worthwhile if the pushed filter is
        // actually consumed somewhere below the probe side.
        let enable_dynamic_filter_pushdown = self
            .allow_join_dynamic_filter_pushdown(context.session_config().options())
            && self
                .dynamic_filter
                .as_ref()
                .map(|df| df.filter.is_used())
                .unwrap_or(false);
        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
        let array_map_created_count = MetricBuilder::new(&self.metrics)
            .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);
        let left_fut = match self.mode {
            // CollectLeft: build once (partition 0), share across all probe
            // partitions via the OnceAsync.
            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
                let left_stream = self.left.execute(0, Arc::clone(&context))?;
                let reservation =
                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
                Ok(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    self.right().output_partitioning().partition_count(),
                    enable_dynamic_filter_pushdown,
                    Arc::clone(context.session_config().options()),
                    self.null_equality,
                    array_map_created_count,
                ))
            })?,
            // Partitioned: each probe partition builds its own map from the
            // matching build partition (probe_threads_count is 1).
            PartitionMode::Partitioned => {
                let left_stream = self.left.execute(partition, Arc::clone(&context))?;
                let reservation =
                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
                        .register(context.memory_pool());
                OnceFut::new(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    1,
                    enable_dynamic_filter_pushdown,
                    Arc::clone(context.session_config().options()),
                    self.null_equality,
                    array_map_created_count,
                ))
            }
            // Auto must have been resolved by the optimizer before execute.
            PartitionMode::Auto => {
                return plan_err!(
                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
                    PartitionMode::Auto
                );
            }
        };
        let batch_size = context.session_config().batch_size();
        let repartition_random_state = REPARTITION_RANDOM_STATE;
        // Lazily create the shared accumulator that merges build-side bounds
        // across partitions and updates the probe-side dynamic filter.
        let build_accumulator = enable_dynamic_filter_pushdown
            .then(|| {
                self.dynamic_filter.as_ref().map(|df| {
                    let filter = Arc::clone(&df.filter);
                    let on_right = self
                        .on
                        .iter()
                        .map(|(_, right_expr)| Arc::clone(right_expr))
                        .collect::<Vec<_>>();
                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
                            self.mode,
                            self.left.as_ref(),
                            self.right.as_ref(),
                            filter,
                            on_right,
                            repartition_random_state,
                        ))
                    })))
                })
            })
            .flatten()
            .flatten();
        let right_stream = self.right.execute(partition, context)?;
        // Map embedded projection indices onto the source-side indices.
        let column_indices_after_projection = match self.projection.as_ref() {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };
        let on_right = self
            .on
            .iter()
            .map(|(_, right_expr)| Arc::clone(right_expr))
            .collect::<Vec<_>>();
        Ok(Box::pin(HashJoinStream::new(
            partition,
            self.schema(),
            on_right,
            self.filter.clone(),
            self.join_type,
            right_stream,
            self.random_state.random_state().clone(),
            join_metrics,
            column_indices_after_projection,
            self.null_equality,
            HashJoinStreamState::WaitBuildSide,
            BuildSide::Initial(BuildSideInitialState { left_fut }),
            batch_size,
            vec![],
            self.right.output_ordering().is_some(),
            build_accumulator,
            self.mode,
            self.null_aware,
            self.fetch,
        )))
    }
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
    /// Whole-plan statistics only; per-partition statistics are unknown.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema()));
        }
        let stats = estimate_join_statistics(
            self.left.partition_statistics(None)?,
            self.right.partition_statistics(None)?,
            &self.on,
            &self.join_type,
            &self.join_schema,
        )?;
        // Reflect the embedded projection and fetch limit in the estimate.
        let stats = stats.project(self.projection.as_ref());
        stats.with_fetch(self.fetch, 0, 1)
    }
    /// Tries to absorb a parent `ProjectionExec`: either push it below the
    /// join (rewriting keys/filter) or embed it into this node.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // An already-embedded projection blocks further swapping.
        if self.contains_projection() {
            return Ok(None);
        }
        let schema = self.schema();
        if let Some(JoinData {
            projected_left_child,
            projected_right_child,
            join_filter,
            join_on,
        }) = try_pushdown_through_join(
            projection,
            self.left(),
            self.right(),
            self.on(),
            &schema,
            self.filter(),
        )? {
            self.builder()
                .with_new_children(vec![
                    Arc::new(projected_left_child),
                    Arc::new(projected_right_child),
                ])?
                .with_on(join_on)
                .with_filter(join_filter)
                .with_projection(None)
                .build_exec()
                .map(Some)
        } else {
            try_embed_projection(projection, self)
        }
    }
    /// Routes parent filters to the children they can safely be applied to,
    /// and (in the Post phase) attaches a self dynamic filter to the probe
    /// side when pushdown is allowed.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        let (left_preserved, right_preserved) = lr_is_preserved(self.join_type);
        let column_indices: Vec<ColumnIndex> = match self.projection.as_ref() {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };
        // Which output columns originate from each side.
        let (mut left_allowed, mut right_allowed) = (HashSet::new(), HashSet::new());
        column_indices
            .iter()
            .enumerate()
            .for_each(|(output_idx, ci)| {
                // Each match arm yields bool (insert's return / a dummy
                // `false`) purely so the arms have a common type.
                match ci.side {
                    JoinSide::Left => left_allowed.insert(output_idx),
                    JoinSide::Right => right_allowed.insert(output_idx),
                    JoinSide::None => false,
                };
            });
        // For semi/anti joins the output key columns equal the other side's
        // key columns, so key-column filters may also cross sides.
        match self.join_type {
            JoinType::LeftSemi | JoinType::LeftAnti => {
                let left_key_indices: HashSet<usize> = self
                    .on
                    .iter()
                    .filter_map(|(left_key, _)| {
                        left_key
                            .as_any()
                            .downcast_ref::<Column>()
                            .map(|c| c.index())
                    })
                    .collect();
                for (output_idx, ci) in column_indices.iter().enumerate() {
                    if ci.side == JoinSide::Left && left_key_indices.contains(&ci.index) {
                        right_allowed.insert(output_idx);
                    }
                }
            }
            JoinType::RightSemi | JoinType::RightAnti => {
                let right_key_indices: HashSet<usize> = self
                    .on
                    .iter()
                    .filter_map(|(_, right_key)| {
                        right_key
                            .as_any()
                            .downcast_ref::<Column>()
                            .map(|c| c.index())
                    })
                    .collect();
                for (output_idx, ci) in column_indices.iter().enumerate() {
                    if ci.side == JoinSide::Right && right_key_indices.contains(&ci.index)
                    {
                        left_allowed.insert(output_idx);
                    }
                }
            }
            _ => {}
        }
        let left_child = if left_preserved {
            ChildFilterDescription::from_child_with_allowed_indices(
                &parent_filters,
                left_allowed,
                self.left(),
            )?
        } else {
            ChildFilterDescription::all_unsupported(&parent_filters)
        };
        let mut right_child = if right_preserved {
            ChildFilterDescription::from_child_with_allowed_indices(
                &parent_filters,
                right_allowed,
                self.right(),
            )?
        } else {
            ChildFilterDescription::all_unsupported(&parent_filters)
        };
        if phase == FilterPushdownPhase::Post
            && self.allow_join_dynamic_filter_pushdown(config)
        {
            let dynamic_filter = Self::create_dynamic_filter(&self.on);
            right_child = right_child.with_self_filter(dynamic_filter);
        }
        Ok(FilterDescription::new()
            .with_child(left_child)
            .with_child(right_child))
    }
    /// If the dynamic filter we offered to the probe side was accepted,
    /// remember it on a rebuilt node so `execute` can wire it up.
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
        // NOTE(review): panics if the result does not carry exactly one
        // self-filter list per child — relies on the pushdown framework's
        // invariant; confirm an internal error would not be preferable.
        assert_eq!(child_pushdown_result.self_filters.len(), 2);
        let right_child_self_filters = &child_pushdown_result.self_filters[1];
        if let Some(filter) = right_child_self_filters.first() {
            let predicate = Arc::clone(&filter.predicate);
            if let Ok(dynamic_filter) =
                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
            {
                let new_node = self
                    .builder()
                    .with_dynamic_filter(Some(HashJoinExecDynamicFilter {
                        filter: dynamic_filter,
                        build_accumulator: OnceLock::new(),
                    }))
                    .build_exec()?;
                result = result.with_updated_node(new_node);
            }
        }
        Ok(result)
    }
    fn supports_limit_pushdown(&self) -> bool {
        false
    }
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        self.builder()
            .with_fetch(limit)
            .build()
            .ok()
            .map(|exec| Arc::new(exec) as _)
    }
}
/// Returns, for `join_type`, whether the (left, right) input is "preserved"
/// in the output — i.e. whether filters on that side's columns may be pushed
/// below the join without changing results.
fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
    // A side is preserved unless the join can fabricate rows for it
    // (outer/full) or marks against it.
    let left_preserved = !matches!(
        join_type,
        JoinType::Right | JoinType::Full | JoinType::RightMark
    );
    let right_preserved = !matches!(
        join_type,
        JoinType::Left | JoinType::Full | JoinType::LeftMark
    );
    (left_preserved, right_preserved)
}
/// Tracks the running min/max of one build-side join-key expression while
/// the build input is collected (used for bounds / dynamic filtering).
struct CollectLeftAccumulator {
    /// The join-key expression being tracked.
    expr: Arc<dyn PhysicalExpr>,
    /// Running minimum of the evaluated key values.
    min: MinAccumulator,
    /// Running maximum of the evaluated key values.
    max: MaxAccumulator,
}
impl CollectLeftAccumulator {
    /// Creates min/max accumulators for `expr`, first unwrapping any
    /// (possibly nested) dictionary encoding down to its value type.
    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
        // Iteratively strip dictionary layers instead of recursing.
        fn dictionary_value_type(data_type: &DataType) -> DataType {
            let mut current = data_type;
            while let DataType::Dictionary(_, value_type) = current {
                current = value_type.as_ref();
            }
            current.clone()
        }
        let resolved_type = dictionary_value_type(&expr.data_type(schema)?);
        let min = MinAccumulator::try_new(&resolved_type)?;
        let max = MaxAccumulator::try_new(&resolved_type)?;
        Ok(Self { expr, min, max })
    }
    /// Folds one build-side batch into the running min/max.
    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        let values = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
        let as_slice = std::slice::from_ref(&values);
        self.min.update_batch(as_slice)?;
        self.max.update_batch(as_slice)?;
        Ok(())
    }
    /// Consumes the accumulator, yielding the final column bounds.
    fn evaluate(mut self) -> Result<ColumnBounds> {
        let min_value = self.min.evaluate()?;
        let max_value = self.max.evaluate()?;
        Ok(ColumnBounds::new(min_value, max_value))
    }
}
/// Accumulation state threaded through the build-side collection fold.
struct BuildSideState {
    /// Build-side batches collected so far.
    batches: Vec<RecordBatch>,
    /// Total rows across `batches`.
    num_rows: usize,
    /// Build/probe metrics updated as input arrives.
    metrics: BuildProbeJoinMetrics,
    /// Memory reservation grown per collected batch.
    reservation: MemoryReservation,
    /// One min/max accumulator per join key, when bounds are needed.
    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
}
impl BuildSideState {
    /// Creates an empty build-side state. When
    /// `should_compute_dynamic_filters` is set, one min/max accumulator is
    /// prepared per join-key expression in `on_left`.
    fn try_new(
        metrics: BuildProbeJoinMetrics,
        reservation: MemoryReservation,
        on_left: Vec<Arc<dyn PhysicalExpr>>,
        schema: &SchemaRef,
        should_compute_dynamic_filters: bool,
    ) -> Result<Self> {
        let bounds_accumulators = if should_compute_dynamic_filters {
            let accumulators = on_left
                .into_iter()
                .map(|expr| CollectLeftAccumulator::try_new(expr, schema))
                .collect::<Result<Vec<_>>>()?;
            Some(accumulators)
        } else {
            None
        };
        Ok(Self {
            batches: Vec::new(),
            num_rows: 0,
            metrics,
            reservation,
            bounds_accumulators,
        })
    }
}
/// Returns true when there is exactly one join key whose type can back a
/// dense `ArrayMap`, so the build phase should also track the key's min/max.
fn should_collect_min_max_for_perfect_hash(
    on_left: &[PhysicalExprRef],
    schema: &SchemaRef,
) -> Result<bool> {
    match on_left {
        [single_key] => {
            let data_type = single_key.data_type(schema)?;
            Ok(ArrayMap::is_supported_type(&data_type))
        }
        // Multi-key (or empty) joins never use the dense map.
        _ => Ok(false),
    }
}
/// Drain the left (build-side) input stream and assemble the shared
/// `JoinLeftData` used by every probe partition.
///
/// Responsibilities, in order:
/// 1. Buffer all build batches, growing `reservation` and build metrics as
///    each batch arrives, and (optionally) updating per-key min/max
///    accumulators.
/// 2. Compute final `PartitionBounds` (only when accumulators exist and at
///    least one row was seen).
/// 3. Build the join map: first try the perfect-hash `ArrayMap`; otherwise
///    fall back to a chained hash table (u32 or u64 indexed by row count).
/// 4. Prepare the visited-indices bitmap (for join types that must emit
///    unmatched build rows) and choose the probe-side pushdown strategy.
#[expect(clippy::too_many_arguments)]
async fn collect_left_input(
    random_state: RandomState,
    left_stream: SendableRecordBatchStream,
    on_left: Vec<PhysicalExprRef>,
    metrics: BuildProbeJoinMetrics,
    reservation: MemoryReservation,
    with_visited_indices_bitmap: bool,
    probe_threads_count: usize,
    should_compute_dynamic_filters: bool,
    config: Arc<ConfigOptions>,
    null_equality: NullEquality,
    array_map_created_count: Count,
) -> Result<JoinLeftData> {
    let schema = left_stream.schema();
    let should_collect_min_max_for_phj =
        should_collect_min_max_for_perfect_hash(&on_left, &schema)?;
    // Bounds are collected when either dynamic filters need them or the
    // perfect-hash-join path needs min/max to size its array map.
    let initial = BuildSideState::try_new(
        metrics,
        reservation,
        on_left.clone(),
        &schema,
        should_compute_dynamic_filters || should_collect_min_max_for_phj,
    )?;
    let state = left_stream
        .try_fold(initial, |mut state, batch| async move {
            // Feed every batch into the min/max accumulators (if any).
            if let Some(ref mut accumulators) = state.bounds_accumulators {
                for accumulator in accumulators {
                    accumulator.update_batch(&batch)?;
                }
            }
            // Reserve memory for the buffered batch before keeping it.
            let batch_size = get_record_batch_memory_size(&batch);
            state.reservation.try_grow(batch_size)?;
            state.metrics.build_mem_used.add(batch_size);
            state.metrics.build_input_batches.add(1);
            state.metrics.build_input_rows.add(batch.num_rows());
            state.num_rows += batch.num_rows();
            state.batches.push(batch);
            Ok(state)
        })
        .await?;
    let BuildSideState {
        batches,
        num_rows,
        metrics,
        mut reservation,
        bounds_accumulators,
    } = state;
    // Finalize bounds only when rows were actually seen; an empty build
    // side yields no meaningful min/max.
    let mut bounds = match bounds_accumulators {
        Some(accumulators) if num_rows > 0 => {
            let bounds = accumulators
                .into_iter()
                .map(CollectLeftAccumulator::evaluate)
                .collect::<Result<Vec<_>>>()?;
            Some(PartitionBounds::new(bounds))
        }
        _ => None,
    };
    // Prefer the perfect-hash ArrayMap when the build side is small/dense
    // enough per the configured thresholds; otherwise build a hash table.
    let (join_hash_map, batch, left_values) =
        if let Some((array_map, batch, left_value)) = try_create_array_map(
            &bounds,
            &schema,
            &batches,
            &on_left,
            &mut reservation,
            config.execution.perfect_hash_join_small_build_threshold,
            config.execution.perfect_hash_join_min_key_density,
            null_equality,
        )? {
            array_map_created_count.add(1);
            metrics.build_mem_used.add(array_map.size());
            (Map::ArrayMap(array_map), batch, left_value)
        } else {
            let fixed_size_u32 = size_of::<JoinHashMapU32>();
            let fixed_size_u64 = size_of::<JoinHashMapU64>();
            // Row indices must fit the map's index width: use u64 entries
            // only when the build side exceeds u32::MAX rows.
            let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
                let estimated_hashtable_size =
                    estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
                reservation.try_grow(estimated_hashtable_size)?;
                metrics.build_mem_used.add(estimated_hashtable_size);
                Box::new(JoinHashMapU64::with_capacity(num_rows))
            } else {
                let estimated_hashtable_size =
                    estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
                reservation.try_grow(estimated_hashtable_size)?;
                metrics.build_mem_used.add(estimated_hashtable_size);
                Box::new(JoinHashMapU32::with_capacity(num_rows))
            };
            let mut hashes_buffer = Vec::new();
            let mut offset = 0;
            // NOTE: batches are inserted in reverse order; the concat below
            // uses the same reversed iterator so offsets stay consistent
            // with the concatenated batch.
            let batches_iter = batches.iter().rev();
            for batch in batches_iter.clone() {
                hashes_buffer.clear();
                hashes_buffer.resize(batch.num_rows(), 0);
                update_hash(
                    &on_left,
                    batch,
                    &mut *hashmap,
                    offset,
                    &random_state,
                    &mut hashes_buffer,
                    0,
                    true,
                )?;
                offset += batch.num_rows();
            }
            let batch = concat_batches(&schema, batches_iter.clone())?;
            let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
            (Map::HashMap(hashmap), batch, left_values)
        };
    // Bitmap of matched build rows, needed by join types that emit
    // unmatched build rows in the final stage.
    let visited_indices_bitmap = if with_visited_indices_bitmap {
        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
        reservation.try_grow(bitmap_size)?;
        metrics.build_mem_used.add(bitmap_size);
        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
        // num_rows should equal batch.num_rows() after concat — the bitmap
        // covers every build row, all initially unmatched.
        bitmap_buffer.append_n(num_rows, false);
        bitmap_buffer
    } else {
        BooleanBufferBuilder::new(0)
    };
    let map = Arc::new(join_hash_map);
    // Pick how build-side membership is pushed to the probe side:
    // Empty (no rows), an IN-list of key values (small builds), or the
    // full join map otherwise.
    let membership = if num_rows == 0 {
        PushdownStrategy::Empty
    } else {
        let estimated_size = left_values
            .iter()
            .map(|arr| arr.get_array_memory_size())
            .sum::<usize>();
        if left_values.is_empty()
            || left_values[0].is_empty()
            || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size
            || map.num_of_distinct_key()
                > config
                    .optimizer
                    .hash_join_inlist_pushdown_max_distinct_values
        {
            PushdownStrategy::Map(Arc::clone(&map))
        } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
            PushdownStrategy::InList(in_list_values)
        } else {
            PushdownStrategy::Map(Arc::clone(&map))
        }
    };
    // Bounds collected purely for perfect-hash sizing must not leak into
    // dynamic filter pushdown when it was not requested.
    if should_collect_min_max_for_phj && !should_compute_dynamic_filters {
        bounds = None;
    }
    let data = JoinLeftData {
        map,
        batch,
        values: left_values,
        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
        probe_threads_counter: AtomicUsize::new(probe_threads_count),
        _reservation: reservation,
        bounds,
        membership,
        probe_side_non_empty: AtomicBool::new(false),
        probe_side_has_null: AtomicBool::new(false),
    };
    Ok(data)
}
#[cfg(test)]
mod tests {
use super::*;
fn assert_phj_used(metrics: &MetricsSet, use_phj: bool) {
if use_phj {
assert!(
metrics
.sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
.expect("should have array_map_created_count metrics")
.as_usize()
>= 1
);
} else {
assert_eq!(
metrics
.sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME)
.map(|v| v.as_usize())
.unwrap_or(0),
0
)
}
}
fn build_schema_and_on() -> Result<(SchemaRef, SchemaRef, JoinOn)> {
let left_schema = Arc::new(Schema::new(vec![
Field::new("a1", DataType::Int32, true),
Field::new("b1", DataType::Int32, true),
]));
let right_schema = Arc::new(Schema::new(vec![
Field::new("a2", DataType::Int32, true),
Field::new("b1", DataType::Int32, true),
]));
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left_schema)?) as _,
Arc::new(Column::new_with_schema("b1", &right_schema)?) as _,
)];
Ok((left_schema, right_schema, on))
}
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::joins::hash_join::stream::lookup_join_hashmap;
use crate::test::{TestMemoryExec, assert_join_metrics};
use crate::{
common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
test::exec::MockExec,
};
use arrow::array::{
Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, UInt64Array,
};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field};
use arrow_schema::Schema;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{
ScalarValue, assert_batches_eq, assert_batches_sorted_eq, assert_contains,
exec_err, internal_err,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::Operator;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use hashbrown::HashTable;
use insta::{allow_duplicates, assert_snapshot};
use rstest::*;
use rstest_reuse::*;
fn div_ceil(a: usize, b: usize) -> usize {
a.div_ceil(b)
}
    /// rstest template: tests annotated with `#[apply(hash_join_exec_configs)]`
    /// run across a matrix of batch sizes and with the perfect-hash-join
    /// path both enabled and disabled.
    #[template]
    #[rstest]
    fn hash_join_exec_configs(
        #[values(8192, 10, 5, 2, 1)] batch_size: usize,
        #[values(true, false)] use_perfect_hash_join_as_possible: bool,
    ) {
    }
fn prepare_task_ctx(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Arc<TaskContext> {
let mut session_config = SessionConfig::default().with_batch_size(batch_size);
if use_perfect_hash_join_as_possible {
session_config
.options_mut()
.execution
.perfect_hash_join_small_build_threshold = 819200;
session_config
.options_mut()
.execution
.perfect_hash_join_min_key_density = 0.0;
} else {
session_config
.options_mut()
.execution
.perfect_hash_join_small_build_threshold = 0;
session_config
.options_mut()
.execution
.perfect_hash_join_min_key_density = f64::INFINITY;
}
Arc::new(TaskContext::default().with_session_config(session_config))
}
fn build_table(
a: (&str, &Vec<i32>),
b: (&str, &Vec<i32>),
c: (&str, &Vec<i32>),
) -> Arc<dyn ExecutionPlan> {
let batch = build_table_i32(a, b, c);
let schema = batch.schema();
TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
}
fn build_table_two_cols(
a: (&str, &Vec<Option<i32>>),
b: (&str, &Vec<Option<i32>>),
) -> Arc<dyn ExecutionPlan> {
let schema = Arc::new(Schema::new(vec![
Field::new(a.0, DataType::Int32, true),
Field::new(b.0, DataType::Int32, true),
]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from(a.1.clone())),
Arc::new(Int32Array::from(b.1.clone())),
],
)
.unwrap();
TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
}
    /// Build a `CollectLeft` [`HashJoinExec`] with no join filter and no
    /// output projection.
    fn join(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        null_equality: NullEquality,
    ) -> Result<HashJoinExec> {
        HashJoinExec::try_new(
            left,
            right,
            on,
            None, // no extra join filter
            join_type,
            None, // no projection
            PartitionMode::CollectLeft,
            null_equality,
            false, // NOTE(review): trailing flag semantics not visible here — confirm against HashJoinExec::try_new
        )
    }
    /// Same as [`join`], but attaches a post-probe [`JoinFilter`].
    fn join_with_filter(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: JoinFilter,
        join_type: &JoinType,
        null_equality: NullEquality,
    ) -> Result<HashJoinExec> {
        HashJoinExec::try_new(
            left,
            right,
            on,
            Some(filter),
            join_type,
            None, // no projection
            PartitionMode::CollectLeft,
            null_equality,
            false, // NOTE(review): trailing flag semantics not visible here — confirm against HashJoinExec::try_new
        )
    }
async fn join_collect(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
join_type: &JoinType,
null_equality: NullEquality,
context: Arc<TaskContext>,
) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
let join = join(left, right, on, join_type, null_equality)?;
let columns_header = columns(&join.schema());
let stream = join.execute(0, context)?;
let batches = common::collect(stream).await?;
let metrics = join.metrics().unwrap();
Ok((columns_header, batches, metrics))
}
    /// Convenience wrapper: run the join with `PartitionMode::Partitioned`
    /// via [`join_collect_with_partition_mode`].
    async fn partitioned_join_collect(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        null_equality: NullEquality,
        context: Arc<TaskContext>,
    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
        join_collect_with_partition_mode(
            left,
            right,
            on,
            join_type,
            PartitionMode::Partitioned,
            null_equality,
            context,
        )
        .await
    }
    /// Repartition both inputs according to `partition_mode`, run the join
    /// over all 4 partitions, and return the output column names, the
    /// non-empty result batches, and the join metrics.
    async fn join_collect_with_partition_mode(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        join_type: &JoinType,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
        context: Arc<TaskContext>,
    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
        let partition_count = 4;
        let (left_expr, right_expr) = on
            .iter()
            .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
            .unzip();
        // CollectLeft coalesces the build side into one partition;
        // Partitioned hash-repartitions it on the join keys.
        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
                left,
                Partitioning::Hash(left_expr, partition_count),
            )?),
            PartitionMode::Auto => {
                return internal_err!("Unexpected PartitionMode::Auto in join tests");
            }
        };
        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
            PartitionMode::CollectLeft => {
                // For CollectLeft, hash-partition the probe side on its
                // first column (not necessarily a join key) to exercise
                // multiple probe partitions.
                let partition_column_name = right.schema().field(0).name().clone();
                let partition_expr = vec![Arc::new(Column::new_with_schema(
                    &partition_column_name,
                    &right.schema(),
                )?) as _];
                Arc::new(RepartitionExec::try_new(
                    right,
                    Partitioning::Hash(partition_expr, partition_count),
                )?) as _
            }
            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
                right,
                Partitioning::Hash(right_expr, partition_count),
            )?),
            PartitionMode::Auto => {
                return internal_err!("Unexpected PartitionMode::Auto in join tests");
            }
        };
        let join = HashJoinExec::try_new(
            left_repartitioned,
            right_repartitioned,
            on,
            None,
            join_type,
            None,
            partition_mode,
            null_equality,
            false,
        )?;
        let columns = columns(&join.schema());
        let mut batches = vec![];
        // Execute and drain every output partition; drop empty batches so
        // assertions only see real output.
        for i in 0..partition_count {
            let stream = join.execute(i, Arc::clone(&context))?;
            let more_batches = common::collect(stream).await?;
            batches.extend(
                more_batches
                    .into_iter()
                    .filter(|b| b.num_rows() > 0)
                    .collect::<Vec<_>>(),
            );
        }
        let metrics = join.metrics().unwrap();
        Ok((columns, batches, metrics))
    }
    /// Inner join on one key; the probe key `5` matches two build rows, so
    /// the output fans out to 3 rows.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];
        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
        Ok(())
    }
    /// Same data as `join_inner_one`, but run through the partitioned
    /// (hash-repartitioned) join path.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_inner_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];
        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
        // Output order across partitions is nondeterministic — sort first.
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
        Ok(())
    }
    /// Inner join where the key columns have distinct names (`b1` vs `b2`);
    /// output schema keeps both.
    #[tokio::test]
    async fn join_inner_one_no_shared_column_names() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];
        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 5  | 9  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 3);
        Ok(())
    }
    /// Inner join whose inputs are not sorted on the key; verifies output
    /// rows follow probe order regardless of build-side order.
    #[tokio::test]
    async fn join_inner_one_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![0, 3, 2, 1]),
            ("b1", &vec![4, 5, 5, 4]),
            ("c1", &vec![6, 9, 8, 7]),
        );
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];
        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3  | 5  | 9  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 0  | 4  | 6  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 4);
        Ok(())
    }
    /// Inner join on two key columns; also bounds the number of output
    /// batches as a function of `batch_size`.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_two(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 2]),
            ("b2", &vec![1, 2, 2]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];
        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
        // With forced hash collisions every probe row hits every build row,
        // so the upper bound on batch count is larger (9 candidate rows).
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(9, batch_size)
        };
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 1  | 7  | 1  | 1  | 70 |
            | 2  | 2  | 8  | 2  | 2  | 80 |
            | 2  | 2  | 9  | 2  | 2  | 80 |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 3);
        Ok(())
    }
    /// Inner join where the build side arrives as two partitions that are
    /// coalesced before the join.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one_two_parts_left(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let batch1 = build_table_i32(
            ("a1", &vec![1, 2]),
            ("b2", &vec![1, 2]),
            ("c1", &vec![7, 8]),
        );
        let batch2 =
            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
        let schema = batch1.schema();
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));
        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];
        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(9, batch_size)
        };
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 1  | 7  | 1  | 1  | 70 |
            | 2  | 2  | 8  | 2  | 2  | 80 |
            | 2  | 2  | 9  | 2  | 2  | 80 |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 3);
        Ok(())
    }
    /// Two-partition, unsorted build side coalesced before the join;
    /// verifies correctness is independent of build-side row order.
    #[tokio::test]
    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch1 = build_table_i32(
            ("a1", &vec![0, 3]),
            ("b1", &vec![4, 5]),
            ("c1", &vec![6, 9]),
        );
        let batch2 = build_table_i32(
            ("a1", &vec![2, 1]),
            ("b1", &vec![5, 4]),
            ("c1", &vec![8, 7]),
        );
        let schema = batch1.schema();
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];
        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3  | 5  | 9  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 0  | 4  | 6  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 4);
        Ok(())
    }
    /// Inner join where the probe side has two partitions; each partition is
    /// executed and checked independently against the shared build side.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_inner_one_two_parts_right(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]),
            ("c1", &vec![7, 8, 9]),
        );
        let batch1 = build_table_i32(
            ("a2", &vec![10, 20]),
            ("b1", &vec![4, 6]),
            ("c2", &vec![70, 80]),
        );
        let batch2 =
            build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
        let schema = batch1.schema();
        let right =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];
        let join = join(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
        )?;
        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
        // First probe partition: only key 4 matches (key 6 is absent on the
        // build side).
        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(1, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(6, batch_size)
        };
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            +----+----+----+----+----+----+
            ");
        }
        // Second probe partition: key 5 matches two build rows.
        let stream = join.execute(1, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            div_ceil(2, batch_size)
        } else {
            div_ceil(3, batch_size)
        };
        assert!(
            batches.len() <= expected_batch_count,
            "expected at most {expected_batch_count} batches, got {}",
            batches.len()
        );
        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 2  | 5  | 8  | 30 | 5  | 90 |
            | 3  | 5  | 9  | 30 | 5  | 90 |
            +----+----+----+----+----+----+
            ");
        }
        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
        Ok(())
    }
fn build_table_two_batches(
a: (&str, &Vec<i32>),
b: (&str, &Vec<i32>),
c: (&str, &Vec<i32>),
) -> Arc<dyn ExecutionPlan> {
let batch = build_table_i32(a, b, c);
let schema = batch.schema();
TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
}
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_left_multi_batch(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
);
let right = build_table_two_batches(
("a2", &vec![10, 20, 30]),
("b1", &vec![4, 5, 6]),
("c2", &vec![70, 80, 90]),
);
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
)];
let join = join(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
&JoinType::Left,
NullEquality::NullEqualsNothing,
)
.unwrap();
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
let (_, batches, metrics) = join_collect(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
&JoinType::Left,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+----+----+----+----+
| a1 | b1 | c1 | a2 | b1 | c2 |
+----+----+----+----+----+----+
| 1 | 4 | 7 | 10 | 4 | 70 |
| 1 | 4 | 7 | 10 | 4 | 70 |
| 2 | 5 | 8 | 20 | 5 | 80 |
| 2 | 5 | 8 | 20 | 5 | 80 |
| 3 | 7 | 9 | | | |
+----+----+----+----+----+----+
");
}
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
return Ok(());
}
    /// FULL join against a two-batch probe side: unmatched rows from both
    /// sides appear (probe key 6 twice — once per batch; build key 7 once).
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_full_multi_batch(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table_two_batches(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];
        let join = join(
            left,
            right,
            on,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();
        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();
        let metrics = join.metrics().unwrap();
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            |    |    |    | 30 | 6  | 90 |
            |    |    |    | 30 | 6  | 90 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    }
    /// LEFT join with an empty probe side: every build row is emitted with
    /// NULL right-side columns.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_empty_right(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
        )];
        let schema = right.schema();
        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
        let join = join(
            left,
            right,
            on,
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();
        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();
        let metrics = join.metrics().unwrap();
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  |    |    |    |
            | 2  | 5  | 8  |    |    |    |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    }
    /// FULL join with an empty probe side: behaves like the LEFT case —
    /// all build rows are emitted with NULL right-side columns.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_full_empty_right(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];
        let schema = right.schema();
        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
        let join = join(
            left,
            right,
            on,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();
        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();
        let metrics = join.metrics().unwrap();
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  |    |    |    |
            | 2  | 5  | 8  |    |    |    |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    }
    /// LEFT join on one key: the unmatched build row (b1 = 7) is kept with
    /// NULL right-side columns.
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];
        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
        Ok(())
    }
    /// Same data as `join_left_one`, but run through the partitioned join
    /// path (unmatched build rows must still be emitted exactly once).
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn partitioned_join_left_one(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];
        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            | 3  | 7  | 9  |    |    |    |
            +----+----+----+----+----+----+
            ");
        }
        assert_join_metrics!(metrics, 3);
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
        Ok(())
    }
    /// Left table for semi/anti tests: b1 has a duplicate key (8, 8) and
    /// several values with no right-side match.
    fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
        build_table(
            ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
            ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
            ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
        )
    }
    /// Right table for semi/anti tests: b2 has a duplicate key (10, 10) and
    /// is intentionally unsorted.
    fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
        build_table(
            ("a2", &vec![8, 12, 6, 2, 10, 4]),
            ("b2", &vec![8, 10, 6, 2, 10, 4]),
            ("c2", &vec![20, 40, 60, 80, 100, 120]),
        )
    }
    /// LEFT SEMI join: only left columns are emitted, each matching left
    /// row exactly once (including both duplicate-key rows b1 = 8).
    #[apply(hash_join_exec_configs)]
    #[tokio::test]
    async fn join_left_semi(
        batch_size: usize,
        use_perfect_hash_join_as_possible: bool,
    ) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];
        let join = join(
            left,
            right,
            on,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;
        let columns = columns(&join.schema());
        // Semi join output schema contains only the left side.
        assert_eq!(columns, vec!["a1", "b1", "c1"]);
        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1  |
            +----+----+-----+
            | 11 | 8  | 110 |
            | 13 | 10 | 130 |
            | 9  | 8  | 90  |
            +----+----+-----+
            ");
        }
        let metrics = join.metrics().unwrap();
        assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
        Ok(())
    }
// LeftSemi join with an additional (non-equi) join filter on a right-side
// column. Runs the same join twice: first with a filter (`x != 10`) that does
// not change the semi-join result, then with a stricter filter (`x > 10`)
// that narrows it to a single row.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_left_semi_with_filter(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
)];
// The filter's intermediate schema has one column "x" bound to
// right-side column index 0 (a2).
let column_indices = vec![ColumnIndex {
index: 0,
side: JoinSide::Right,
}];
let intermediate_schema =
Schema::new(vec![Field::new("x", DataType::Int32, true)]);
// First filter: x != 10 — filters out right rows with a2 = 10, but every
// matching left row still has another matching right row, so the
// semi-join output is unchanged.
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::NotEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)) as Arc<dyn PhysicalExpr>;
let filter = JoinFilter::new(
filter_expression,
column_indices.clone(),
Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
filter,
&JoinType::LeftSemi,
NullEquality::NullEqualsNothing,
)?;
let columns_header = columns(&join.schema());
assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
let stream = join.execute(0, Arc::clone(&task_ctx))?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+-----+
| a1 | b1 | c1 |
+----+----+-----+
| 11 | 8 | 110 |
| 13 | 10 | 130 |
| 9 | 8 | 90 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
// Second filter: x > 10 — only right rows with a2 > 10 survive, leaving
// a single qualifying left row (a1 = 13 via right a2 = 12).
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)) as Arc<dyn PhysicalExpr>;
let filter = JoinFilter::new(
filter_expression,
column_indices,
Arc::new(intermediate_schema),
);
let join = join_with_filter(
left,
right,
on,
filter,
&JoinType::LeftSemi,
NullEquality::NullEqualsNothing,
)?;
let columns_header = columns(&join.schema());
assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+-----+
| a1 | b1 | c1 |
+----+----+-----+
| 13 | 10 | 130 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// RightSemi join on b1 = b2: the output contains only right-side columns,
// one row per right row that has at least one matching left row. Uses the
// order-sensitive `batches_to_string` (not the sorted variant), so the
// snapshot also pins the probe-side output order.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_right_semi(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
)];
let join = join(
left,
right,
on,
&JoinType::RightSemi,
NullEquality::NullEqualsNothing,
)?;
// RightSemi output schema is exactly the right schema.
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a2", "b2", "c2"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_string(&batches), @r"
+----+----+-----+
| a2 | b2 | c2 |
+----+----+-----+
| 8 | 8 | 20 |
| 12 | 10 | 40 |
| 10 | 10 | 100 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// RightSemi join with a join filter on a left-side column. Runs twice: first
// with `x != 9` (drops one left row but the semi result is unchanged because
// a duplicate key remains), then with `x > 11` which removes left key 8
// entirely and shrinks the output.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_right_semi_with_filter(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
)];
// Intermediate filter column "x" is bound to left-side column index 0 (a1).
let column_indices = vec![ColumnIndex {
index: 0,
side: JoinSide::Left,
}];
let intermediate_schema =
Schema::new(vec![Field::new("x", DataType::Int32, true)]);
// First filter: a1 != 9. Left rows a1 = 9 and a1 = 11 share key b1 = 8,
// so right key 8 still matches via a1 = 11 and the output is unchanged.
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::NotEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
)) as Arc<dyn PhysicalExpr>;
let filter = JoinFilter::new(
filter_expression,
column_indices.clone(),
Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
filter,
&JoinType::RightSemi,
NullEquality::NullEqualsNothing,
)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a2", "b2", "c2"]);
let stream = join.execute(0, Arc::clone(&task_ctx))?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_string(&batches), @r"
+----+----+-----+
| a2 | b2 | c2 |
+----+----+-----+
| 8 | 8 | 20 |
| 12 | 10 | 40 |
| 10 | 10 | 100 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
// Second filter: a1 > 11 — only left row a1 = 13 (key 10) qualifies, so
// only right rows with b2 = 10 remain.
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
)) as Arc<dyn PhysicalExpr>;
let filter = JoinFilter::new(
filter_expression,
column_indices,
Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
left,
right,
on,
filter,
&JoinType::RightSemi,
NullEquality::NullEqualsNothing,
)?;
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_string(&batches), @r"
+----+----+-----+
| a2 | b2 | c2 |
+----+----+-----+
| 12 | 10 | 40 |
| 10 | 10 | 100 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// LeftAnti join on b1 = b2: the output contains only left-side columns, one
// row per left row with NO matching right row. Left keys {1, 3, 5, 7} have no
// counterpart in right b2 = {8, 10, 6, 2, 10, 4}.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_left_anti(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
)];
let join = join(
left,
right,
on,
&JoinType::LeftAnti,
NullEquality::NullEqualsNothing,
)?;
// LeftAnti output schema is exactly the left schema.
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+----+
| a1 | b1 | c1 |
+----+----+----+
| 1 | 1 | 10 |
| 3 | 3 | 30 |
| 5 | 5 | 50 |
| 7 | 7 | 70 |
+----+----+----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// LeftAnti join with a join filter on a right-side column (x bound to a2).
// With filter `x != 8`, the right row a2 = 8 (key b2 = 8) is rejected, so the
// left rows with key 8 (a1 = 9, 11) become non-matching and join the anti
// output alongside keys {1, 3, 5, 7}.
//
// NOTE(review): both halves of this test build the identical filter
// (`x != 8`) and expect the identical output — unlike the sibling
// `*_with_filter` tests, which vary the filter in the second half. Possibly
// the second filter was meant to differ; confirm against the test's intent.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_left_anti_with_filter(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
)];
// Intermediate filter column "x" is bound to right-side column index 0 (a2).
let column_indices = vec![ColumnIndex {
index: 0,
side: JoinSide::Right,
}];
let intermediate_schema =
Schema::new(vec![Field::new("x", DataType::Int32, true)]);
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::NotEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
)) as Arc<dyn PhysicalExpr>;
let filter = JoinFilter::new(
filter_expression,
column_indices.clone(),
Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
filter,
&JoinType::LeftAnti,
NullEquality::NullEqualsNothing,
)?;
let columns_header = columns(&join.schema());
assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
let stream = join.execute(0, Arc::clone(&task_ctx))?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+-----+
| a1 | b1 | c1 |
+----+----+-----+
| 1 | 1 | 10 |
| 11 | 8 | 110 |
| 3 | 3 | 30 |
| 5 | 5 | 50 |
| 7 | 7 | 70 |
| 9 | 8 | 90 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
// Second run with the same filter expression and expected output
// (see NOTE(review) above).
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::NotEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
)) as Arc<dyn PhysicalExpr>;
let filter = JoinFilter::new(
filter_expression,
column_indices,
Arc::new(intermediate_schema),
);
let join = join_with_filter(
left,
right,
on,
filter,
&JoinType::LeftAnti,
NullEquality::NullEqualsNothing,
)?;
let columns_header = columns(&join.schema());
assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+-----+
| a1 | b1 | c1 |
+----+----+-----+
| 1 | 1 | 10 |
| 11 | 8 | 110 |
| 3 | 3 | 30 |
| 5 | 5 | 50 |
| 7 | 7 | 70 |
| 9 | 8 | 90 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// RightAnti join on b1 = b2: the output contains only right-side columns,
// one row per right row with NO matching left row. Right keys {6, 2, 4} have
// no counterpart in left b1 = {1, 3, 5, 7, 8, 8, 10}.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_right_anti(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
)];
let join = join(
left,
right,
on,
&JoinType::RightAnti,
NullEquality::NullEqualsNothing,
)?;
// RightAnti output schema is exactly the right schema.
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a2", "b2", "c2"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_string(&batches), @r"
+----+----+-----+
| a2 | b2 | c2 |
+----+----+-----+
| 6 | 6 | 60 |
| 2 | 2 | 80 |
| 4 | 4 | 120 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// RightAnti join with a join filter, exercised twice: first with the filter
// column bound to the LEFT side (a1 != 13, which disqualifies the only match
// for right key 10 → those rows join the anti output), then bound to the
// RIGHT side (b2 != 8, which rejects right-row matches with b2 = 8).
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_right_anti_with_filter(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
)];
// First filter: "x" bound to left-side column index 0 (a1).
let column_indices = vec![ColumnIndex {
index: 0,
side: JoinSide::Left,
}];
let intermediate_schema =
Schema::new(vec![Field::new("x", DataType::Int32, true)]);
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::NotEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
)) as Arc<dyn PhysicalExpr>;
let filter = JoinFilter::new(
filter_expression,
column_indices,
Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
filter,
&JoinType::RightAnti,
NullEquality::NullEqualsNothing,
)?;
let columns_header = columns(&join.schema());
assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
let stream = join.execute(0, Arc::clone(&task_ctx))?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_string(&batches), @r"
+----+----+-----+
| a2 | b2 | c2 |
+----+----+-----+
| 12 | 10 | 40 |
| 6 | 6 | 60 |
| 2 | 2 | 80 |
| 10 | 10 | 100 |
| 4 | 4 | 120 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
// Second filter: "x" bound to right-side column index 1 (b2).
let column_indices = vec![ColumnIndex {
index: 1,
side: JoinSide::Right,
}];
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::NotEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
)) as Arc<dyn PhysicalExpr>;
let filter = JoinFilter::new(
filter_expression,
column_indices,
Arc::new(intermediate_schema),
);
let join = join_with_filter(
left,
right,
on,
filter,
&JoinType::RightAnti,
NullEquality::NullEqualsNothing,
)?;
let columns_header = columns(&join.schema());
assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_string(&batches), @r"
+----+----+-----+
| a2 | b2 | c2 |
+----+----+-----+
| 8 | 8 | 20 |
| 6 | 6 | 60 |
| 2 | 2 | 80 |
| 4 | 4 | 120 |
+----+----+-----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// Right outer join, single partition: every right row appears in the output;
// the unmatched right row (b1 = 6) is padded with nulls on the left side.
// Both inputs name their key column "b1", so the joined header repeats it.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_right_one(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]),
("c1", &vec![7, 8, 9]),
);
let right = build_table(
("a2", &vec![10, 20, 30]),
("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
);
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];
let (columns, batches, metrics) = join_collect(
left,
right,
on,
&JoinType::Right,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+----+----+----+----+
| a1 | b1 | c1 | a2 | b1 | c2 |
+----+----+----+----+----+----+
| | | | 30 | 6 | 90 |
| 1 | 4 | 7 | 10 | 4 | 70 |
| 2 | 5 | 8 | 20 | 5 | 80 |
+----+----+----+----+----+----+
");
}
// Three output rows total (2 matched + 1 null-padded).
assert_join_metrics!(metrics, 3);
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// Same scenario as `join_right_one`, but executed through the partitioned
// collect path (`partitioned_join_collect`) — the result must be identical
// to the single-partition run.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn partitioned_join_right_one(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]),
("c1", &vec![7, 8, 9]),
);
let right = build_table(
("a2", &vec![10, 20, 30]),
("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
);
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];
let (columns, batches, metrics) = partitioned_join_collect(
left,
right,
on,
&JoinType::Right,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+----+----+----+----+
| a1 | b1 | c1 | a2 | b1 | c2 |
+----+----+----+----+----+----+
| | | | 30 | 6 | 90 |
| 1 | 4 | 7 | 10 | 4 | 70 |
| 2 | 5 | 8 | 20 | 5 | 80 |
+----+----+----+----+----+----+
");
}
assert_join_metrics!(metrics, 3);
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// Full outer join: both the unmatched left row (b1 = 7) and the unmatched
// right row (b2 = 6) appear null-padded, alongside the two matched pairs.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_full_one(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
);
let right = build_table(
("a2", &vec![10, 20, 30]),
("b2", &vec![4, 5, 6]),
("c2", &vec![70, 80, 90]),
);
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
)];
let join = join(
left,
right,
on,
&JoinType::Full,
NullEquality::NullEqualsNothing,
)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+----+----+----+----+
| a1 | b1 | c1 | a2 | b2 | c2 |
+----+----+----+----+----+----+
| | | | 30 | 6 | 90 |
| 1 | 4 | 7 | 10 | 4 | 70 |
| 2 | 5 | 8 | 20 | 5 | 80 |
| 3 | 7 | 9 | | | |
+----+----+----+----+----+----+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// LeftMark join: output is every left row plus a boolean "mark" column that
// is true when the row has at least one match on the right. Left key 7 has
// no right match → mark = false.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_left_mark(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
);
let right = build_table(
("a2", &vec![10, 20, 30]),
("b1", &vec![4, 5, 6]),
("c2", &vec![70, 80, 90]),
);
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];
let (columns, batches, metrics) = join_collect(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
&JoinType::LeftMark,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
// Schema is the left schema plus the synthesized "mark" column.
assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+----+-------+
| a1 | b1 | c1 | mark |
+----+----+----+-------+
| 1 | 4 | 7 | true |
| 2 | 5 | 8 | true |
| 3 | 7 | 9 | false |
+----+----+----+-------+
");
}
assert_join_metrics!(metrics, 3);
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// LeftMark join through the partitioned collect path. The right table has a
// duplicate key (b1 = 4 twice); the mark column must stay a single boolean
// per left row regardless of the number of matches.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn partitioned_join_left_mark(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
);
let right = build_table(
("a2", &vec![10, 20, 30, 40]),
("b1", &vec![4, 4, 5, 6]),
("c2", &vec![60, 70, 80, 90]),
);
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];
let (columns, batches, metrics) = partitioned_join_collect(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
&JoinType::LeftMark,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+----+----+----+-------+
| a1 | b1 | c1 | mark |
+----+----+----+-------+
| 1 | 4 | 7 | true |
| 2 | 5 | 8 | true |
| 3 | 7 | 9 | false |
+----+----+----+-------+
");
}
assert_join_metrics!(metrics, 3);
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// RightMark join: output is every right row plus a boolean "mark" column
// that is true when the row has at least one left match (b1 = 6 → false).
// NOTE(review): this test uses `assert_batches_sorted_eq!` while the sibling
// tests use insta `assert_snapshot!`; consider migrating for consistency.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_right_mark(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
);
let right = build_table(
("a2", &vec![10, 20, 30]),
("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
);
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];
let (columns, batches, metrics) = join_collect(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
&JoinType::RightMark,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
// Schema is the right schema plus the synthesized "mark" column.
assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
let expected = [
"+----+----+----+-------+",
"| a2 | b1 | c2 | mark |",
"+----+----+----+-------+",
"| 10 | 4 | 70 | true |",
"| 20 | 5 | 80 | true |",
"| 30 | 6 | 90 | false |",
"+----+----+----+-------+",
];
assert_batches_sorted_eq!(expected, &batches);
assert_join_metrics!(metrics, 3);
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// RightMark join through the partitioned collect path; the right table has
// four rows (duplicate probe key b1 = 4), so the metrics expect 4 output
// rows — one per right row, each carrying its own mark.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn partitioned_join_right_mark(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
);
let right = build_table(
("a2", &vec![10, 20, 30, 40]),
("b1", &vec![4, 4, 5, 6]), ("c2", &vec![60, 70, 80, 90]),
);
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];
let (columns, batches, metrics) = partitioned_join_collect(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
&JoinType::RightMark,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
let expected = [
"+----+----+----+-------+",
"| a2 | b1 | c2 | mark |",
"+----+----+----+-------+",
"| 10 | 4 | 60 | true |",
"| 20 | 4 | 70 | true |",
"| 30 | 5 | 80 | true |",
"| 40 | 6 | 90 | false |",
"+----+----+----+-------+",
];
assert_batches_sorted_eq!(expected, &batches);
assert_join_metrics!(metrics, 4);
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// Exercises hash-collision handling in the u64 join hash map: entries are
// inserted manually so that each hash bucket holds colliding tuples, then
// `lookup_join_hashmap` must still resolve the correct (build, probe) index
// pairs by comparing the actual key values, not just the hashes.
#[test]
fn join_with_hash_collisions_64() -> Result<()> {
let mut hashmap_left = HashTable::with_capacity(4);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
("y", &vec![200, 300]),
);
// Fixed seeds make the hash values deterministic across runs.
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
// Insert two entries under each row's hash to force collisions; values are
// (hash, 1-based chain head index) keyed by the hash itself.
hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);
// `next` chain array for the linked-list of rows sharing a hash bucket.
let next = vec![2, 0];
let right = build_table_i32(
("a", &vec![10, 20]),
("b", &vec![0, 0]),
("c", &vec![30, 40]),
);
let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
let join_hash_map = JoinHashMapU64::new(hashmap_left, next);
let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
let mut probe_indices_buffer = Vec::new();
let mut build_indices_buffer = Vec::new();
let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
&[left_keys_values],
&[right_keys_values],
NullEquality::NullEqualsNothing,
&hashes_buffer,
8192,
(0, None),
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;
// Despite the colliding entries, each probe row must map to exactly the
// build row with the equal key value.
let left_ids: UInt64Array = vec![0, 1].into();
let right_ids: UInt32Array = vec![0, 1].into();
assert_eq!(left_ids, l);
assert_eq!(right_ids, r);
Ok(())
}
// Same collision scenario as `join_with_hash_collisions_64`, but through the
// u32-indexed hash map (`JoinHashMapU32`) — chain indices stored as u32.
#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
let mut hashmap_left = HashTable::with_capacity(4);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
("y", &vec![200, 300]),
);
// Fixed seeds make the hash values deterministic across runs.
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
// Force collisions: two u32 chain entries per row hash.
hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);
let next: Vec<u32> = vec![2, 0];
let right = build_table_i32(
("a", &vec![10, 20]),
("b", &vec![0, 0]),
("c", &vec![30, 40]),
);
let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
let join_hash_map = JoinHashMapU32::new(hashmap_left, next);
let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
let mut probe_indices_buffer = Vec::new();
let mut build_indices_buffer = Vec::new();
let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
&[left_keys_values],
&[right_keys_values],
NullEquality::NullEqualsNothing,
&hashes_buffer,
8192,
(0, None),
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;
// Key-value comparison must disambiguate the colliding hash entries.
let left_ids: UInt64Array = vec![0, 1].into();
let right_ids: UInt32Array = vec![0, 1].into();
assert_eq!(left_ids, l);
assert_eq!(right_ids, r);
Ok(())
}
// Inner join where both inputs use the column names a/b/c: the joined schema
// simply concatenates both sides (duplicated names allowed), and the join
// condition pairs columns of DIFFERENT names (left.a = right.b).
#[tokio::test]
async fn join_with_duplicated_column_names() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a", &vec![1, 2, 3]),
("b", &vec![4, 5, 7]),
("c", &vec![7, 8, 9]),
);
let right = build_table(
("a", &vec![10, 20, 30]),
("b", &vec![1, 2, 7]),
("c", &vec![70, 80, 90]),
);
// join on left.a = right.b — name-based resolution against each side's
// own schema, so the duplicate names cannot be confused.
let on = vec![(
Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
)];
let join = join(
left,
right,
on,
&JoinType::Inner,
NullEquality::NullEqualsNothing,
)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+---+---+---+----+---+----+
| a | b | c | a | b | c |
+---+---+---+----+---+----+
| 1 | 4 | 7 | 10 | 1 | 70 |
| 2 | 5 | 8 | 20 | 2 | 80 |
+---+---+---+----+---+----+
");
}
Ok(())
}
/// Builds the shared join filter `left.c > right.c` used by the
/// `join_*_with_filter` tests: the intermediate schema holds the two `c`
/// columns (left column index 2, right column index 2), both nullable Int32.
fn prepare_join_filter() -> JoinFilter {
let intermediate_schema = Arc::new(Schema::new(vec![
Field::new("c", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]));
// Intermediate column 0 <- left.c (index 2); column 1 <- right.c (index 2).
let column_indices = vec![
ColumnIndex {
index: 2,
side: JoinSide::Left,
},
ColumnIndex {
index: 2,
side: JoinSide::Right,
},
];
let filter_expression: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 0)),
Operator::Gt,
Arc::new(Column::new("c", 1)),
));
JoinFilter::new(filter_expression, column_indices, intermediate_schema)
}
// Inner join on left.a = right.b combined with the `left.c > right.c` filter
// from `prepare_join_filter`. Only the key-matched pairs whose c values
// satisfy the filter survive.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_inner_with_filter(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a", &vec![0, 1, 2, 2]),
("b", &vec![4, 5, 7, 8]),
("c", &vec![7, 8, 9, 1]),
);
let right = build_table(
("a", &vec![10, 20, 30, 40]),
("b", &vec![2, 2, 3, 4]),
("c", &vec![7, 5, 6, 4]),
);
let on = vec![(
Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
)];
// Filter: left.c > right.c (see prepare_join_filter).
let filter = prepare_join_filter();
let join = join_with_filter(
left,
right,
on,
filter,
&JoinType::Inner,
NullEquality::NullEqualsNothing,
)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+---+---+---+----+---+---+
| a | b | c | a | b | c |
+---+---+---+----+---+---+
| 2 | 7 | 9 | 10 | 2 | 7 |
| 2 | 7 | 9 | 20 | 2 | 5 |
+---+---+---+----+---+---+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// Left outer join with the `left.c > right.c` filter: rows that match on the
// key but fail the filter (or have no key match at all) are still emitted,
// null-padded on the right — including (2, 8, 1), which matches the key but
// fails the filter.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_left_with_filter(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a", &vec![0, 1, 2, 2]),
("b", &vec![4, 5, 7, 8]),
("c", &vec![7, 8, 9, 1]),
);
let right = build_table(
("a", &vec![10, 20, 30, 40]),
("b", &vec![2, 2, 3, 4]),
("c", &vec![7, 5, 6, 4]),
);
let on = vec![(
Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
)];
let filter = prepare_join_filter();
let join = join_with_filter(
left,
right,
on,
filter,
&JoinType::Left,
NullEquality::NullEqualsNothing,
)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+---+---+---+----+---+---+
| a | b | c | a | b | c |
+---+---+---+----+---+---+
| 0 | 4 | 7 | | | |
| 1 | 5 | 8 | | | |
| 2 | 7 | 9 | 10 | 2 | 7 |
| 2 | 7 | 9 | 20 | 2 | 5 |
| 2 | 8 | 1 | | | |
+---+---+---+----+---+---+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// Right outer join with the `left.c > right.c` filter: right rows whose key
// has no surviving match (b = 3, 4) are emitted null-padded on the left.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_right_with_filter(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a", &vec![0, 1, 2, 2]),
("b", &vec![4, 5, 7, 8]),
("c", &vec![7, 8, 9, 1]),
);
let right = build_table(
("a", &vec![10, 20, 30, 40]),
("b", &vec![2, 2, 3, 4]),
("c", &vec![7, 5, 6, 4]),
);
let on = vec![(
Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
)];
let filter = prepare_join_filter();
let join = join_with_filter(
left,
right,
on,
filter,
&JoinType::Right,
NullEquality::NullEqualsNothing,
)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
allow_duplicates! {
assert_snapshot!(batches_to_sort_string(&batches), @r"
+---+---+---+----+---+---+
| a | b | c | a | b | c |
+---+---+---+----+---+---+
| | | | 30 | 3 | 6 |
| | | | 40 | 4 | 4 |
| 2 | 7 | 9 | 10 | 2 | 7 |
| 2 | 7 | 9 | 20 | 2 | 5 |
+---+---+---+----+---+---+
");
}
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// Full outer join with the `left.c > right.c` filter: unmatched rows from
// BOTH sides (including left rows whose only key matches failed the filter)
// appear null-padded on the opposite side.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn join_full_with_filter(
batch_size: usize,
use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
let left = build_table(
("a", &vec![0, 1, 2, 2]),
("b", &vec![4, 5, 7, 8]),
("c", &vec![7, 8, 9, 1]),
);
let right = build_table(
("a", &vec![10, 20, 30, 40]),
("b", &vec![2, 2, 3, 4]),
("c", &vec![7, 5, 6, 4]),
);
let on = vec![(
Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
)];
let filter = prepare_join_filter();
let join = join_with_filter(
left,
right,
on,
filter,
&JoinType::Full,
NullEquality::NullEqualsNothing,
)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
let expected = [
"+---+---+---+----+---+---+",
"| a | b | c | a | b | c |",
"+---+---+---+----+---+---+",
"| | | | 30 | 3 | 6 |",
"| | | | 40 | 4 | 4 |",
"| 2 | 7 | 9 | 10 | 2 | 7 |",
"| 2 | 7 | 9 | 20 | 2 | 5 |",
"| 0 | 4 | 7 | | | |",
"| 1 | 5 | 8 | | | |",
"| 2 | 8 | 1 | | | |",
"+---+---+---+----+---+---+",
];
assert_batches_sorted_eq!(expected, &batches);
let metrics = join.metrics().unwrap();
assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
Ok(())
}
// Runs a CollectLeft-mode join for every supported join type against the same
// small inputs and checks the exact expected rows per type. Keys 4 and 5
// match across sides; left key 7 and right key 6 are unmatched.
#[tokio::test]
async fn test_collect_left_multiple_partitions_join() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b2", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    // Equi-join on b1 = b2.
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    let expected_inner = vec![
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| 1 | 4 | 7 | 10 | 4 | 70 |",
        "| 2 | 5 | 8 | 20 | 5 | 80 |",
        "+----+----+----+----+----+----+",
    ];
    let expected_left = vec![
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| 1 | 4 | 7 | 10 | 4 | 70 |",
        "| 2 | 5 | 8 | 20 | 5 | 80 |",
        "| 3 | 7 | 9 | | | |",
        "+----+----+----+----+----+----+",
    ];
    let expected_right = vec![
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| | | | 30 | 6 | 90 |",
        "| 1 | 4 | 7 | 10 | 4 | 70 |",
        "| 2 | 5 | 8 | 20 | 5 | 80 |",
        "+----+----+----+----+----+----+",
    ];
    let expected_full = vec![
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| | | | 30 | 6 | 90 |",
        "| 1 | 4 | 7 | 10 | 4 | 70 |",
        "| 2 | 5 | 8 | 20 | 5 | 80 |",
        "| 3 | 7 | 9 | | | |",
        "+----+----+----+----+----+----+",
    ];
    // Semi/anti/mark variants project only one side's columns.
    let expected_left_semi = vec![
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "| 1 | 4 | 7 |",
        "| 2 | 5 | 8 |",
        "+----+----+----+",
    ];
    let expected_left_anti = vec![
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "| 3 | 7 | 9 |",
        "+----+----+----+",
    ];
    let expected_right_semi = vec![
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "| 10 | 4 | 70 |",
        "| 20 | 5 | 80 |",
        "+----+----+----+",
    ];
    let expected_right_anti = vec![
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "| 30 | 6 | 90 |",
        "+----+----+----+",
    ];
    // Mark joins keep all rows of one side plus a boolean "mark" column that
    // records whether a match existed.
    let expected_left_mark = vec![
        "+----+----+----+-------+",
        "| a1 | b1 | c1 | mark |",
        "+----+----+----+-------+",
        "| 1 | 4 | 7 | true |",
        "| 2 | 5 | 8 | true |",
        "| 3 | 7 | 9 | false |",
        "+----+----+----+-------+",
    ];
    let expected_right_mark = vec![
        "+----+----+----+-------+",
        "| a2 | b2 | c2 | mark |",
        "+----+----+----+-------+",
        "| 10 | 4 | 70 | true |",
        "| 20 | 5 | 80 | true |",
        "| 30 | 6 | 90 | false |",
        "+----+----+----+-------+",
    ];
    // One (join type, expected table) case per supported join type.
    let test_cases = vec![
        (JoinType::Inner, expected_inner),
        (JoinType::Left, expected_left),
        (JoinType::Right, expected_right),
        (JoinType::Full, expected_full),
        (JoinType::LeftSemi, expected_left_semi),
        (JoinType::LeftAnti, expected_left_anti),
        (JoinType::RightSemi, expected_right_semi),
        (JoinType::RightAnti, expected_right_anti),
        (JoinType::LeftMark, expected_left_mark),
        (JoinType::RightMark, expected_right_mark),
    ];
    for (join_type, expected) in test_cases {
        let (_, batches, metrics) = join_collect_with_partition_mode(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &join_type,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
            Arc::clone(&task_ctx),
        )
        .await?;
        assert_batches_sorted_eq!(expected, &batches);
        // Each pretty-printed table has exactly 4 non-data lines (top border,
        // header, header separator, bottom border), so rows = len() - 4.
        assert_join_metrics!(metrics, expected.len() - 4);
    }
    Ok(())
}
// Inner join on Date32 keys. Date32 values are days since the UNIX epoch;
// per the snapshot below, 19108 renders as 2022-04-26 and 19109 as 2022-04-27.
#[tokio::test]
async fn join_date32() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("date", DataType::Date32, false),
        Field::new("n", DataType::Int32, false),
    ]));
    // Left: three distinct dates.
    let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
    let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
    let left =
        TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
            .unwrap();
    // Right: 19108 appears twice, so that key joins to two rows; 19107 has no
    // right-side partner and must not appear in the inner-join output.
    let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
    let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
    let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
    let on = vec![(
        Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
    )?;
    let task_ctx = Arc::new(TaskContext::default());
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +------------+---+------------+---+
        | date | n | date | n |
        +------------+---+------------+---+
        | 2022-04-26 | 2 | 2022-04-26 | 4 |
        | 2022-04-26 | 2 | 2022-04-26 | 5 |
        | 2022-04-27 | 3 | 2022-04-27 | 6 |
        +------------+---+------------+---+
        ");
    }
    Ok(())
}
#[tokio::test]
async fn join_with_error_right() {
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]),
("c1", &vec![7, 8, 9]),
);
let err = exec_err!("bad data error");
let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
)];
let schema = right.schema();
let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
let join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::RightSemi,
JoinType::RightAnti,
];
for join_type in join_types {
let join = join(
Arc::clone(&left),
Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
on.clone(),
&join_type,
NullEquality::NullEqualsNothing,
)
.unwrap();
let task_ctx = Arc::new(TaskContext::default());
let stream = join.execute(0, task_ctx).unwrap();
let result_string = common::collect(stream).await.unwrap_err().to_string();
assert!(
result_string.contains("bad data error"),
"actual: {result_string}"
);
}
}
// Every left row joins every right row (all b values are 1), giving a 4 x 5 =
// 20-row result. Runs each join type for every batch_size from 20 down to 1
// and checks the output is split into an appropriately bounded number of
// batches while still producing the correct rows.
#[tokio::test]
async fn join_split_batch() {
    let left = build_table(
        ("a1", &vec![1, 2, 3, 4]),
        ("b1", &vec![1, 1, 1, 1]),
        ("c1", &vec![0, 0, 0, 0]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30, 40, 50]),
        ("b2", &vec![1, 1, 1, 1, 1]),
        ("c2", &vec![0, 0, 0, 0, 0]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
    ];
    // Full cross-product size of the matched rows.
    let expected_resultset_records = 20;
    let common_result = [
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| 1 | 1 | 0 | 10 | 1 | 0 |",
        "| 2 | 1 | 0 | 10 | 1 | 0 |",
        "| 3 | 1 | 0 | 10 | 1 | 0 |",
        "| 4 | 1 | 0 | 10 | 1 | 0 |",
        "| 1 | 1 | 0 | 20 | 1 | 0 |",
        "| 2 | 1 | 0 | 20 | 1 | 0 |",
        "| 3 | 1 | 0 | 20 | 1 | 0 |",
        "| 4 | 1 | 0 | 20 | 1 | 0 |",
        "| 1 | 1 | 0 | 30 | 1 | 0 |",
        "| 2 | 1 | 0 | 30 | 1 | 0 |",
        "| 3 | 1 | 0 | 30 | 1 | 0 |",
        "| 4 | 1 | 0 | 30 | 1 | 0 |",
        "| 1 | 1 | 0 | 40 | 1 | 0 |",
        "| 2 | 1 | 0 | 40 | 1 | 0 |",
        "| 3 | 1 | 0 | 40 | 1 | 0 |",
        "| 4 | 1 | 0 | 40 | 1 | 0 |",
        "| 1 | 1 | 0 | 50 | 1 | 0 |",
        "| 2 | 1 | 0 | 50 | 1 | 0 |",
        "| 3 | 1 | 0 | 50 | 1 | 0 |",
        "| 4 | 1 | 0 | 50 | 1 | 0 |",
        "+----+----+----+----+----+----+",
    ];
    // Semi joins keep each side's rows once; anti joins are empty because
    // every row has a match.
    let left_batch = [
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "| 1 | 1 | 0 |",
        "| 2 | 1 | 0 |",
        "| 3 | 1 | 0 |",
        "| 4 | 1 | 0 |",
        "+----+----+----+",
    ];
    let right_batch = [
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "| 10 | 1 | 0 |",
        "| 20 | 1 | 0 |",
        "| 30 | 1 | 0 |",
        "| 40 | 1 | 0 |",
        "| 50 | 1 | 0 |",
        "+----+----+----+",
    ];
    let right_empty = [
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "+----+----+----+",
    ];
    let left_empty = [
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "+----+----+----+",
    ];
    for join_type in join_types {
        for batch_size in (1..21).rev() {
            let task_ctx = prepare_task_ctx(batch_size, true);
            let join = join(
                Arc::clone(&left),
                Arc::clone(&right),
                on.clone(),
                &join_type,
                NullEquality::NullEqualsNothing,
            )
            .unwrap();
            let stream = join.execute(0, task_ctx).unwrap();
            let batches = common::collect(stream).await.unwrap();
            // Probe-driven join types emit at most ceil(20 / batch_size)
            // batches; the remaining types may emit one extra batch for the
            // unmatched build-side rows.
            let expected_batch_count = match join_type {
                JoinType::Inner
                | JoinType::Right
                | JoinType::RightSemi
                | JoinType::RightAnti => {
                    div_ceil(expected_resultset_records, batch_size)
                }
                _ => div_ceil(expected_resultset_records, batch_size) + 1,
            };
            assert!(
                batches.len() <= expected_batch_count,
                "expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
                batches.len()
            );
            let expected = match join_type {
                JoinType::RightSemi => right_batch.to_vec(),
                JoinType::RightAnti => right_empty.to_vec(),
                JoinType::LeftSemi => left_batch.to_vec(),
                JoinType::LeftAnti => left_empty.to_vec(),
                _ => common_result.to_vec(),
            };
            // Anti joins may legitimately produce zero batches instead of an
            // empty table; anything else must match the expected rows exactly.
            if batches.is_empty() {
                assert!(
                    matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
                    "Unexpected empty result for {join_type} join"
                );
            } else {
                assert_batches_eq!(expected, &batches);
            }
        }
    }
}
// With only a 100-byte memory limit, collecting the 10-row build side (three
// Int32 columns = 120 bytes of array data) must fail for every join type with
// a resources-exhausted error that names the HashJoinInput memory consumer.
#[tokio::test]
async fn single_partition_join_overallocation() -> Result<()> {
    let left = build_table(
        ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
    );
    let right = build_table(
        ("a2", &vec![10, 11]),
        ("b2", &vec![12, 13]),
        ("c2", &vec![14, 15]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftMark,
        JoinType::RightMark,
    ];
    for join_type in join_types {
        // Fresh runtime per iteration so each join starts from an empty pool.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(100, 1.0)
            .build_arc()?;
        let task_ctx = TaskContext::default().with_runtime(runtime);
        let task_ctx = Arc::new(task_ctx);
        let join = join(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &join_type,
            NullEquality::NullEqualsNothing,
        )?;
        let stream = join.execute(0, task_ctx)?;
        let err = common::collect(stream).await.unwrap_err();
        // The error must identify the failing consumer and the exact size of
        // the rejected allocation (120 B = 10 rows x 3 Int32 columns).
        assert_contains!(
            err.to_string(),
            "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput"
        );
        assert_contains!(
            err.to_string(),
            "Failed to allocate additional 120.0 B for HashJoinInput"
        );
    }
    Ok(())
}
// Memory-limit failure in Partitioned mode: each side has two partitions and
// partition 1 is executed, so the resources-exhausted error must name the
// per-partition consumer "HashJoinInput[1]".
#[tokio::test]
async fn partitioned_join_overallocation() -> Result<()> {
    // 10 rows x 3 Int32 columns = 120 bytes per build partition, which cannot
    // fit under the 100-byte limit configured below.
    let left_batch = build_table_i32(
        ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
    );
    let left = TestMemoryExec::try_new_exec(
        &[vec![left_batch.clone()], vec![left_batch.clone()]],
        left_batch.schema(),
        None,
    )
    .unwrap();
    let right_batch = build_table_i32(
        ("a2", &vec![10, 11]),
        ("b2", &vec![12, 13]),
        ("c2", &vec![14, 15]),
    );
    let right = TestMemoryExec::try_new_exec(
        &[vec![right_batch.clone()], vec![right_batch.clone()]],
        right_batch.schema(),
        None,
    )
    .unwrap();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
    )];
    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
    ];
    for join_type in join_types {
        // Fresh 100-byte-limited runtime per join type.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(100, 1.0)
            .build_arc()?;
        let session_config = SessionConfig::default().with_batch_size(50);
        let task_ctx = TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime);
        let task_ctx = Arc::new(task_ctx);
        let join = HashJoinExec::try_new(
            Arc::clone(&left) as Arc<dyn ExecutionPlan>,
            Arc::clone(&right) as Arc<dyn ExecutionPlan>,
            on.clone(),
            None,
            &join_type,
            None,
            PartitionMode::Partitioned,
            NullEquality::NullEqualsNothing,
            false,
        )?;
        // Execute partition 1 specifically.
        let stream = join.execute(1, task_ctx)?;
        let err = common::collect(stream).await.unwrap_err();
        assert_contains!(
            err.to_string(),
            "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]"
        );
        assert_contains!(
            err.to_string(),
            "Failed to allocate additional 120.0 B for HashJoinInput[1]"
        );
    }
    Ok(())
}
fn build_table_struct(
struct_name: &str,
field_name_and_values: (&str, &Vec<Option<i32>>),
nulls: Option<NullBuffer>,
) -> Arc<dyn ExecutionPlan> {
let (field_name, values) = field_name_and_values;
let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
let schema = Schema::new(vec![Field::new(
struct_name,
DataType::Struct(inner_fields.clone().into()),
nulls.is_some(),
)]);
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(StructArray::new(
inner_fields.into(),
vec![Arc::new(Int32Array::from(values.clone()))],
nulls,
))],
)
.unwrap();
let schema_ref = batch.schema();
TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
}
// Inner join on struct-typed key columns. The structs themselves are all
// non-null here; per the snapshot, structs whose inner field `a` is NULL
// still compare equal to each other even with NullEqualsNothing.
#[tokio::test]
async fn join_on_struct() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left =
        build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
    let right =
        build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
    let on = vec![(
        Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["n1", "n2"]);
    // 3 matches: {a: NULL}, {a: 1}, {a: 2}; left 3 / right 4 are unmatched.
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +--------+--------+
        | n1 | n2 |
        +--------+--------+
        | {a: } | {a: } |
        | {a: 1} | {a: 1} |
        | {a: 2} | {a: 2} |
        +--------+--------+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
// Join on struct columns whose single value is NULL at the struct (top)
// level. With NullEqualsNull the null keys join (one output row); with
// NullEqualsNothing they must produce no output rows.
#[tokio::test]
async fn join_on_struct_with_nulls() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left =
        build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
    let right =
        build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
    let on = vec![(
        Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
    )];
    // Case 1: NULL == NULL — the single null struct on each side joins.
    let (_, batches_null_eq, metrics) = join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::Inner,
        NullEquality::NullEqualsNull,
        Arc::clone(&task_ctx),
    )
    .await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r"
        +----+----+
        | n1 | n2 |
        +----+----+
        | | |
        +----+----+
        ");
    }
    assert_join_metrics!(metrics, 1);
    // Case 2: NULL != NULL — zero output rows.
    let (_, batches_null_neq, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_join_metrics!(metrics, 0);
    // The join may emit no batches at all, or only empty batches. If anything
    // was emitted it must render as an empty table. (Previously written as an
    // `if … {} else { … }` with an empty then-branch; inverted for clarity.)
    if !batches_null_neq.is_empty() {
        let expected_null_neq =
            ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
        assert_batches_eq!(expected_null_neq, &batches_null_neq);
    }
    Ok(())
}
/// Collects the column names of `schema`, preserving field order.
fn columns(schema: &Schema) -> Vec<String> {
    let mut names = Vec::with_capacity(schema.fields().len());
    for field in schema.fields().iter() {
        names.push(field.name().clone());
    }
    names
}
// Attaches a dynamic filter to the join by hand and verifies that, after the
// output stream is fully drained, the filter is marked complete (i.e.
// `wait_complete()` resolves instead of hanging).
#[tokio::test]
async fn test_hash_join_marks_filter_complete() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 6]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    // Keep a second handle to the filter so completion can be awaited after
    // the first handle is moved into the join.
    let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
    let dynamic_filter_clone = Arc::clone(&dynamic_filter);
    let mut join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::Inner,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        false,
    )?;
    // Install the filter directly on the exec (bypassing filter pushdown).
    join.dynamic_filter = Some(HashJoinExecDynamicFilter {
        filter: dynamic_filter,
        build_accumulator: OnceLock::new(),
    });
    let stream = join.execute(0, task_ctx)?;
    let _batches = common::collect(stream).await?;
    // Must resolve promptly; a hang here means the join never marked the
    // filter complete.
    dynamic_filter_clone.wait_complete().await;
    Ok(())
}
// Same completion guarantee as test_hash_join_marks_filter_complete, but with
// an EMPTY build (left) side: even when no build rows exist, the dynamic
// filter must still be marked complete after the stream is drained.
#[tokio::test]
async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    // Empty build side.
    let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    // Keep a second handle so completion can be awaited after the first is
    // moved into the join.
    let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
    let dynamic_filter_clone = Arc::clone(&dynamic_filter);
    let mut join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::Inner,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        false,
    )?;
    // Install the filter directly on the exec (bypassing filter pushdown).
    join.dynamic_filter = Some(HashJoinExecDynamicFilter {
        filter: dynamic_filter,
        build_accumulator: OnceLock::new(),
    });
    let stream = join.execute(0, task_ctx)?;
    let _batches = common::collect(stream).await?;
    // Must resolve even though the build side produced no rows.
    dynamic_filter_clone.wait_complete().await;
    Ok(())
}
// Perfect hash join with negative key values: keys span [-1, 1] on the build
// side, and the PHJ fast path must both be taken (asserted via metrics) and
// produce correct matches.
#[tokio::test]
async fn test_perfect_hash_join_with_negative_numbers() -> Result<()> {
    // PHJ enabled, large batch size.
    let task_ctx = prepare_task_ctx(8192, true);
    let (left_schema, right_schema, on) = build_schema_and_on()?;
    let left_batch = RecordBatch::try_new(
        Arc::clone(&left_schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
            // Join keys including negative and zero values.
            Arc::new(Int32Array::from(vec![-1, 0, 1])) as ArrayRef,
        ],
    )?;
    let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
    let right_batch = RecordBatch::try_new(
        Arc::clone(&right_schema),
        vec![
            Arc::new(Int32Array::from(vec![10, 20, 30, 40])) as ArrayRef,
            // Key 2 has no build-side partner and must not match.
            Arc::new(Int32Array::from(vec![1, -1, 0, 2])) as ArrayRef,
        ],
    )?;
    let right =
        TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
    assert_batches_sorted_eq!(
        [
            "+----+----+----+----+",
            "| a1 | b1 | a2 | b1 |",
            "+----+----+----+----+",
            "| 1 | -1 | 20 | -1 |",
            "| 2 | 0 | 30 | 0 |",
            "| 3 | 1 | 10 | 1 |",
            "+----+----+----+----+",
        ],
        &batches
    );
    // The perfect-hash-join path must actually have been used.
    assert_phj_used(&metrics, true);
    Ok(())
}
#[tokio::test]
async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> {
let task_ctx = prepare_task_ctx(8192, true);
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))],
)?;
let left = TestMemoryExec::try_new_exec(
&[vec![batch.clone()]],
Arc::clone(&schema),
None,
)?;
let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?;
let on: JoinOn = vec![(
Arc::new(Column::new_with_schema("a", &left.schema())?) as _,
Arc::new(Column::new_with_schema("a", &right.schema())?) as _,
)];
let (_columns, batches, _metrics) = join_collect(
left,
right,
on,
&JoinType::Inner,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 2);
Ok(())
}
// NullEqualsNull semantics when only the PROBE side contains a NULL key: the
// build side has no NULL to pair with, so the NULL probe row produces no
// output and only key 10 matches.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(
    batch_size: usize,
    use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
    let (left_schema, right_schema, on) = build_schema_and_on()?;
    // Build side: keys 10 and 20, no nulls.
    let left_batch = RecordBatch::try_new(
        Arc::clone(&left_schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef,
        ],
    )?;
    let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
    // Probe side: keys 10 and NULL.
    let right_batch = RecordBatch::try_new(
        Arc::clone(&right_schema),
        vec![
            Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef,
            Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
        ],
    )?;
    let right =
        TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNull,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
    assert_batches_sorted_eq!(
        [
            "+----+----+----+----+",
            "| a1 | b1 | a2 | b1 |",
            "+----+----+----+----+",
            "| 1 | 10 | 3 | 10 |",
            "+----+----+----+----+",
        ],
        &batches
    );
    // PHJ usage must match the flag this parameterized run was given.
    assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    Ok(())
}
// NullEqualsNothing semantics when BOTH sides contain a NULL key: the NULL
// keys must not match each other, so only key 10 joins.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn test_phj_null_equals_nothing_build_probe_all_have_nulls(
    batch_size: usize,
    use_perfect_hash_join_as_possible: bool,
) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible);
    let (left_schema, right_schema, on) = build_schema_and_on()?;
    // Build side: keys 10 and NULL.
    let left_batch = RecordBatch::try_new(
        Arc::clone(&left_schema),
        vec![
            Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef,
            Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
        ],
    )?;
    let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
    // Probe side: keys 10 and NULL.
    let right_batch = RecordBatch::try_new(
        Arc::clone(&right_schema),
        vec![
            Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
            Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
        ],
    )?;
    let right =
        TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
    assert_batches_sorted_eq!(
        [
            "+----+----+----+----+",
            "| a1 | b1 | a2 | b1 |",
            "+----+----+----+----+",
            "| 1 | 10 | 3 | 10 |",
            "+----+----+----+----+",
        ],
        &batches
    );
    // PHJ usage must match the flag this parameterized run was given.
    assert_phj_used(&metrics, use_perfect_hash_join_as_possible);
    Ok(())
}
// NullEqualsNull with a NULL key on the BUILD side: results must still be
// correct, and — although PHJ is enabled via prepare_task_ctx — the perfect
// hash join must NOT be used for this input (asserted below).
#[tokio::test]
async fn test_phj_null_equals_null_build_have_nulls() -> Result<()> {
    // PHJ enabled, large batch size.
    let task_ctx = prepare_task_ctx(8192, true);
    let (left_schema, right_schema, on) = build_schema_and_on()?;
    // Build side: keys 10, 20 and NULL.
    let left_batch = RecordBatch::try_new(
        Arc::clone(&left_schema),
        vec![
            Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef,
            Arc::new(Int32Array::from(vec![Some(10), Some(20), None])) as ArrayRef,
        ],
    )?;
    let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?;
    // Probe side: keys 10 and 30 (no nulls), so only key 10 matches.
    let right_batch = RecordBatch::try_new(
        Arc::clone(&right_schema),
        vec![
            Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef,
            Arc::new(Int32Array::from(vec![Some(10), Some(30)])) as ArrayRef,
        ],
    )?;
    let right =
        TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?;
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNull,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]);
    assert_batches_sorted_eq!(
        [
            "+----+----+----+----+",
            "| a1 | b1 | a2 | b1 |",
            "+----+----+----+----+",
            "| 1 | 10 | 3 | 10 |",
            "+----+----+----+----+",
        ],
        &batches
    );
    // PHJ must have been skipped despite being enabled.
    assert_phj_used(&metrics, false);
    Ok(())
}
// Null-aware LeftAnti join (SQL NOT IN semantics): a NULL in the right-hand
// key set means NO left row can be proven "not in", so the output is empty.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn test_null_aware_anti_join_probe_null(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size, false);
    let left = build_table_two_cols(
        ("c1", &vec![Some(1), Some(2), Some(3), Some(4)]),
        ("dummy", &vec![Some(10), Some(20), Some(30), Some(40)]),
    );
    // Right side contains a NULL key.
    let right = build_table_two_cols(
        ("c2", &vec![Some(1), Some(2), Some(3), None]),
        ("dummy", &vec![Some(100), Some(200), Some(300), Some(400)]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
    )];
    let join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::LeftAnti,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        true, // null_aware
    )?;
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    // Empty result: even key 4 (which has no match) is suppressed by the
    // right-side NULL.
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        ++
        ++
        ");
    }
    Ok(())
}
// Null-aware LeftAnti join with a NULL key on the LEFT side: a NULL left key
// never qualifies for NOT IN output, and matched keys (1) are excluded, so
// only the unmatched non-null key 4 survives.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn test_null_aware_anti_join_build_null(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size, false);
    // Left keys: 1 (matches), 4 (no match), NULL (never output).
    let left = build_table_two_cols(
        ("c1", &vec![Some(1), Some(4), None]),
        ("dummy", &vec![Some(10), Some(40), Some(0)]),
    );
    let right = build_table_two_cols(
        ("c2", &vec![Some(1), Some(2), Some(3)]),
        ("dummy", &vec![Some(100), Some(200), Some(300)]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
    )];
    let join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::LeftAnti,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        true, // null_aware
    )?;
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+-------+
        | c1 | dummy |
        +----+-------+
        | 4 | 40 |
        +----+-------+
        ");
    }
    Ok(())
}
// Null-aware LeftAnti join with no NULLs on either side: behaves like a plain
// anti join, returning the left rows whose keys (4, 5) have no right match.
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn test_null_aware_anti_join_no_nulls(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size, false);
    let left = build_table_two_cols(
        ("c1", &vec![Some(1), Some(2), Some(4), Some(5)]),
        ("dummy", &vec![Some(10), Some(20), Some(40), Some(50)]),
    );
    let right = build_table_two_cols(
        ("c2", &vec![Some(1), Some(2), Some(3)]),
        ("dummy", &vec![Some(100), Some(200), Some(300)]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("c1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("c2", &right.schema())?) as _,
    )];
    let join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::LeftAnti,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        true, // null_aware
    )?;
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+-------+
        | c1 | dummy |
        +----+-------+
        | 4 | 40 |
        | 5 | 50 |
        +----+-------+
        ");
    }
    Ok(())
}
// Constructor validation: null_aware = true is only legal for LeftAnti joins,
// so building an Inner join with it must fail with a descriptive error.
#[tokio::test]
async fn test_null_aware_validation_wrong_join_type() {
    let left =
        build_table_two_cols(("c1", &vec![Some(1)]), ("dummy", &vec![Some(10)]));
    let right =
        build_table_two_cols(("c2", &vec![Some(1)]), ("dummy", &vec![Some(100)]));
    let on = vec![(
        Arc::new(Column::new_with_schema("c1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("c2", &right.schema()).unwrap()) as _,
    )];
    let err = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::Inner,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        true, // null_aware — invalid for Inner
    )
    .expect_err("null_aware Inner join must be rejected");
    assert!(
        err.to_string()
            .contains("null_aware can only be true for LeftAnti joins")
    );
}
// Constructor validation: null-aware anti joins support exactly one join key,
// so supplying two key pairs must fail with a descriptive error.
#[tokio::test]
async fn test_null_aware_validation_multi_column() {
    let left = build_table(("a", &vec![1]), ("b", &vec![2]), ("c", &vec![3]));
    let right = build_table(("x", &vec![1]), ("y", &vec![2]), ("z", &vec![3]));
    // Two equi-join key pairs: (a = x) and (b = y).
    let on = vec![
        (
            Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("x", &right.schema()).unwrap()) as _,
        ),
        (
            Arc::new(Column::new_with_schema("b", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("y", &right.schema()).unwrap()) as _,
        ),
    ];
    let err = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::LeftAnti,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        true, // null_aware — invalid with two key columns
    )
    .expect_err("multi-column null_aware join must be rejected");
    assert!(
        err.to_string()
            .contains("null_aware anti join only supports single column join key")
    );
}
// Table-driven check of lr_is_preserved: each join type maps to the expected
// (left preserved, right preserved) pair.
#[test]
fn test_lr_is_preserved() {
    let cases = [
        (JoinType::Inner, (true, true)),
        (JoinType::Left, (true, false)),
        (JoinType::Right, (false, true)),
        (JoinType::Full, (false, false)),
        (JoinType::LeftSemi, (true, true)),
        (JoinType::LeftAnti, (true, true)),
        (JoinType::LeftMark, (true, false)),
        (JoinType::RightSemi, (true, true)),
        (JoinType::RightAnti, (true, true)),
        (JoinType::RightMark, (false, true)),
    ];
    for (join_type, expected) in cases {
        assert_eq!(lr_is_preserved(join_type), expected, "for {join_type:?}");
    }
}
}