use std::fmt;
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::{any::Any, vec};
use crate::ExecutionPlanProperties;
use crate::execution_plan::{EmissionType, boundedness_from_children};
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
use crate::joins::hash_join::shared_bounds::{
ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
};
use crate::joins::hash_join::stream::{
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
use crate::joins::utils::{
OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap,
swap_join_projection, update_hash,
};
use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
use crate::projection::{
EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
try_pushdown_through_join,
};
use crate::repartition::REPARTITION_RANDOM_STATE;
use crate::spill::get_record_batch_memory_size;
use crate::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
common::can_project,
joins::utils::{
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
build_join_schema, check_join_is_valid, estimate_join_statistics,
need_produce_result_in_final, symmetric_join_output_partitioning,
},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
};
use arrow::array::{ArrayRef, BooleanBufferBuilder};
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_schema::DataType;
use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, plan_err,
project_schema,
};
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::equivalence::{
ProjectionMapping, join_equivalence_properties,
};
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
use ahash::RandomState;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::TryStreamExt;
use parking_lot::Mutex;
use super::partitioned_hash_eval::SeededRandomState;
/// Fixed seed used for hash-join key hashing so build and probe sides (and
/// repeated executions) hash identically. The four seed words spell
/// 'J','O','I','N'; the value is deterministic and not security-relevant.
pub(crate) const HASH_JOIN_SEED: SeededRandomState =
    SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
/// Build-side ("left") data, collected once per join and shared by all
/// probe-side streams.
pub(super) struct JoinLeftData {
    /// Hash table over the build-side join keys, mapping hashes to row indices.
    pub(super) hash_map: Arc<dyn JoinHashMapType>,
    /// All build-side input concatenated into a single `RecordBatch`.
    batch: RecordBatch,
    /// The join-key expressions evaluated against `batch`, one array per key.
    values: Vec<ArrayRef>,
    /// Shared bitmap of build rows that matched at least one probe row; used
    /// when unmatched build rows must be emitted in the final output.
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Number of probe threads still running; decremented via
    /// [`Self::report_probe_completed`].
    probe_threads_counter: AtomicUsize,
    /// Held only to keep the build side's memory accounted for in the pool
    /// for the lifetime of this struct.
    _reservation: MemoryReservation,
    /// Per-key min/max bounds computed during collection when dynamic filter
    /// pushdown is enabled; `None` otherwise (or when the build side is empty).
    pub(super) bounds: Option<PartitionBounds>,
    /// How build-side membership is represented for probe-side pushdown
    /// (empty / IN-list / shared hash table).
    pub(super) membership: PushdownStrategy,
}
impl JoinLeftData {
    /// Borrow the build-side hash table.
    pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
        self.hash_map.as_ref()
    }

    /// Borrow the concatenated build-side batch.
    pub(super) fn batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// Borrow the evaluated join-key arrays.
    pub(super) fn values(&self) -> &[ArrayRef] {
        self.values.as_slice()
    }

    /// Borrow the shared visited-row bitmap.
    pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
        &self.visited_indices_bitmap
    }

    /// Borrow the probe-side membership pushdown strategy.
    pub(super) fn membership(&self) -> &PushdownStrategy {
        &self.membership
    }

    /// Record that one probe thread finished. Returns `true` only for the
    /// last thread to report, which then owns producing unmatched rows.
    pub(super) fn report_probe_completed(&self) -> bool {
        let threads_before = self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed);
        threads_before == 1
    }
}
/// Hash-join physical operator: builds a hash table from the left child and
/// probes it with batches from the right child (see [`HashJoinExec::probe_side`]).
#[expect(rustdoc::private_intra_doc_links)]
pub struct HashJoinExec {
    /// Build-side input (collected into the hash table).
    pub left: Arc<dyn ExecutionPlan>,
    /// Probe-side input (streamed).
    pub right: Arc<dyn ExecutionPlan>,
    /// Equi-join key pairs: `(left expression, right expression)`.
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional non-equi filter applied to joined row candidates.
    pub filter: Option<JoinFilter>,
    /// The join type (Inner, Left, Right, Semi, Anti, Mark, ...).
    pub join_type: JoinType,
    /// Output schema of the join *before* any `projection` is applied.
    join_schema: SchemaRef,
    /// Shared future for the build side; ensures the left input is collected
    /// only once in `CollectLeft` mode even with many probe partitions.
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    /// Seeded hash state (always `HASH_JOIN_SEED`) used for key hashing.
    random_state: SeededRandomState,
    /// Partitioning strategy (CollectLeft / Partitioned / Auto).
    pub mode: PartitionMode,
    /// Execution metrics, one set per partition.
    metrics: ExecutionPlanMetricsSet,
    /// Optional projection over `join_schema` column indices.
    pub projection: Option<Vec<usize>>,
    /// For each output column, which side it comes from and its index there.
    column_indices: Vec<ColumnIndex>,
    /// Whether `NULL == NULL` is considered a match for the join keys.
    pub null_equality: NullEquality,
    /// Cached plan properties (schema, partitioning, equivalences, ...).
    cache: PlanProperties,
    /// Dynamic filter state, present only after filter pushdown installed one.
    dynamic_filter: Option<HashJoinExecDynamicFilter>,
}
/// Dynamic-filter state attached to a [`HashJoinExec`] by filter pushdown.
#[derive(Clone)]
struct HashJoinExecDynamicFilter {
    /// The probe-side dynamic filter expression that gets tightened once the
    /// build side is known.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Lazily-created accumulator that gathers build-side bounds across
    /// partitions; initialized at most once (on first `execute`).
    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
impl fmt::Debug for HashJoinExec {
    // Manual impl because not every field is `Debug`; note that
    // `dynamic_filter` is intentionally omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("HashJoinExec")
            .field("left", &self.left)
            .field("right", &self.right)
            .field("on", &self.on)
            .field("filter", &self.filter)
            .field("join_type", &self.join_type)
            .field("join_schema", &self.join_schema)
            .field("left_fut", &self.left_fut)
            .field("random_state", &self.random_state)
            .field("mode", &self.mode)
            .field("metrics", &self.metrics)
            .field("projection", &self.projection)
            .field("column_indices", &self.column_indices)
            .field("null_equality", &self.null_equality)
            .field("cache", &self.cache)
            .finish()
    }
}
impl EmbeddedProjection for HashJoinExec {
fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
self.with_projection(projection)
}
}
impl HashJoinExec {
    /// Try to create a new [`HashJoinExec`].
    ///
    /// # Errors
    /// Returns a plan error if `on` is empty, if the equi-join keys are not
    /// valid for the child schemas, or if `projection` indexes outside the
    /// join schema.
    #[expect(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        projection: Option<Vec<usize>>,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
    ) -> Result<Self> {
        let left_schema = left.schema();
        let right_schema = right.schema();
        if on.is_empty() {
            return plan_err!("On constraints in HashJoinExec should be non-empty");
        }
        check_join_is_valid(&left_schema, &right_schema, &on)?;
        let (join_schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, join_type);
        // Fixed seed so build and probe sides hash keys identically.
        let random_state = HASH_JOIN_SEED;
        let join_schema = Arc::new(join_schema);
        // Validate the optional output projection against the join schema.
        can_project(&join_schema, projection.as_ref())?;
        let cache = Self::compute_properties(
            &left,
            &right,
            &join_schema,
            *join_type,
            &on,
            partition_mode,
            projection.as_ref(),
        )?;
        Ok(HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type: *join_type,
            join_schema,
            left_fut: Default::default(),
            random_state,
            mode: partition_mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection,
            column_indices,
            null_equality,
            cache,
            // Installed later by filter pushdown, if enabled.
            dynamic_filter: None,
        })
    }

    /// Create the probe-side dynamic filter: keyed on the right-side join
    /// expressions, starting as `lit(true)` (pass-everything) until the
    /// build side provides tighter bounds.
    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
    }

    /// The build-side (left) input plan.
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// The probe-side (right) input plan.
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// The equi-join key pairs.
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    /// The optional non-equi join filter.
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// The join type.
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// The unprojected join output schema.
    pub fn join_schema(&self) -> &SchemaRef {
        &self.join_schema
    }

    /// The partition mode.
    pub fn partition_mode(&self) -> &PartitionMode {
        &self.mode
    }

    /// Whether NULL join keys are considered equal.
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    /// Test-only accessor for the installed dynamic filter, if any.
    #[doc(hidden)]
    pub fn dynamic_filter_for_test(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
        self.dynamic_filter
            .as_ref()
            .map(|df| Arc::clone(&df.filter))
    }

    /// Per-child order preservation: the build side never preserves order;
    /// the probe side does for the listed right-oriented join types.
    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
        vec![
            false,
            matches!(
                join_type,
                JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightAnti
                    | JoinType::RightSemi
                    | JoinType::RightMark
            ),
        ]
    }

    /// The probe side of this operator is always the right child.
    pub fn probe_side() -> JoinSide {
        JoinSide::Right
    }

    /// Whether an output projection is embedded in this node.
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }

    /// Return a copy of this join with `projection` applied. If a projection
    /// is already present, the new indices are composed through it.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        // `self.schema()` is already projected; validate against it.
        can_project(&self.schema(), projection.as_ref())?;
        let projection = match projection {
            Some(projection) => match &self.projection {
                // Compose: new indices are relative to the existing projection.
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };
        Self::try_new(
            Arc::clone(&self.left),
            Arc::clone(&self.right),
            self.on.clone(),
            self.filter.clone(),
            &self.join_type,
            projection,
            self.mode,
            self.null_equality,
        )
    }

    /// Compute the cached [`PlanProperties`]: equivalences, output
    /// partitioning, emission type, and boundedness.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
        join_type: JoinType,
        on: JoinOnRef,
        mode: PartitionMode,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        let mut eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            Arc::clone(schema),
            &Self::maintains_input_order(join_type),
            Some(Self::probe_side()),
            on,
        )?;
        let mut output_partitioning = match mode {
            PartitionMode::CollectLeft => {
                asymmetric_join_output_partitioning(left, right, &join_type)?
            }
            PartitionMode::Auto => Partitioning::UnknownPartitioning(
                right.output_partitioning().partition_count(),
            ),
            PartitionMode::Partitioned => {
                symmetric_join_output_partitioning(left, right, &join_type)?
            }
        };
        // Emission type: the build side must be fully consumed first, so an
        // unbounded left forces Final; otherwise it depends on the join type
        // (left-outer variants also emit unmatched rows at the end → Both).
        let emission_type = if left.boundedness().is_unbounded() {
            EmissionType::Final
        } else if right.pipeline_behavior() == EmissionType::Incremental {
            match join_type {
                JoinType::Inner
                | JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::Right
                | JoinType::RightAnti
                | JoinType::RightMark => EmissionType::Incremental,
                JoinType::Left
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::Full => EmissionType::Both,
            }
        } else {
            right.pipeline_behavior()
        };
        // Apply the embedded projection to the derived properties, if any.
        if let Some(projection) = projection {
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }
        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness_from_children([left, right]),
        ))
    }

    /// Return an equivalent join with the build and probe sides swapped
    /// (keys, filter, join type and projection all swapped accordingly).
    /// For non-symmetric output layouts a reordering projection is added on
    /// top to preserve the original column order.
    pub fn swap_inputs(
        &self,
        partition_mode: PartitionMode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = self.left();
        let right = self.right();
        let new_join = HashJoinExec::try_new(
            Arc::clone(right),
            Arc::clone(left),
            self.on()
                .iter()
                .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
                .collect(),
            self.filter().map(JoinFilter::swap),
            &self.join_type().swap(),
            swap_join_projection(
                left.schema().fields().len(),
                right.schema().fields().len(),
                self.projection.as_ref(),
                self.join_type(),
            ),
            partition_mode,
            self.null_equality(),
        )?;
        // Semi/anti/mark joins only output one side, and an embedded
        // projection already fixes the layout — no reordering needed then.
        if matches!(
            self.join_type(),
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ) || self.projection.is_some()
        {
            Ok(Arc::new(new_join))
        } else {
            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
        }
    }
}
impl DisplayAs for HashJoinExec {
    // Renders the node for EXPLAIN output. Default/Verbose produce a single
    // line; TreeRender emits one attribute per line and omits defaults
    // (e.g. Inner join type).
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let display_filter = self.filter.as_ref().map_or_else(
                    || "".to_string(),
                    |f| format!(", filter={}", f.expression()),
                );
                // Projection entries are shown as `name@index`.
                let display_projections = if self.contains_projection() {
                    format!(
                        ", projection=[{}]",
                        self.projection
                            .as_ref()
                            .unwrap()
                            .iter()
                            .map(|index| format!(
                                "{}@{}",
                                self.join_schema.fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                // Only shown when NULLs compare equal (the non-default case).
                let display_null_equality =
                    if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
                        ", NullsEqual: true"
                    } else {
                        ""
                    };
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| format!("({c1}, {c2})"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(
                    f,
                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
                    self.mode,
                    self.join_type,
                    on,
                    display_filter,
                    display_projections,
                    display_null_equality,
                )
            }
            DisplayFormatType::TreeRender => {
                // SQL-like rendering of the equi-join keys.
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| {
                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
                    })
                    .collect::<Vec<String>>()
                    .join(", ");
                if *self.join_type() != JoinType::Inner {
                    writeln!(f, "join_type={:?}", self.join_type)?;
                }
                writeln!(f, "on={on}")?;
                if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
                    writeln!(f, "NullsEqual: true")?;
                }
                if let Some(filter) = self.filter.as_ref() {
                    writeln!(f, "filter={filter}")?;
                }
                Ok(())
            }
        }
    }
}
impl ExecutionPlan for HashJoinExec {
    fn name(&self) -> &'static str {
        "HashJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// CollectLeft requires the whole build side as one partition;
    /// Partitioned requires both children hash-partitioned on the join keys.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
            PartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
            PartitionMode::Auto => vec![
                Distribution::UnspecifiedDistribution,
                Distribution::UnspecifiedDistribution,
            ],
        }
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        Self::maintains_input_order(self.join_type)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }

    /// Rebuild with new children; properties are recomputed, metrics are
    /// fresh, but the shared `left_fut` and dynamic filter are carried over.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec {
            left: Arc::clone(&children[0]),
            right: Arc::clone(&children[1]),
            on: self.on.clone(),
            filter: self.filter.clone(),
            join_type: self.join_type,
            join_schema: Arc::clone(&self.join_schema),
            left_fut: Arc::clone(&self.left_fut),
            random_state: self.random_state.clone(),
            mode: self.mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: self.projection.clone(),
            column_indices: self.column_indices.clone(),
            null_equality: self.null_equality,
            cache: Self::compute_properties(
                &children[0],
                &children[1],
                &self.join_schema,
                self.join_type,
                &self.on,
                self.mode,
                self.projection.as_ref(),
            )?,
            dynamic_filter: self.dynamic_filter.clone(),
        }))
    }

    /// Clone with all per-execution state reset: a fresh build-side future,
    /// fresh metrics, and no dynamic filter — so the plan can run again.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec {
            left: Arc::clone(&self.left),
            right: Arc::clone(&self.right),
            on: self.on.clone(),
            filter: self.filter.clone(),
            join_type: self.join_type,
            join_schema: Arc::clone(&self.join_schema),
            left_fut: Arc::new(OnceAsync::default()),
            random_state: self.random_state.clone(),
            mode: self.mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: self.projection.clone(),
            column_indices: self.column_indices.clone(),
            null_equality: self.null_equality,
            cache: self.cache.clone(),
            dynamic_filter: None,
        }))
    }

    /// Start one probe-side partition. Kicks off (or joins) the build-side
    /// collection, then returns a [`HashJoinStream`] that waits for the
    /// build side and probes it with right-side batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let on_left = self
            .on
            .iter()
            .map(|on| Arc::clone(&on.0))
            .collect::<Vec<_>>();
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();
        // Sanity-check that required input distribution was enforced.
        assert_or_internal_err!(
            self.mode != PartitionMode::Partitioned
                || left_partitions == right_partitions,
            "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
            consider using RepartitionExec"
        );
        assert_or_internal_err!(
            self.mode != PartitionMode::CollectLeft || left_partitions == 1,
            "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
            consider using CoalescePartitionsExec or the EnforceDistribution rule"
        );
        // Dynamic filter pushdown is only active when enabled in config AND
        // a consumer downstream actually uses the installed filter.
        let enable_dynamic_filter_pushdown = context
            .session_config()
            .options()
            .optimizer
            .enable_join_dynamic_filter_pushdown
            && self
                .dynamic_filter
                .as_ref()
                .map(|df| df.filter.is_used())
                .unwrap_or(false);
        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
        let left_fut = match self.mode {
            // CollectLeft: all probe partitions share one build future.
            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
                let left_stream = self.left.execute(0, Arc::clone(&context))?;
                let reservation =
                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
                Ok(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    self.right().output_partitioning().partition_count(),
                    enable_dynamic_filter_pushdown,
                    context
                        .session_config()
                        .options()
                        .optimizer
                        .hash_join_inlist_pushdown_max_size,
                    context
                        .session_config()
                        .options()
                        .optimizer
                        .hash_join_inlist_pushdown_max_distinct_values,
                ))
            })?,
            // Partitioned: each partition builds its own table from its
            // matching left partition (single probe thread per table).
            PartitionMode::Partitioned => {
                let left_stream = self.left.execute(partition, Arc::clone(&context))?;
                let reservation =
                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
                        .register(context.memory_pool());
                OnceFut::new(collect_left_input(
                    self.random_state.random_state().clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    1,
                    enable_dynamic_filter_pushdown,
                    context
                        .session_config()
                        .options()
                        .optimizer
                        .hash_join_inlist_pushdown_max_size,
                    context
                        .session_config()
                        .options()
                        .optimizer
                        .hash_join_inlist_pushdown_max_distinct_values,
                ))
            }
            // Auto must be resolved to a concrete mode before execution.
            PartitionMode::Auto => {
                return plan_err!(
                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
                    PartitionMode::Auto
                );
            }
        };
        let batch_size = context.session_config().batch_size();
        let repartition_random_state = REPARTITION_RANDOM_STATE;
        // Lazily create the shared accumulator that collects build-side
        // bounds for the dynamic filter; `None` when pushdown is off or no
        // dynamic filter was installed.
        let build_accumulator = enable_dynamic_filter_pushdown
            .then(|| {
                self.dynamic_filter.as_ref().map(|df| {
                    let filter = Arc::clone(&df.filter);
                    let on_right = self
                        .on
                        .iter()
                        .map(|(_, right_expr)| Arc::clone(right_expr))
                        .collect::<Vec<_>>();
                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
                            self.mode,
                            self.left.as_ref(),
                            self.right.as_ref(),
                            filter,
                            on_right,
                            repartition_random_state,
                        ))
                    })))
                })
            })
            .flatten()
            .flatten();
        let right_stream = self.right.execute(partition, context)?;
        // Map output columns through the embedded projection, if present.
        let column_indices_after_projection = match &self.projection {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };
        let on_right = self
            .on
            .iter()
            .map(|(_, right_expr)| Arc::clone(right_expr))
            .collect::<Vec<_>>();
        Ok(Box::pin(HashJoinStream::new(
            partition,
            self.schema(),
            on_right,
            self.filter.clone(),
            self.join_type,
            right_stream,
            self.random_state.random_state().clone(),
            join_metrics,
            column_indices_after_projection,
            self.null_equality,
            HashJoinStreamState::WaitBuildSide,
            BuildSide::Initial(BuildSideInitialState { left_fut }),
            batch_size,
            vec![],
            self.right.output_ordering().is_some(),
            build_accumulator,
            self.mode,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Per-partition statistics are unknown; whole-plan statistics are
    /// estimated from the children and projected if needed.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema()));
        }
        let stats = estimate_join_statistics(
            self.left.partition_statistics(None)?,
            self.right.partition_statistics(None)?,
            &self.on,
            &self.join_type,
            &self.join_schema,
        )?;
        Ok(stats.project(self.projection.as_ref()))
    }

    /// Try to push a parent projection below this join (rewriting keys and
    /// filter), otherwise fall back to embedding the projection in the node.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // An already-embedded projection blocks further pushdown.
        if self.contains_projection() {
            return Ok(None);
        }
        let schema = self.schema();
        if let Some(JoinData {
            projected_left_child,
            projected_right_child,
            join_filter,
            join_on,
        }) = try_pushdown_through_join(
            projection,
            self.left(),
            self.right(),
            self.on(),
            &schema,
            self.filter(),
        )? {
            Ok(Some(Arc::new(HashJoinExec::try_new(
                Arc::new(projected_left_child),
                Arc::new(projected_right_child),
                join_on,
                join_filter,
                self.join_type(),
                None,
                *self.partition_mode(),
                self.null_equality,
            )?)))
        } else {
            try_embed_projection(projection, self)
        }
    }

    /// Describe which parent filters can be pushed to which child; also
    /// installs a self dynamic filter on the probe side in the Post phase
    /// when the optimizer option is enabled. Only Inner joins participate.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        if self.join_type != JoinType::Inner {
            return Ok(FilterDescription::all_unsupported(
                &parent_filters,
                &self.children(),
            ));
        }
        let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
            self.left(),
        )?;
        let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
            self.right(),
        )?;
        if matches!(phase, FilterPushdownPhase::Post)
            && config.optimizer.enable_join_dynamic_filter_pushdown
        {
            let dynamic_filter = Self::create_dynamic_filter(&self.on);
            right_child = right_child.with_self_filter(dynamic_filter);
        }
        Ok(FilterDescription::new()
            .with_child(left_child)
            .with_child(right_child))
    }

    /// If the probe child accepted our self dynamic filter, store it on a
    /// new copy of this node so `execute` can feed it build-side bounds.
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        if self.join_type != JoinType::Inner {
            return Ok(FilterPushdownPropagation::all_unsupported(
                child_pushdown_result,
            ));
        }
        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
        // One self-filter slot per child; ours lives in the right child's.
        assert_eq!(child_pushdown_result.self_filters.len(), 2);
        let right_child_self_filters = &child_pushdown_result.self_filters[1];
        if let Some(filter) = right_child_self_filters.first() {
            let predicate = Arc::clone(&filter.predicate);
            if let Ok(dynamic_filter) =
                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
            {
                let new_node = Arc::new(HashJoinExec {
                    left: Arc::clone(&self.left),
                    right: Arc::clone(&self.right),
                    on: self.on.clone(),
                    filter: self.filter.clone(),
                    join_type: self.join_type,
                    join_schema: Arc::clone(&self.join_schema),
                    left_fut: Arc::clone(&self.left_fut),
                    random_state: self.random_state.clone(),
                    mode: self.mode,
                    metrics: ExecutionPlanMetricsSet::new(),
                    projection: self.projection.clone(),
                    column_indices: self.column_indices.clone(),
                    null_equality: self.null_equality,
                    cache: self.cache.clone(),
                    dynamic_filter: Some(HashJoinExecDynamicFilter {
                        filter: dynamic_filter,
                        build_accumulator: OnceLock::new(),
                    }),
                });
                result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
            }
        }
        Ok(result)
    }
}
/// Tracks the running min/max of one build-side join-key expression while
/// the left input is collected; feeds the dynamic filter bounds.
struct CollectLeftAccumulator {
    /// The join-key expression evaluated against each build batch.
    expr: Arc<dyn PhysicalExpr>,
    /// Running minimum of the evaluated values.
    min: MinAccumulator,
    /// Running maximum of the evaluated values.
    max: MaxAccumulator,
}
impl CollectLeftAccumulator {
    /// Create min/max accumulators for `expr`, first unwrapping (possibly
    /// nested) dictionary types down to their value type, since bounds are
    /// accumulated over the dictionary values.
    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
        // Iteratively strip dictionary layers (instead of recursing).
        let mut data_type = expr.data_type(schema)?;
        while let DataType::Dictionary(_, value_type) = data_type {
            data_type = *value_type;
        }
        let min = MinAccumulator::try_new(&data_type)?;
        let max = MaxAccumulator::try_new(&data_type)?;
        Ok(Self { expr, min, max })
    }

    /// Fold one build-side batch into the running min and max.
    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
        let args = std::slice::from_ref(&array);
        self.min.update_batch(args)?;
        self.max.update_batch(args)?;
        Ok(())
    }

    /// Consume the accumulators and produce the final column bounds.
    fn evaluate(mut self) -> Result<ColumnBounds> {
        let lower = self.min.evaluate()?;
        let upper = self.max.evaluate()?;
        Ok(ColumnBounds::new(lower, upper))
    }
}
/// Accumulated state while folding the build-side stream in
/// `collect_left_input`.
struct BuildSideState {
    /// All build-side batches collected so far.
    batches: Vec<RecordBatch>,
    /// Total number of build-side rows seen.
    num_rows: usize,
    /// Metrics updated as batches arrive (mem used, batches, rows).
    metrics: BuildProbeJoinMetrics,
    /// Memory reservation grown as each batch is retained.
    reservation: MemoryReservation,
    /// Per-join-key min/max accumulators; `Some` only when dynamic filter
    /// bounds should be computed.
    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
}
impl BuildSideState {
    /// Create the initial fold state. Min/max accumulators are only built
    /// when `should_compute_dynamic_filters` is set; any accumulator
    /// construction error is surfaced immediately.
    fn try_new(
        metrics: BuildProbeJoinMetrics,
        reservation: MemoryReservation,
        on_left: Vec<Arc<dyn PhysicalExpr>>,
        schema: &SchemaRef,
        should_compute_dynamic_filters: bool,
    ) -> Result<Self> {
        let bounds_accumulators = if should_compute_dynamic_filters {
            let accumulators = on_left
                .into_iter()
                .map(|expr| CollectLeftAccumulator::try_new(expr, schema))
                .collect::<Result<Vec<_>>>()?;
            Some(accumulators)
        } else {
            None
        };
        Ok(Self {
            batches: Vec::new(),
            num_rows: 0,
            metrics,
            reservation,
            bounds_accumulators,
        })
    }
}
/// Collect the entire build-side stream into [`JoinLeftData`]: buffer all
/// batches (accounting memory), build the hash table, concatenate the
/// batches, evaluate the join keys, and pick a probe-side pushdown strategy.
///
/// `probe_threads_count` seeds the probe completion counter;
/// `with_visited_indices_bitmap` allocates the matched-row bitmap for join
/// types that must emit unmatched build rows. The `max_inlist_*` limits
/// gate the IN-list pushdown representation.
#[expect(clippy::too_many_arguments)]
async fn collect_left_input(
    random_state: RandomState,
    left_stream: SendableRecordBatchStream,
    on_left: Vec<PhysicalExprRef>,
    metrics: BuildProbeJoinMetrics,
    reservation: MemoryReservation,
    with_visited_indices_bitmap: bool,
    probe_threads_count: usize,
    should_compute_dynamic_filters: bool,
    max_inlist_size: usize,
    max_inlist_distinct_values: usize,
) -> Result<JoinLeftData> {
    let schema = left_stream.schema();
    let initial = BuildSideState::try_new(
        metrics,
        reservation,
        on_left.clone(),
        &schema,
        should_compute_dynamic_filters,
    )?;
    // Fold the stream: update bounds accumulators, grow the memory
    // reservation per batch, bump metrics, and buffer the batch.
    let state = left_stream
        .try_fold(initial, |mut state, batch| async move {
            if let Some(ref mut accumulators) = state.bounds_accumulators {
                for accumulator in accumulators {
                    accumulator.update_batch(&batch)?;
                }
            }
            let batch_size = get_record_batch_memory_size(&batch);
            // Reserve memory for the incoming batch before keeping it.
            state.reservation.try_grow(batch_size)?;
            state.metrics.build_mem_used.add(batch_size);
            state.metrics.build_input_batches.add(1);
            state.metrics.build_input_rows.add(batch.num_rows());
            state.num_rows += batch.num_rows();
            state.batches.push(batch);
            Ok(state)
        })
        .await?;
    let BuildSideState {
        batches,
        num_rows,
        metrics,
        mut reservation,
        bounds_accumulators,
    } = state;
    let fixed_size_u32 = size_of::<JoinHashMapU32>();
    let fixed_size_u64 = size_of::<JoinHashMapU64>();
    // Choose u32 or u64 row indices depending on whether row count fits in
    // u32; reserve the estimated table size in either case.
    let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
        let estimated_hashtable_size =
            estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
        reservation.try_grow(estimated_hashtable_size)?;
        metrics.build_mem_used.add(estimated_hashtable_size);
        Box::new(JoinHashMapU64::with_capacity(num_rows))
    } else {
        let estimated_hashtable_size =
            estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
        reservation.try_grow(estimated_hashtable_size)?;
        metrics.build_mem_used.add(estimated_hashtable_size);
        Box::new(JoinHashMapU32::with_capacity(num_rows))
    };
    let mut hashes_buffer = Vec::new();
    let mut offset = 0;
    // Batches are inserted in reverse; the same reversed order is used for
    // concatenation below so offsets stay consistent with the hash map.
    let batches_iter = batches.iter().rev();
    for batch in batches_iter.clone() {
        hashes_buffer.clear();
        hashes_buffer.resize(batch.num_rows(), 0);
        update_hash(
            &on_left,
            batch,
            &mut *hashmap,
            offset,
            &random_state,
            &mut hashes_buffer,
            0,
            true,
        )?;
        offset += batch.num_rows();
    }
    let batch = concat_batches(&schema, batches_iter)?;
    // Allocate the matched-row bitmap only for join types that need to emit
    // unmatched build rows at the end.
    let visited_indices_bitmap = if with_visited_indices_bitmap {
        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
        reservation.try_grow(bitmap_size)?;
        metrics.build_mem_used.add(bitmap_size);
        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
        bitmap_buffer.append_n(num_rows, false);
        bitmap_buffer
    } else {
        BooleanBufferBuilder::new(0)
    };
    let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
    // Bounds are only meaningful when at least one row was collected.
    let bounds = match bounds_accumulators {
        Some(accumulators) if num_rows > 0 => {
            let bounds = accumulators
                .into_iter()
                .map(CollectLeftAccumulator::evaluate)
                .collect::<Result<Vec<_>>>()?;
            Some(PartitionBounds::new(bounds))
        }
        _ => None,
    };
    let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
    // Pick the pushdown representation: Empty for no rows; an IN-list when
    // small enough (by bytes and distinct count); otherwise share the table.
    let membership = if num_rows == 0 {
        PushdownStrategy::Empty
    } else {
        let estimated_size = left_values
            .iter()
            .map(|arr| arr.get_array_memory_size())
            .sum::<usize>();
        if left_values.is_empty()
            || left_values[0].is_empty()
            || estimated_size > max_inlist_size
            || hash_map.len() > max_inlist_distinct_values
        {
            PushdownStrategy::HashTable(Arc::clone(&hash_map))
        } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
            PushdownStrategy::InList(in_list_values)
        } else {
            // Struct IN-list construction declined; fall back to the table.
            PushdownStrategy::HashTable(Arc::clone(&hash_map))
        }
    };
    let data = JoinLeftData {
        hash_map,
        batch,
        values: left_values,
        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
        probe_threads_counter: AtomicUsize::new(probe_threads_count),
        _reservation: reservation,
        bounds,
        membership,
    };
    Ok(data)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::joins::hash_join::stream::lookup_join_hashmap;
use crate::test::{TestMemoryExec, assert_join_metrics};
use crate::{
common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
test::exec::MockExec,
};
use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field};
use arrow_schema::Schema;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{
ScalarValue, assert_batches_eq, assert_batches_sorted_eq, assert_contains,
exec_err, internal_err,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::Operator;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use hashbrown::HashTable;
use insta::{allow_duplicates, assert_snapshot};
use rstest::*;
use rstest_reuse::*;
/// Ceiling division helper used by the tests; delegates to the standard
/// library's overflow-safe `usize::div_ceil`.
fn div_ceil(a: usize, b: usize) -> usize {
    usize::div_ceil(a, b)
}
/// rstest template: tests annotated `#[apply(batch_sizes)]` run once per
/// listed batch size to exercise output chunking boundaries.
#[template]
#[rstest]
fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
/// Build a `TaskContext` whose session config uses the given batch size.
fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
    let config = SessionConfig::default().with_batch_size(batch_size);
    let ctx = TaskContext::default().with_session_config(config);
    Arc::new(ctx)
}
/// Build a single-partition in-memory table with three i32 columns, given
/// as `(column name, values)` pairs.
fn build_table(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> Arc<dyn ExecutionPlan> {
    let data = build_table_i32(a, b, c);
    let schema = data.schema();
    TestMemoryExec::try_new_exec(&[vec![data]], schema, None).unwrap()
}
/// Build a `CollectLeft` hash join with no filter and no projection.
fn join(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    join_type: &JoinType,
    null_equality: NullEquality,
) -> Result<HashJoinExec> {
    HashJoinExec::try_new(
        left,
        right,
        on,
        None, // no join filter
        join_type,
        None, // no projection
        PartitionMode::CollectLeft,
        null_equality,
    )
}
/// Build a `CollectLeft` hash join with the given non-equi filter and no
/// projection.
fn join_with_filter(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    filter: JoinFilter,
    join_type: &JoinType,
    null_equality: NullEquality,
) -> Result<HashJoinExec> {
    HashJoinExec::try_new(
        left,
        right,
        on,
        Some(filter),
        join_type,
        None, // no projection
        PartitionMode::CollectLeft,
        null_equality,
    )
}
/// Run a `CollectLeft` hash join to completion (partition 0) and return the
/// output column names, the collected batches, and the join's metrics.
async fn join_collect(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    join_type: &JoinType,
    null_equality: NullEquality,
    context: Arc<TaskContext>,
) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
    let join_exec = join(left, right, on, join_type, null_equality)?;
    let header = columns(&join_exec.schema());
    let stream = join_exec.execute(0, context)?;
    let collected = common::collect(stream).await?;
    let metrics = join_exec.metrics().unwrap();
    Ok((header, collected, metrics))
}
/// Convenience wrapper: run the join in `Partitioned` mode via
/// `join_collect_with_partition_mode`.
async fn partitioned_join_collect(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    join_type: &JoinType,
    null_equality: NullEquality,
    context: Arc<TaskContext>,
) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
    let mode = PartitionMode::Partitioned;
    join_collect_with_partition_mode(left, right, on, join_type, mode, null_equality, context)
        .await
}
/// Repartition both inputs as required by `partition_mode`, run the join
/// across all partitions, and return the column names, the non-empty
/// output batches, and the join metrics.
async fn join_collect_with_partition_mode(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    join_type: &JoinType,
    partition_mode: PartitionMode,
    null_equality: NullEquality,
    context: Arc<TaskContext>,
) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
    let partition_count = 4;
    let (left_expr, right_expr) = on
        .iter()
        .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
        .unzip();
    // Build side: single partition for CollectLeft, hash-partitioned on the
    // left join keys for Partitioned.
    let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
        PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
        PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
            left,
            Partitioning::Hash(left_expr, partition_count),
        )?),
        PartitionMode::Auto => {
            return internal_err!("Unexpected PartitionMode::Auto in join tests");
        }
    };
    // Probe side: for CollectLeft, hash-partition on the first column just
    // to exercise multiple probe partitions; for Partitioned, hash on the
    // right join keys to match the build side.
    let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
        PartitionMode::CollectLeft => {
            let partition_column_name = right.schema().field(0).name().clone();
            let partition_expr = vec![Arc::new(Column::new_with_schema(
                &partition_column_name,
                &right.schema(),
            )?) as _];
            Arc::new(RepartitionExec::try_new(
                right,
                Partitioning::Hash(partition_expr, partition_count),
            )?) as _
        }
        PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
            right,
            Partitioning::Hash(right_expr, partition_count),
        )?),
        PartitionMode::Auto => {
            return internal_err!("Unexpected PartitionMode::Auto in join tests");
        }
    };
    let join = HashJoinExec::try_new(
        left_repartitioned,
        right_repartitioned,
        on,
        None,
        join_type,
        None,
        partition_mode,
        null_equality,
    )?;
    let columns = columns(&join.schema());
    let mut batches = vec![];
    // Execute every output partition and keep only non-empty batches.
    for i in 0..partition_count {
        let stream = join.execute(i, Arc::clone(&context))?;
        let more_batches = common::collect(stream).await?;
        batches.extend(
            more_batches
                .into_iter()
                .filter(|b| b.num_rows() > 0)
                .collect::<Vec<_>>(),
        );
    }
    let metrics = join.metrics().unwrap();
    Ok((columns, batches, metrics))
}
/// Inner join on a single key column (`b1`), default (CollectLeft) test
/// harness; both sides share the key column name.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_inner_one(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    // Both key columns appear in the output even though they share a name.
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 3  | 5  | 9  | 20 | 5  | 80 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
/// Same data as `join_inner_one`, but executed with both sides
/// hash-partitioned (`PartitionMode::Partitioned`); output order is not
/// deterministic, so the snapshot is sorted.
#[apply(batch_sizes)]
#[tokio::test]
async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = partitioned_join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 3  | 5  | 9  | 20 | 5  | 80 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
/// Inner join where the key columns have different names on each side
/// (`b1` vs `b2`), so no output column name is duplicated.
#[tokio::test]
async fn join_inner_one_no_shared_column_names() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b2", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b2 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 3  | 5  | 9  | 20 | 5  | 80 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
/// Inner join where neither input is sorted on the key; verifies matching
/// is independent of input row order (snapshot is in probe/build order,
/// not key order).
#[tokio::test]
async fn join_inner_one_randomly_ordered() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left = build_table(
        ("a1", &vec![0, 3, 2, 1]),
        ("b1", &vec![4, 5, 5, 4]),
        ("c1", &vec![6, 9, 8, 7]),
    );
    let right = build_table(
        ("a2", &vec![20, 30, 10]),
        ("b2", &vec![5, 6, 4]),
        ("c2", &vec![80, 90, 70]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b2 | c2 |
        +----+----+----+----+----+----+
        | 3  | 5  | 9  | 20 | 5  | 80 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 0  | 4  | 6  | 10 | 4  | 70 |
        | 1  | 4  | 7  | 10 | 4  | 70 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 4);
    Ok(())
}
/// Inner join on a composite (two-column) key, also asserting an upper
/// bound on the number of output batches for the given `batch_size`.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_inner_two(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 2]),
        ("b2", &vec![1, 2, 2]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b2", &vec![1, 2, 2]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![
        (
            Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
        ),
        (
            Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        ),
    ];
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
    // Upper bound on batch count: 3 matched rows normally (one extra batch is
    // possible when batch_size == 1); with the `force_hash_collisions`
    // feature up to 9 candidate row pairs may be emitted.
    let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
        let mut expected_batch_count = div_ceil(3, batch_size);
        if batch_size == 1 {
            expected_batch_count += 1;
        }
        expected_batch_count
    } else {
        div_ceil(9, batch_size)
    };
    assert!(
        batches.len() <= expected_batch_count,
        "expected at most {expected_batch_count} batches, got {}",
        batches.len()
    );
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b2 | c1 | a1 | b2 | c2 |
        +----+----+----+----+----+----+
        | 1  | 1  | 7  | 1  | 1  | 70 |
        | 2  | 2  | 8  | 2  | 2  | 80 |
        | 2  | 2  | 9  | 2  | 2  | 80 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
/// Inner join on a composite key where the build (left) side arrives as two
/// partitions that are coalesced before joining.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let batch1 = build_table_i32(
        ("a1", &vec![1, 2]),
        ("b2", &vec![1, 2]),
        ("c1", &vec![7, 8]),
    );
    let batch2 =
        build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
    let schema = batch1.schema();
    // Two separate partitions on the build side, merged into one.
    let left =
        TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
            .unwrap();
    let left = Arc::new(CoalescePartitionsExec::new(left));
    let right = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b2", &vec![1, 2, 2]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![
        (
            Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
        ),
        (
            Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        ),
    ];
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);
    // Same batch-count bound as `join_inner_two`: 3 matched rows (plus one
    // extra batch when batch_size == 1), or 9 candidates under forced
    // hash collisions.
    let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
        let mut expected_batch_count = div_ceil(3, batch_size);
        if batch_size == 1 {
            expected_batch_count += 1;
        }
        expected_batch_count
    } else {
        div_ceil(9, batch_size)
    };
    assert!(
        batches.len() <= expected_batch_count,
        "expected at most {expected_batch_count} batches, got {}",
        batches.len()
    );
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b2 | c1 | a1 | b2 | c2 |
        +----+----+----+----+----+----+
        | 1  | 1  | 7  | 1  | 1  | 70 |
        | 2  | 2  | 8  | 2  | 2  | 80 |
        | 2  | 2  | 9  | 2  | 2  | 80 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
/// Inner join with a coalesced two-partition build side whose rows are not
/// sorted on the key; matching must be order-independent.
#[tokio::test]
async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let batch1 = build_table_i32(
        ("a1", &vec![0, 3]),
        ("b1", &vec![4, 5]),
        ("c1", &vec![6, 9]),
    );
    let batch2 = build_table_i32(
        ("a1", &vec![2, 1]),
        ("b1", &vec![5, 4]),
        ("c1", &vec![8, 7]),
    );
    let schema = batch1.schema();
    let left =
        TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
            .unwrap();
    let left = Arc::new(CoalescePartitionsExec::new(left));
    let right = build_table(
        ("a2", &vec![20, 30, 10]),
        ("b2", &vec![5, 6, 4]),
        ("c2", &vec![80, 90, 70]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b2 | c2 |
        +----+----+----+----+----+----+
        | 3  | 5  | 9  | 20 | 5  | 80 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 0  | 4  | 6  | 10 | 4  | 70 |
        | 1  | 4  | 7  | 10 | 4  | 70 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 4);
    Ok(())
}
/// Inner join where the probe (right) side has two partitions that are
/// executed and checked independently (no repartition/coalesce).
#[apply(batch_sizes)]
#[tokio::test]
async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
    );
    let batch1 = build_table_i32(
        ("a2", &vec![10, 20]),
        ("b1", &vec![4, 6]),
        ("c2", &vec![70, 80]),
    );
    let batch2 =
        build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
    let schema = batch1.schema();
    let right =
        TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
            .unwrap();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    // First probe partition: only b1 == 4 matches (one row).
    let stream = join.execute(0, Arc::clone(&task_ctx))?;
    let batches = common::collect(stream).await?;
    let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
        let mut expected_batch_count = div_ceil(1, batch_size);
        if batch_size == 1 {
            expected_batch_count += 1;
        }
        expected_batch_count
    } else {
        div_ceil(6, batch_size)
    };
    assert!(
        batches.len() <= expected_batch_count,
        "expected at most {expected_batch_count} batches, got {}",
        batches.len()
    );
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  | 10 | 4  | 70 |
        +----+----+----+----+----+----+
        ");
    }
    // Second probe partition: b1 == 5 matches two build rows.
    let stream = join.execute(1, Arc::clone(&task_ctx))?;
    let batches = common::collect(stream).await?;
    let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
        div_ceil(2, batch_size)
    } else {
        div_ceil(3, batch_size)
    };
    assert!(
        batches.len() <= expected_batch_count,
        "expected at most {expected_batch_count} batches, got {}",
        batches.len()
    );
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        | 2  | 5  | 8  | 30 | 5  | 90 |
        | 3  | 5  | 9  | 30 | 5  | 90 |
        +----+----+----+----+----+----+
        ");
    }
    Ok(())
}
/// Build a memory source whose single partition yields the same batch twice
/// (used to exercise multi-batch build/probe sides).
fn build_table_two_batches(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> Arc<dyn ExecutionPlan> {
    let batch = build_table_i32(a, b, c);
    let schema = batch.schema();
    // One partition containing two copies of the same batch.
    let partition = vec![batch.clone(), batch];
    TestMemoryExec::try_new_exec(&[partition], schema, None).unwrap()
}
/// Left outer join where the probe side emits the same batch twice: matched
/// rows appear once per probe batch, while the unmatched build row
/// (b1 == 7) appears exactly once with nulls.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_multi_batch(batch_size: usize) {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
    );
    let right = build_table_two_batches(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::Left,
        NullEquality::NullEqualsNothing,
    )
    .unwrap();
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    let stream = join.execute(0, task_ctx).unwrap();
    let batches = common::collect(stream).await.unwrap();
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 3  | 7  | 9  |    |    |    |
        +----+----+----+----+----+----+
        ");
    }
}
/// Full outer join with a duplicated probe batch: unmatched probe rows
/// (b2 == 6) appear once per probe batch, the unmatched build row once.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_full_multi_batch(batch_size: usize) {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
    );
    // Create right input with 2 batches.
    let right = build_table_two_batches(
        ("a2", &vec![10, 20, 30]),
        ("b2", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::Full,
        NullEquality::NullEqualsNothing,
    )
    .unwrap();
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
    let stream = join.execute(0, task_ctx).unwrap();
    let batches = common::collect(stream).await.unwrap();
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b2 | c2 |
        +----+----+----+----+----+----+
        |    |    |    | 30 | 6  | 90 |
        |    |    |    | 30 | 6  | 90 |
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 3  | 7  | 9  |    |    |    |
        +----+----+----+----+----+----+
        ");
    }
}
/// Left outer join against an empty probe side: every build row is emitted
/// once, padded with nulls.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_empty_right(batch_size: usize) {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
    )];
    let schema = right.schema();
    let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
    let join = join(
        left,
        right,
        on,
        &JoinType::Left,
        NullEquality::NullEqualsNothing,
    )
    .unwrap();
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    let stream = join.execute(0, task_ctx).unwrap();
    let batches = common::collect(stream).await.unwrap();
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  |    |    |    |
        | 2  | 5  | 8  |    |    |    |
        | 3  | 7  | 9  |    |    |    |
        +----+----+----+----+----+----+
        ");
    }
}
/// Full outer join against an empty probe side: behaves like a left outer
/// join (all build rows padded with nulls, nothing from the right).
#[apply(batch_sizes)]
#[tokio::test]
async fn join_full_empty_right(batch_size: usize) {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    let schema = right.schema();
    let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
    let join = join(
        left,
        right,
        on,
        &JoinType::Full,
        NullEquality::NullEqualsNothing,
    )
    .unwrap();
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
    let stream = join.execute(0, task_ctx).unwrap();
    let batches = common::collect(stream).await.unwrap();
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b2 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  |    |    |    |
        | 2  | 5  | 8  |    |    |    |
        | 3  | 7  | 9  |    |    |    |
        +----+----+----+----+----+----+
        ");
    }
}
/// Left outer join on one key: the build row with b1 == 7 has no match and
/// is emitted with nulls on the right.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_one(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::Left,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 3  | 7  | 9  |    |    |    |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
/// Same as `join_left_one`, but executed in `PartitionMode::Partitioned`;
/// the unmatched build row must still appear exactly once across partitions.
#[apply(batch_sizes)]
#[tokio::test]
async fn partitioned_join_left_one(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = partitioned_join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::Left,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 3  | 7  | 9  |    |    |    |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
/// Fixed left-side input shared by the semi/anti join tests below.
/// Key column is `b1`; values 8 and 10 overlap with the right table.
fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
    build_table(
        ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
        ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
        ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
    )
}
/// Fixed right-side input shared by the semi/anti join tests below.
/// Key column is `b2`; values 8 and 10 overlap with the left table.
fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
    build_table(
        ("a2", &vec![8, 12, 6, 2, 10, 4]),
        ("b2", &vec![8, 10, 6, 2, 10, 4]),
        ("c2", &vec![20, 40, 60, 80, 100, 120]),
    )
}
/// LeftSemi join: output contains only left columns, one row per left row
/// whose key (b1 in {8, 10}) has at least one match on the right.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_semi(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_semi_anti_left_table();
    let right = build_semi_anti_right_table();
    // left_table left semi join right_table on left_table.b1 = right_table.b2
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::LeftSemi,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a1", "b1", "c1"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+
        | a1 | b1 | c1  |
        +----+----+-----+
        | 11 | 8  | 110 |
        | 13 | 10 | 130 |
        | 9  | 8  | 90  |
        +----+----+-----+
        ");
    }
    Ok(())
}
/// LeftSemi join with a join filter on the right side's first column (a2):
/// first a filter that rejects nothing that would match anyway (`a2 != 10`),
/// then a stricter one (`a2 > 10`) that narrows the result to one row.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_semi_anti_left_table();
    let right = build_semi_anti_right_table();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    // Filter input: one intermediate column "x" bound to right-side column 0.
    let column_indices = vec![ColumnIndex {
        index: 0,
        side: JoinSide::Right,
    }];
    let intermediate_schema =
        Schema::new(vec![Field::new("x", DataType::Int32, true)]);
    let filter_expression = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::NotEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    )) as Arc<dyn PhysicalExpr>;
    let filter = JoinFilter::new(
        filter_expression,
        column_indices.clone(),
        Arc::new(intermediate_schema.clone()),
    );
    let join = join_with_filter(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        filter,
        &JoinType::LeftSemi,
        NullEquality::NullEqualsNothing,
    )?;
    let columns_header = columns(&join.schema());
    assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);
    let stream = join.execute(0, Arc::clone(&task_ctx))?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+
        | a1 | b1 | c1  |
        +----+----+-----+
        | 11 | 8  | 110 |
        | 13 | 10 | 130 |
        | 9  | 8  | 90  |
        +----+----+-----+
        ");
    }
    // Second pass: stricter filter `x > 10` leaves only the b1 == 10 match.
    let filter_expression = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    )) as Arc<dyn PhysicalExpr>;
    let filter = JoinFilter::new(
        filter_expression,
        column_indices,
        Arc::new(intermediate_schema),
    );
    let join = join_with_filter(
        left,
        right,
        on,
        filter,
        &JoinType::LeftSemi,
        NullEquality::NullEqualsNothing,
    )?;
    let columns_header = columns(&join.schema());
    assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+
        | a1 | b1 | c1  |
        +----+----+-----+
        | 13 | 10 | 130 |
        +----+----+-----+
        ");
    }
    Ok(())
}
/// RightSemi join: output contains only right columns, one row per right
/// row whose key (b2 in {8, 10}) has at least one match on the left.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_right_semi(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_semi_anti_left_table();
    let right = build_semi_anti_right_table();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::RightSemi,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a2", "b2", "c2"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+-----+
        | a2 | b2 | c2  |
        +----+----+-----+
        | 8  | 8  | 20  |
        | 12 | 10 | 40  |
        | 10 | 10 | 100 |
        +----+----+-----+
        ");
    }
    Ok(())
}
/// RightSemi join with a join filter on the left side's first column (a1):
/// first `a1 != 9` (which removes no matches because b1 == 8 also occurs at
/// a1 == 11), then the stricter `a1 > 11`, which eliminates the b2 == 8 row.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_semi_anti_left_table();
    let right = build_semi_anti_right_table();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    // Filter input: one intermediate column "x" bound to left-side column 0.
    let column_indices = vec![ColumnIndex {
        index: 0,
        side: JoinSide::Left,
    }];
    let intermediate_schema =
        Schema::new(vec![Field::new("x", DataType::Int32, true)]);
    let filter_expression = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::NotEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
    )) as Arc<dyn PhysicalExpr>;
    let filter = JoinFilter::new(
        filter_expression,
        column_indices.clone(),
        Arc::new(intermediate_schema.clone()),
    );
    let join = join_with_filter(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        filter,
        &JoinType::RightSemi,
        NullEquality::NullEqualsNothing,
    )?;
    // Named `columns_header` (not `columns`) so the `columns` helper fn can
    // still be called below — consistent with the other `*_with_filter` tests.
    let columns_header = columns(&join.schema());
    assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
    let stream = join.execute(0, Arc::clone(&task_ctx))?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+-----+
        | a2 | b2 | c2  |
        +----+----+-----+
        | 8  | 8  | 20  |
        | 12 | 10 | 40  |
        | 10 | 10 | 100 |
        +----+----+-----+
        ");
    }
    // Second pass: stricter filter `x > 11` — only a1 == 13 (b1 == 10)
    // survives, so the b2 == 8 right row loses its match.
    let filter_expression = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
    )) as Arc<dyn PhysicalExpr>;
    let filter = JoinFilter::new(
        filter_expression,
        column_indices,
        Arc::new(intermediate_schema.clone()),
    );
    let join = join_with_filter(
        left,
        right,
        on,
        filter,
        &JoinType::RightSemi,
        NullEquality::NullEqualsNothing,
    )?;
    // Assert the schema of the second join too, matching the sibling
    // `*_with_filter` tests (this check was previously missing here).
    let columns_header = columns(&join.schema());
    assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+-----+
        | a2 | b2 | c2  |
        +----+----+-----+
        | 12 | 10 | 40  |
        | 10 | 10 | 100 |
        +----+----+-----+
        ");
    }
    Ok(())
}
/// LeftAnti join: output contains only left columns, one row per left row
/// whose key has NO match on the right (b1 in {1, 3, 5, 7}).
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_anti(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_semi_anti_left_table();
    let right = build_semi_anti_right_table();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::LeftAnti,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a1", "b1", "c1"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+
        | a1 | b1 | c1 |
        +----+----+----+
        | 1  | 1  | 10 |
        | 3  | 3  | 30 |
        | 5  | 5  | 50 |
        | 7  | 7  | 70 |
        +----+----+----+
        ");
    }
    Ok(())
}
/// LeftAnti join with a join filter on the right side's first column (a2):
/// the filter `a2 != 8` removes the only match for b1 == 8, so those left
/// rows join the anti result.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_semi_anti_left_table();
    let right = build_semi_anti_right_table();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    // Filter input: one intermediate column "x" bound to right-side column 0.
    let column_indices = vec![ColumnIndex {
        index: 0,
        side: JoinSide::Right,
    }];
    let intermediate_schema =
        Schema::new(vec![Field::new("x", DataType::Int32, true)]);
    let filter_expression = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::NotEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
    )) as Arc<dyn PhysicalExpr>;
    let filter = JoinFilter::new(
        filter_expression,
        column_indices.clone(),
        Arc::new(intermediate_schema.clone()),
    );
    let join = join_with_filter(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        filter,
        &JoinType::LeftAnti,
        NullEquality::NullEqualsNothing,
    )?;
    let columns_header = columns(&join.schema());
    assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
    let stream = join.execute(0, Arc::clone(&task_ctx))?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+
        | a1 | b1 | c1  |
        +----+----+-----+
        | 1  | 1  | 10  |
        | 11 | 8  | 110 |
        | 3  | 3  | 30  |
        | 5  | 5  | 50  |
        | 7  | 7  | 70  |
        | 9  | 8  | 90  |
        +----+----+-----+
        ");
    }
    // NOTE(review): this second filter is byte-identical to the first
    // (`x != 8`), and the expected output is the same. Possibly intended to
    // exercise a different predicate — confirm against upstream history.
    let filter_expression = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::NotEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
    )) as Arc<dyn PhysicalExpr>;
    let filter = JoinFilter::new(
        filter_expression,
        column_indices,
        Arc::new(intermediate_schema),
    );
    let join = join_with_filter(
        left,
        right,
        on,
        filter,
        &JoinType::LeftAnti,
        NullEquality::NullEqualsNothing,
    )?;
    let columns_header = columns(&join.schema());
    assert_eq!(columns_header, vec!["a1", "b1", "c1"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+
        | a1 | b1 | c1  |
        +----+----+-----+
        | 1  | 1  | 10  |
        | 11 | 8  | 110 |
        | 3  | 3  | 30  |
        | 5  | 5  | 50  |
        | 7  | 7  | 70  |
        | 9  | 8  | 90  |
        +----+----+-----+
        ");
    }
    Ok(())
}
/// RightAnti join: output contains only right columns, one row per right
/// row whose key has NO match on the left (b2 in {6, 2, 4}).
#[apply(batch_sizes)]
#[tokio::test]
async fn join_right_anti(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_semi_anti_left_table();
    let right = build_semi_anti_right_table();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::RightAnti,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a2", "b2", "c2"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+-----+
        | a2 | b2 | c2  |
        +----+----+-----+
        | 6  | 6  | 60  |
        | 2  | 2  | 80  |
        | 4  | 4  | 120 |
        +----+----+-----+
        ");
    }
    Ok(())
}
/// RightAnti join with a join filter: first filtering on the left side's
/// first column (`a1 != 13`, which strips the only match for b2 == 10), then
/// on the right side's second column (`b2 != 8`, which puts the b2 == 8 row
/// in the anti result).
#[apply(batch_sizes)]
#[tokio::test]
async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_semi_anti_left_table();
    let right = build_semi_anti_right_table();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
    )];
    // Filter input: intermediate column "x" bound to left-side column 0 (a1).
    let column_indices = vec![ColumnIndex {
        index: 0,
        side: JoinSide::Left,
    }];
    let intermediate_schema =
        Schema::new(vec![Field::new("x", DataType::Int32, true)]);
    let filter_expression = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::NotEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
    )) as Arc<dyn PhysicalExpr>;
    let filter = JoinFilter::new(
        filter_expression,
        column_indices,
        Arc::new(intermediate_schema.clone()),
    );
    let join = join_with_filter(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        filter,
        &JoinType::RightAnti,
        NullEquality::NullEqualsNothing,
    )?;
    let columns_header = columns(&join.schema());
    assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
    let stream = join.execute(0, Arc::clone(&task_ctx))?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+-----+
        | a2 | b2 | c2  |
        +----+----+-----+
        | 12 | 10 | 40  |
        | 6  | 6  | 60  |
        | 2  | 2  | 80  |
        | 10 | 10 | 100 |
        | 4  | 4  | 120 |
        +----+----+-----+
        ");
    }
    // Second pass: rebind "x" to right-side column 1 (b2) and filter b2 != 8.
    let column_indices = vec![ColumnIndex {
        index: 1,
        side: JoinSide::Right,
    }];
    let filter_expression = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::NotEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
    )) as Arc<dyn PhysicalExpr>;
    let filter = JoinFilter::new(
        filter_expression,
        column_indices,
        Arc::new(intermediate_schema),
    );
    let join = join_with_filter(
        left,
        right,
        on,
        filter,
        &JoinType::RightAnti,
        NullEquality::NullEqualsNothing,
    )?;
    let columns_header = columns(&join.schema());
    assert_eq!(columns_header, vec!["a2", "b2", "c2"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+----+-----+
        | a2 | b2 | c2  |
        +----+----+-----+
        | 8  | 8  | 20  |
        | 6  | 6  | 60  |
        | 2  | 2  | 80  |
        | 4  | 4  | 120 |
        +----+----+-----+
        ");
    }
    Ok(())
}
/// Right outer join on one key: the probe row with b1 == 6 has no match and
/// is emitted with nulls on the left.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_right_one(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Right,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        |    |    |    | 30 | 6  | 90 |
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
/// Same as `join_right_one`, but executed in `PartitionMode::Partitioned`;
/// the unmatched probe row must still appear exactly once across partitions.
#[apply(batch_sizes)]
#[tokio::test]
async fn partitioned_join_right_one(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = partitioned_join_collect(
        left,
        right,
        on,
        &JoinType::Right,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b1 | c2 |
        +----+----+----+----+----+----+
        |    |    |    | 30 | 6  | 90 |
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        +----+----+----+----+----+----+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
// Full outer join: unmatched rows from BOTH sides appear, null-padded on the
// opposite side (left b1=7 and right b2=6 have no partner).
#[apply(batch_sizes)]
#[tokio::test]
async fn join_full_one(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b2", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::Full,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b2 | c2 |
        +----+----+----+----+----+----+
        |    |    |    | 30 | 6  | 90 |
        | 1  | 4  | 7  | 10 | 4  | 70 |
        | 2  | 5  | 8  | 20 | 5  | 80 |
        | 3  | 7  | 9  |    |    |    |
        +----+----+----+----+----+----+
        ");
    }
    Ok(())
}
// LeftMark join: output is the left side plus a boolean `mark` column that is
// true iff the row found at least one match on the right (b1=7 does not).
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_mark(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::LeftMark,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+-------+
        | a1 | b1 | c1 | mark  |
        +----+----+----+-------+
        | 1  | 4  | 7  | true  |
        | 2  | 5  | 8  | true  |
        | 3  | 7  | 9  | false |
        +----+----+----+-------+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
// LeftMark join under partitioned execution; the right side has duplicate key
// values (two rows with b1=4) but the mark column stays boolean per left row.
#[apply(batch_sizes)]
#[tokio::test]
async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30, 40]),
        ("b1", &vec![4, 4, 5, 6]),
        ("c2", &vec![60, 70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = partitioned_join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::LeftMark,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+-------+
        | a1 | b1 | c1 | mark  |
        +----+----+----+-------+
        | 1  | 4  | 7  | true  |
        | 2  | 5  | 8  | true  |
        | 3  | 7  | 9  | false |
        +----+----+----+-------+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
// RightMark join: output is the right side plus a `mark` column that is true
// iff the right row matched some left row (b1=6 does not).
#[apply(batch_sizes)]
#[tokio::test]
async fn join_right_mark(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::RightMark,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
    // NOTE(review): uses `assert_batches_sorted_eq!` while sibling tests use
    // insta snapshots — kept as-is to avoid churning snapshot files.
    let expected = [
        "+----+----+----+-------+",
        "| a2 | b1 | c2 | mark  |",
        "+----+----+----+-------+",
        "| 10 | 4  | 70 | true  |",
        "| 20 | 5  | 80 | true  |",
        "| 30 | 6  | 90 | false |",
        "+----+----+----+-------+",
    ];
    assert_batches_sorted_eq!(expected, &batches);
    assert_join_metrics!(metrics, 3);
    Ok(())
}
// RightMark join under partitioned execution; every right row appears exactly
// once with its match flag (b1=6 is unmatched -> false).
#[apply(batch_sizes)]
#[tokio::test]
async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30, 40]),
        ("b1", &vec![4, 4, 5, 6]),
        ("c2", &vec![60, 70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = partitioned_join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::RightMark,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
    let expected = [
        "+----+----+----+-------+",
        "| a2 | b1 | c2 | mark  |",
        "+----+----+----+-------+",
        "| 10 | 4  | 60 | true  |",
        "| 20 | 4  | 70 | true  |",
        "| 30 | 5  | 80 | true  |",
        "| 40 | 6  | 90 | false |",
        "+----+----+----+-------+",
    ];
    assert_batches_sorted_eq!(expected, &batches);
    assert_join_metrics!(metrics, 4);
    Ok(())
}
// Verifies that probe-side lookups through a `JoinHashMapU64` resolve rows
// correctly when the hash table has been hand-populated with colliding
// entries (two table entries per key hash, chained via the `next` vector).
#[test]
fn join_with_hash_collisions_64() -> Result<()> {
    let build_batch = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );

    // Hash the build keys with a fixed seed so the test is deterministic.
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut buf = vec![0; build_batch.num_rows()];
    let hashes = create_hashes([&build_batch.columns()[0]], &random_state, &mut buf)?;

    // Insert two entries for each key hash to force chain traversal.
    let mut table = HashTable::with_capacity(4);
    table.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
    table.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
    table.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
    table.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);
    let join_hash_map = JoinHashMapU64::new(table, vec![2, 0]);

    let probe_batch = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    // Key expression `a@0` is evaluated against both sides.
    let key: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
    let build_keys = key.evaluate(&build_batch)?.into_array(build_batch.num_rows())?;
    let probe_keys = key.evaluate(&probe_batch)?.into_array(probe_batch.num_rows())?;

    let mut probe_hashes = vec![0; probe_batch.num_rows()];
    create_hashes([&probe_keys], &random_state, &mut probe_hashes)?;

    let mut probe_idx_buf = Vec::new();
    let mut build_idx_buf = Vec::new();
    let (build_idx, probe_idx, _) = lookup_join_hashmap(
        &join_hash_map,
        &[build_keys],
        &[probe_keys],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
        &mut probe_idx_buf,
        &mut build_idx_buf,
    )?;

    // Despite the collisions, each probe row maps back to its own build row.
    let expected_build: UInt64Array = vec![0, 1].into();
    let expected_probe: UInt32Array = vec![0, 1].into();
    assert_eq!(expected_build, build_idx);
    assert_eq!(expected_probe, probe_idx);
    Ok(())
}
// Same collision scenario as `join_with_hash_collisions_64`, but exercising
// the compact `JoinHashMapU32` variant (u32 row offsets / `next` chain).
#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
    let build_batch = build_table_i32(
        ("a", &vec![10, 20]),
        ("x", &vec![100, 200]),
        ("y", &vec![200, 300]),
    );

    // Deterministic hashing via fixed seeds.
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut buf = vec![0; build_batch.num_rows()];
    let hashes = create_hashes([&build_batch.columns()[0]], &random_state, &mut buf)?;

    // Two u32 entries per key hash force the lookup to walk the chain.
    let mut table = HashTable::with_capacity(4);
    table.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
    table.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
    table.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
    table.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);
    let next: Vec<u32> = vec![2, 0];
    let join_hash_map = JoinHashMapU32::new(table, next);

    let probe_batch = build_table_i32(
        ("a", &vec![10, 20]),
        ("b", &vec![0, 0]),
        ("c", &vec![30, 40]),
    );

    let key: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
    let build_keys = key.evaluate(&build_batch)?.into_array(build_batch.num_rows())?;
    let probe_keys = key.evaluate(&probe_batch)?.into_array(probe_batch.num_rows())?;

    let mut probe_hashes = vec![0; probe_batch.num_rows()];
    create_hashes([&probe_keys], &random_state, &mut probe_hashes)?;

    let mut probe_idx_buf = Vec::new();
    let mut build_idx_buf = Vec::new();
    let (build_idx, probe_idx, _) = lookup_join_hashmap(
        &join_hash_map,
        &[build_keys],
        &[probe_keys],
        NullEquality::NullEqualsNothing,
        &probe_hashes,
        8192,
        (0, None),
        &mut probe_idx_buf,
        &mut build_idx_buf,
    )?;

    let expected_build: UInt64Array = vec![0, 1].into();
    let expected_probe: UInt32Array = vec![0, 1].into();
    assert_eq!(expected_build, build_idx);
    assert_eq!(expected_probe, probe_idx);
    Ok(())
}
// Inner join where both inputs share the column names a/b/c; the output schema
// simply concatenates both sides, duplicated names included. Keys are left.a
// and right.b (different names, same values 1 and 2 match).
#[tokio::test]
async fn join_with_duplicated_column_names() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left = build_table(
        ("a", &vec![1, 2, 3]),
        ("b", &vec![4, 5, 7]),
        ("c", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a", &vec![10, 20, 30]),
        ("b", &vec![1, 2, 7]),
        ("c", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +---+---+---+----+---+----+
        | a | b | c | a  | b | c  |
        +---+---+---+----+---+----+
        | 1 | 4 | 7 | 10 | 1 | 70 |
        | 2 | 5 | 8 | 20 | 2 | 80 |
        +---+---+---+----+---+----+
        ");
    }
    Ok(())
}
fn prepare_join_filter() -> JoinFilter {
let column_indices = vec![
ColumnIndex {
index: 2,
side: JoinSide::Left,
},
ColumnIndex {
index: 2,
side: JoinSide::Right,
},
];
let intermediate_schema = Schema::new(vec![
Field::new("c", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let filter_expression = Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 0)),
Operator::Gt,
Arc::new(Column::new("c", 1)),
)) as Arc<dyn PhysicalExpr>;
JoinFilter::new(
filter_expression,
column_indices,
Arc::new(intermediate_schema),
)
}
// Inner join on left.a = right.b with the residual filter `left.c > right.c`:
// only key-matching pairs that also satisfy the filter survive.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_inner_with_filter(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a", &vec![0, 1, 2, 2]),
        ("b", &vec![4, 5, 7, 8]),
        ("c", &vec![7, 8, 9, 1]),
    );
    let right = build_table(
        ("a", &vec![10, 20, 30, 40]),
        ("b", &vec![2, 2, 3, 4]),
        ("c", &vec![7, 5, 6, 4]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
    )];
    let filter = prepare_join_filter();
    let join = join_with_filter(
        left,
        right,
        on,
        filter,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    // Only left row (2,7,9) both matches on the key and passes c > right.c;
    // left (2,8,1) matches the key but fails the filter.
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +---+---+---+----+---+---+
        | a | b | c | a  | b | c |
        +---+---+---+----+---+---+
        | 2 | 7 | 9 | 10 | 2 | 7 |
        | 2 | 7 | 9 | 20 | 2 | 5 |
        +---+---+---+----+---+---+
        ");
    }
    Ok(())
}
// Left join with the residual filter `left.c > right.c`: left rows whose
// matches all fail the filter are still emitted, null-padded on the right.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_left_with_filter(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a", &vec![0, 1, 2, 2]),
        ("b", &vec![4, 5, 7, 8]),
        ("c", &vec![7, 8, 9, 1]),
    );
    let right = build_table(
        ("a", &vec![10, 20, 30, 40]),
        ("b", &vec![2, 2, 3, 4]),
        ("c", &vec![7, 5, 6, 4]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
    )];
    let filter = prepare_join_filter();
    let join = join_with_filter(
        left,
        right,
        on,
        filter,
        &JoinType::Left,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +---+---+---+----+---+---+
        | a | b | c | a  | b | c |
        +---+---+---+----+---+---+
        | 0 | 4 | 7 |    |   |   |
        | 1 | 5 | 8 |    |   |   |
        | 2 | 7 | 9 | 10 | 2 | 7 |
        | 2 | 7 | 9 | 20 | 2 | 5 |
        | 2 | 8 | 1 |    |   |   |
        +---+---+---+----+---+---+
        ");
    }
    Ok(())
}
// Right join with the residual filter `left.c > right.c`: right rows with no
// surviving match are emitted with nulls on the left.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_right_with_filter(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a", &vec![0, 1, 2, 2]),
        ("b", &vec![4, 5, 7, 8]),
        ("c", &vec![7, 8, 9, 1]),
    );
    let right = build_table(
        ("a", &vec![10, 20, 30, 40]),
        ("b", &vec![2, 2, 3, 4]),
        ("c", &vec![7, 5, 6, 4]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
    )];
    let filter = prepare_join_filter();
    let join = join_with_filter(
        left,
        right,
        on,
        filter,
        &JoinType::Right,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +---+---+---+----+---+---+
        | a | b | c | a  | b | c |
        +---+---+---+----+---+---+
        |   |   |   | 30 | 3 | 6 |
        |   |   |   | 40 | 4 | 4 |
        | 2 | 7 | 9 | 10 | 2 | 7 |
        | 2 | 7 | 9 | 20 | 2 | 5 |
        +---+---+---+----+---+---+
        ");
    }
    Ok(())
}
// Full outer join with the residual filter `left.c > right.c`: the union of
// the left-join and right-join filtered outputs.
#[apply(batch_sizes)]
#[tokio::test]
async fn join_full_with_filter(batch_size: usize) -> Result<()> {
    let task_ctx = prepare_task_ctx(batch_size);
    let left = build_table(
        ("a", &vec![0, 1, 2, 2]),
        ("b", &vec![4, 5, 7, 8]),
        ("c", &vec![7, 8, 9, 1]),
    );
    let right = build_table(
        ("a", &vec![10, 20, 30, 40]),
        ("b", &vec![2, 2, 3, 4]),
        ("c", &vec![7, 5, 6, 4]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
    )];
    let filter = prepare_join_filter();
    let join = join_with_filter(
        left,
        right,
        on,
        filter,
        &JoinType::Full,
        NullEquality::NullEqualsNothing,
    )?;
    let columns = columns(&join.schema());
    assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    let expected = [
        "+---+---+---+----+---+---+",
        "| a | b | c | a  | b | c |",
        "+---+---+---+----+---+---+",
        "|   |   |   | 30 | 3 | 6 |",
        "|   |   |   | 40 | 4 | 4 |",
        "| 2 | 7 | 9 | 10 | 2 | 7 |",
        "| 2 | 7 | 9 | 20 | 2 | 5 |",
        "| 0 | 4 | 7 |    |   |   |",
        "| 1 | 5 | 8 |    |   |   |",
        "| 2 | 8 | 1 |    |   |   |",
        "+---+---+---+----+---+---+",
    ];
    assert_batches_sorted_eq!(expected, &batches);
    Ok(())
}
// Runs every supported join type through `PartitionMode::CollectLeft` on the
// same small inputs and checks each type's expected output. The metrics
// assertion derives the row count from the expected table: total lines minus
// the 4 non-data lines (top border, header, separator, bottom border).
#[tokio::test]
async fn test_collect_left_multiple_partitions_join() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b2", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    // Keys 4 and 5 match; left b1=7 and right b2=6 are unmatched.
    let expected_inner = vec![
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| 1  | 4  | 7  | 10 | 4  | 70 |",
        "| 2  | 5  | 8  | 20 | 5  | 80 |",
        "+----+----+----+----+----+----+",
    ];
    let expected_left = vec![
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| 1  | 4  | 7  | 10 | 4  | 70 |",
        "| 2  | 5  | 8  | 20 | 5  | 80 |",
        "| 3  | 7  | 9  |    |    |    |",
        "+----+----+----+----+----+----+",
    ];
    let expected_right = vec![
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "|    |    |    | 30 | 6  | 90 |",
        "| 1  | 4  | 7  | 10 | 4  | 70 |",
        "| 2  | 5  | 8  | 20 | 5  | 80 |",
        "+----+----+----+----+----+----+",
    ];
    let expected_full = vec![
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "|    |    |    | 30 | 6  | 90 |",
        "| 1  | 4  | 7  | 10 | 4  | 70 |",
        "| 2  | 5  | 8  | 20 | 5  | 80 |",
        "| 3  | 7  | 9  |    |    |    |",
        "+----+----+----+----+----+----+",
    ];
    let expected_left_semi = vec![
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "| 1  | 4  | 7  |",
        "| 2  | 5  | 8  |",
        "+----+----+----+",
    ];
    let expected_left_anti = vec![
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "| 3  | 7  | 9  |",
        "+----+----+----+",
    ];
    let expected_right_semi = vec![
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "| 10 | 4  | 70 |",
        "| 20 | 5  | 80 |",
        "+----+----+----+",
    ];
    let expected_right_anti = vec![
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "| 30 | 6  | 90 |",
        "+----+----+----+",
    ];
    let expected_left_mark = vec![
        "+----+----+----+-------+",
        "| a1 | b1 | c1 | mark  |",
        "+----+----+----+-------+",
        "| 1  | 4  | 7  | true  |",
        "| 2  | 5  | 8  | true  |",
        "| 3  | 7  | 9  | false |",
        "+----+----+----+-------+",
    ];
    let expected_right_mark = vec![
        "+----+----+----+-------+",
        "| a2 | b2 | c2 | mark  |",
        "+----+----+----+-------+",
        "| 10 | 4  | 70 | true  |",
        "| 20 | 5  | 80 | true  |",
        "| 30 | 6  | 90 | false |",
        "+----+----+----+-------+",
    ];
    let test_cases = vec![
        (JoinType::Inner, expected_inner),
        (JoinType::Left, expected_left),
        (JoinType::Right, expected_right),
        (JoinType::Full, expected_full),
        (JoinType::LeftSemi, expected_left_semi),
        (JoinType::LeftAnti, expected_left_anti),
        (JoinType::RightSemi, expected_right_semi),
        (JoinType::RightAnti, expected_right_anti),
        (JoinType::LeftMark, expected_left_mark),
        (JoinType::RightMark, expected_right_mark),
    ];
    for (join_type, expected) in test_cases {
        let (_, batches, metrics) = join_collect_with_partition_mode(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &join_type,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
            Arc::clone(&task_ctx),
        )
        .await?;
        assert_batches_sorted_eq!(expected, &batches);
        // Data-row count = table lines minus the 4 border/header lines.
        assert_join_metrics!(metrics, expected.len() - 4);
    }
    Ok(())
}
// Inner join keyed on a Date32 column (days since epoch: 19107..19109 are
// 2022-04-25..27); verifies non-primitive-int key hashing/equality works.
#[tokio::test]
async fn join_date32() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("date", DataType::Date32, false),
        Field::new("n", DataType::Int32, false),
    ]));
    let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
    let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
    let left =
        TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
            .unwrap();
    // Right side duplicates 19108 so one left row joins to two right rows.
    let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
    let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
    let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
    let on = vec![(
        Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
    )];
    let join = join(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
    )?;
    let task_ctx = Arc::new(TaskContext::default());
    let stream = join.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches), @r"
        +------------+---+------------+---+
        | date       | n | date       | n |
        +------------+---+------------+---+
        | 2022-04-26 | 2 | 2022-04-26 | 4 |
        | 2022-04-26 | 2 | 2022-04-26 | 5 |
        | 2022-04-27 | 3 | 2022-04-27 | 6 |
        +------------+---+------------+---+
        ");
    }
    Ok(())
}
// Verifies that an error produced by the probe (right) input stream is
// propagated to the join's output stream for every join type. The right side
// is a `MockExec` that yields one empty batch and then the error.
#[tokio::test]
async fn join_with_error_right() {
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 7]),
        ("c1", &vec![7, 8, 9]),
    );
    let err = exec_err!("bad data error");
    // First `right` exists only to derive the schema for the join keys.
    let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
    )];
    let schema = right.schema();
    let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
    let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
    ];
    for join_type in join_types {
        let join = join(
            Arc::clone(&left),
            Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
            on.clone(),
            &join_type,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();
        let task_ctx = Arc::new(TaskContext::default());
        let stream = join.execute(0, task_ctx).unwrap();
        // Collecting must surface the injected error, not hang or succeed.
        let result_string = common::collect(stream).await.unwrap_err().to_string();
        assert!(
            result_string.contains("bad data error"),
            "actual: {result_string}"
        );
    }
}
// Checks output batch splitting: a 4x5 all-matching cross product (every key
// is 1) yields 20 result rows, and for each batch_size in 1..=20 the join must
// emit at most ceil(20 / batch_size) batches — plus one extra for join types
// that emit a final unmatched-build-side batch.
#[tokio::test]
async fn join_split_batch() {
    let left = build_table(
        ("a1", &vec![1, 2, 3, 4]),
        ("b1", &vec![1, 1, 1, 1]),
        ("c1", &vec![0, 0, 0, 0]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30, 40, 50]),
        ("b2", &vec![1, 1, 1, 1, 1]),
        ("c2", &vec![0, 0, 0, 0, 0]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
    ];
    // 4 left rows x 5 right rows, all matching on b = 1.
    let expected_resultset_records = 20;
    let common_result = [
        "+----+----+----+----+----+----+",
        "| a1 | b1 | c1 | a2 | b2 | c2 |",
        "+----+----+----+----+----+----+",
        "| 1  | 1  | 0  | 10 | 1  | 0  |",
        "| 2  | 1  | 0  | 10 | 1  | 0  |",
        "| 3  | 1  | 0  | 10 | 1  | 0  |",
        "| 4  | 1  | 0  | 10 | 1  | 0  |",
        "| 1  | 1  | 0  | 20 | 1  | 0  |",
        "| 2  | 1  | 0  | 20 | 1  | 0  |",
        "| 3  | 1  | 0  | 20 | 1  | 0  |",
        "| 4  | 1  | 0  | 20 | 1  | 0  |",
        "| 1  | 1  | 0  | 30 | 1  | 0  |",
        "| 2  | 1  | 0  | 30 | 1  | 0  |",
        "| 3  | 1  | 0  | 30 | 1  | 0  |",
        "| 4  | 1  | 0  | 30 | 1  | 0  |",
        "| 1  | 1  | 0  | 40 | 1  | 0  |",
        "| 2  | 1  | 0  | 40 | 1  | 0  |",
        "| 3  | 1  | 0  | 40 | 1  | 0  |",
        "| 4  | 1  | 0  | 40 | 1  | 0  |",
        "| 1  | 1  | 0  | 50 | 1  | 0  |",
        "| 2  | 1  | 0  | 50 | 1  | 0  |",
        "| 3  | 1  | 0  | 50 | 1  | 0  |",
        "| 4  | 1  | 0  | 50 | 1  | 0  |",
        "+----+----+----+----+----+----+",
    ];
    let left_batch = [
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "| 1  | 1  | 0  |",
        "| 2  | 1  | 0  |",
        "| 3  | 1  | 0  |",
        "| 4  | 1  | 0  |",
        "+----+----+----+",
    ];
    let right_batch = [
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "| 10 | 1  | 0  |",
        "| 20 | 1  | 0  |",
        "| 30 | 1  | 0  |",
        "| 40 | 1  | 0  |",
        "| 50 | 1  | 0  |",
        "+----+----+----+",
    ];
    let right_empty = [
        "+----+----+----+",
        "| a2 | b2 | c2 |",
        "+----+----+----+",
        "+----+----+----+",
    ];
    let left_empty = [
        "+----+----+----+",
        "| a1 | b1 | c1 |",
        "+----+----+----+",
        "+----+----+----+",
    ];
    for join_type in join_types {
        for batch_size in (1..21).rev() {
            let task_ctx = prepare_task_ctx(batch_size);
            let join = join(
                Arc::clone(&left),
                Arc::clone(&right),
                on.clone(),
                &join_type,
                NullEquality::NullEqualsNothing,
            )
            .unwrap();
            let stream = join.execute(0, task_ctx).unwrap();
            let batches = common::collect(stream).await.unwrap();
            // Probe-driven types emit only matched batches; the others may add
            // one trailing batch for unmatched build-side rows.
            let expected_batch_count = match join_type {
                JoinType::Inner
                | JoinType::Right
                | JoinType::RightSemi
                | JoinType::RightAnti => {
                    div_ceil(expected_resultset_records, batch_size)
                }
                _ => div_ceil(expected_resultset_records, batch_size) + 1,
            };
            assert!(
                batches.len() <= expected_batch_count,
                "expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
                batches.len()
            );
            let expected = match join_type {
                JoinType::RightSemi => right_batch.to_vec(),
                JoinType::RightAnti => right_empty.to_vec(),
                JoinType::LeftSemi => left_batch.to_vec(),
                JoinType::LeftAnti => left_empty.to_vec(),
                _ => common_result.to_vec(),
            };
            // Anti joins legitimately produce zero batches here (no unmatched
            // rows exist); everything else must match the expected table.
            if batches.is_empty() {
                assert!(
                    matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
                    "Unexpected empty result for {join_type} join"
                );
            } else {
                assert_batches_eq!(expected, &batches);
            }
        }
    }
}
// With a 100-byte memory limit, collecting the 10-row build side must fail
// with a resources-exhausted error naming the `HashJoinInput` memory consumer,
// for every join type.
#[tokio::test]
async fn single_partition_join_overallocation() -> Result<()> {
    let left = build_table(
        ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
    );
    let right = build_table(
        ("a2", &vec![10, 11]),
        ("b2", &vec![12, 13]),
        ("c2", &vec![14, 15]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
    )];
    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftMark,
        JoinType::RightMark,
    ];
    for join_type in join_types {
        // Fresh 100-byte memory pool per join type so failures don't leak
        // reservations between iterations.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(100, 1.0)
            .build_arc()?;
        let task_ctx = TaskContext::default().with_runtime(runtime);
        let task_ctx = Arc::new(task_ctx);
        let join = join(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &join_type,
            NullEquality::NullEqualsNothing,
        )?;
        let stream = join.execute(0, task_ctx)?;
        let err = common::collect(stream).await.unwrap_err();
        assert_contains!(
            err.to_string(),
            "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput"
        );
        assert_contains!(
            err.to_string(),
            "Failed to allocate additional 120.0 B for HashJoinInput"
        );
    }
    Ok(())
}
// Same overallocation scenario as above, but with two-partition inputs and
// `PartitionMode::Partitioned`: the error must name the per-partition consumer
// `HashJoinInput[1]` (partition 1 is executed).
#[tokio::test]
async fn partitioned_join_overallocation() -> Result<()> {
    let left_batch = build_table_i32(
        ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
    );
    // Same batch in both partitions.
    let left = TestMemoryExec::try_new_exec(
        &[vec![left_batch.clone()], vec![left_batch.clone()]],
        left_batch.schema(),
        None,
    )
    .unwrap();
    let right_batch = build_table_i32(
        ("a2", &vec![10, 11]),
        ("b2", &vec![12, 13]),
        ("c2", &vec![14, 15]),
    );
    let right = TestMemoryExec::try_new_exec(
        &[vec![right_batch.clone()], vec![right_batch.clone()]],
        right_batch.schema(),
        None,
    )
    .unwrap();
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
        Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
    )];
    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
    ];
    for join_type in join_types {
        // 100-byte limit guarantees the build-side reservation fails.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(100, 1.0)
            .build_arc()?;
        let session_config = SessionConfig::default().with_batch_size(50);
        let task_ctx = TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime);
        let task_ctx = Arc::new(task_ctx);
        let join = HashJoinExec::try_new(
            Arc::clone(&left) as Arc<dyn ExecutionPlan>,
            Arc::clone(&right) as Arc<dyn ExecutionPlan>,
            on.clone(),
            None,
            &join_type,
            None,
            PartitionMode::Partitioned,
            NullEquality::NullEqualsNothing,
        )?;
        // Execute partition 1 specifically; consumer name carries the index.
        let stream = join.execute(1, task_ctx)?;
        let err = common::collect(stream).await.unwrap_err();
        assert_contains!(
            err.to_string(),
            "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]"
        );
        assert_contains!(
            err.to_string(),
            "Failed to allocate additional 120.0 B for HashJoinInput[1]"
        );
    }
    Ok(())
}
/// Builds a single-partition memory exec producing one batch with a single
/// struct column named `struct_name` whose sole Int32 child holds `values`.
///
/// `nulls`, when provided, supplies struct-level validity and makes the
/// top-level struct column nullable.
fn build_table_struct(
    struct_name: &str,
    field_name_and_values: (&str, &Vec<Option<i32>>),
    nulls: Option<NullBuffer>,
) -> Arc<dyn ExecutionPlan> {
    let (field_name, values) = field_name_and_values;
    let child_fields = vec![Field::new(field_name, DataType::Int32, true)];
    // The struct column is nullable only when a validity buffer is supplied.
    let column_nullable = nulls.is_some();
    let struct_array = StructArray::new(
        child_fields.clone().into(),
        vec![Arc::new(Int32Array::from(values.clone()))],
        nulls,
    );
    let schema = Arc::new(Schema::new(vec![Field::new(
        struct_name,
        DataType::Struct(child_fields.into()),
        column_nullable,
    )]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(struct_array)])
        .unwrap();
    TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
}
// Inner join keyed on struct-typed columns: structs with equal child values
// (including `{a: null}` on both sides, compared as values here) match.
#[tokio::test]
async fn join_on_struct() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left =
        build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
    let right =
        build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
    let on = vec![(
        Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
    )];
    let (columns, batches, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_eq!(columns, vec!["n1", "n2"]);
    // a=3 (left only) and a=4 (right only) drop out; the rest match.
    allow_duplicates! {
        assert_snapshot!(batches_to_string(&batches), @r"
        +--------+--------+
        | n1     | n2     |
        +--------+--------+
        | {a: }  | {a: }  |
        | {a: 1} | {a: 1} |
        | {a: 2} | {a: 2} |
        +--------+--------+
        ");
    }
    assert_join_metrics!(metrics, 3);
    Ok(())
}
// Struct keys that are null at the STRUCT level (validity buffer all-null):
// they match under `NullEqualsNull` (one output row, metrics = 1) and do not
// match under `NullEqualsNothing` (metrics = 0, empty or header-only output).
#[tokio::test]
async fn join_on_struct_with_nulls() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left =
        build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
    let right =
        build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
    let on = vec![(
        Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
    )];
    // Case 1: nulls compare equal.
    let (_, batches_null_eq, metrics) = join_collect(
        Arc::clone(&left),
        Arc::clone(&right),
        on.clone(),
        &JoinType::Inner,
        NullEquality::NullEqualsNull,
        Arc::clone(&task_ctx),
    )
    .await?;
    allow_duplicates! {
        assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r"
        +----+----+
        | n1 | n2 |
        +----+----+
        |    |    |
        +----+----+
        ");
    }
    assert_join_metrics!(metrics, 1);
    // Case 2: nulls never match.
    let (_, batches_null_neq, metrics) = join_collect(
        left,
        right,
        on,
        &JoinType::Inner,
        NullEquality::NullEqualsNothing,
        task_ctx,
    )
    .await?;
    assert_join_metrics!(metrics, 0);
    // The join may emit either no batches at all or a single empty batch;
    // both are acceptable for zero matched rows.
    if batches_null_neq.is_empty() {
    } else {
        let expected_null_neq =
            ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
        assert_batches_eq!(expected_null_neq, &batches_null_neq);
    }
    Ok(())
}
/// Returns the names of all fields in `schema`, in declaration order.
fn columns(schema: &Schema) -> Vec<String> {
    schema
        .fields()
        .iter()
        .map(|field| field.name().to_owned())
        .collect()
}
// After a CollectLeft hash join finishes its build side, the attached dynamic
// filter must be marked complete: `wait_complete()` must resolve rather than
// hang. A detached consumer copy of the filter is created so completion is
// actually tracked.
#[tokio::test]
async fn test_hash_join_marks_filter_complete() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 6]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
    let dynamic_filter_clone = Arc::clone(&dynamic_filter);
    // Register a consumer so the filter has something waiting on completion.
    let _consumer = Arc::clone(&dynamic_filter)
        .with_new_children(vec![])
        .unwrap();
    let mut join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::Inner,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
    )?;
    // Attach the filter directly (bypassing the pushdown machinery).
    join.dynamic_filter = Some(HashJoinExecDynamicFilter {
        filter: dynamic_filter,
        build_accumulator: OnceLock::new(),
    });
    let stream = join.execute(0, task_ctx)?;
    let _batches = common::collect(stream).await?;
    // Must not hang: the join signals completion once the build side is done.
    dynamic_filter_clone.wait_complete().await;
    Ok(())
}
// Same as `test_hash_join_marks_filter_complete`, but with an EMPTY build
// side: the dynamic filter must still be marked complete so waiters don't
// hang when no build rows exist.
#[tokio::test]
async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];
    let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
    let dynamic_filter_clone = Arc::clone(&dynamic_filter);
    let _consumer = Arc::clone(&dynamic_filter)
        .with_new_children(vec![])
        .unwrap();
    let mut join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::Inner,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
    )?;
    join.dynamic_filter = Some(HashJoinExecDynamicFilter {
        filter: dynamic_filter,
        build_accumulator: OnceLock::new(),
    });
    let stream = join.execute(0, task_ctx)?;
    let _batches = common::collect(stream).await?;
    // Must resolve even though zero build rows were hashed.
    dynamic_filter_clone.wait_complete().await;
    Ok(())
}
}