use std::fmt;
use std::sync::Arc;
use crate::ExecutionPlan;
use crate::ExecutionPlanProperties;
use crate::joins::Map;
use crate::joins::PartitionMode;
use crate::joins::hash_join::exec::HASH_JOIN_SEED;
use crate::joins::hash_join::inlist_builder::build_struct_fields;
use crate::joins::hash_join::partitioned_hash_eval::{
HashExpr, HashTableLookupExpr, SeededRandomState,
};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_functions::core::r#struct as struct_func;
use datafusion_physical_expr::expressions::{
BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, lit,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};
use parking_lot::Mutex;
use tokio::sync::Barrier;
/// Minimum and maximum value recorded for a single join key column.
///
/// `create_bounds_predicate` turns one of these into the range check
/// `key >= min AND key <= max` on the corresponding `on_right` expression.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ColumnBounds {
/// Lower bound, compared with `Operator::GtEq` when building the range predicate.
pub(crate) min: ScalarValue,
/// Upper bound, compared with `Operator::LtEq` when building the range predicate.
pub(crate) max: ScalarValue,
}
impl ColumnBounds {
pub(crate) fn new(min: ScalarValue, max: ScalarValue) -> Self {
Self { min, max }
}
}
/// Per-column bounds reported together for one unit of build data.
///
/// `create_bounds_predicate` looks entries up by the position of the join key
/// in `on_right`, so index `i` here pairs with `on_right[i]`.
#[derive(Debug, Clone)]
pub(crate) struct PartitionBounds {
/// One entry per join key column; may hold fewer entries than there are keys,
/// in which case the missing keys simply get no range predicate.
column_bounds: Vec<ColumnBounds>,
}
impl PartitionBounds {
pub(crate) fn new(column_bounds: Vec<ColumnBounds>) -> Self {
Self { column_bounds }
}
pub(crate) fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> {
self.column_bounds.get(index)
}
}
/// Builds the membership predicate for the given pushdown strategy.
///
/// * `PushdownStrategy::InList` produces an `InListExpr` over the join key.
///   With a single key the key expression is used directly; with several keys
///   they are wrapped in a `struct(...)` call whose field types are resolved
///   against `schema`.
/// * `PushdownStrategy::Map` produces a `HashTableLookupExpr` named
///   `"hash_lookup"` over all keys, using a clone of `random_state`.
/// * `PushdownStrategy::Empty` produces no predicate (`Ok(None)`).
///
/// # Errors
///
/// Propagates failures from resolving key data types, from
/// `build_struct_fields`, and from `InListExpr::try_new_from_array`.
fn create_membership_predicate(
    on_right: &[PhysicalExprRef],
    pushdown: PushdownStrategy,
    random_state: &SeededRandomState,
    schema: &Schema,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    let predicate: Arc<dyn PhysicalExpr> = match pushdown {
        // Nothing to test membership against.
        PushdownStrategy::Empty => return Ok(None),
        PushdownStrategy::Map(hash_map) => Arc::new(HashTableLookupExpr::new(
            on_right.to_vec(),
            random_state.clone(),
            hash_map,
            "hash_lookup".to_string(),
        )),
        PushdownStrategy::InList(in_list_array) => {
            let probe_key: Arc<dyn PhysicalExpr> = match on_right {
                // Single join key: compare the key expression itself.
                [single_key] => Arc::clone(single_key),
                // Any other key count: compare a struct built from all keys.
                _ => {
                    let key_types = on_right
                        .iter()
                        .map(|key| key.data_type(schema))
                        .collect::<Result<Vec<_>>>()?;
                    let fields = build_struct_fields(key_types.as_ref())?;
                    let return_field =
                        Arc::new(Field::new("struct", DataType::Struct(fields), true));
                    Arc::new(ScalarFunctionExpr::new(
                        "struct",
                        struct_func(),
                        on_right.to_vec(),
                        return_field,
                        Arc::new(ConfigOptions::default()),
                    ))
                }
            };
            // `false` = not negated, i.e. `key IN (...)`.
            let in_list =
                InListExpr::try_new_from_array(probe_key, in_list_array, false)?;
            Arc::new(in_list)
        }
    };
    Ok(Some(predicate))
}
/// Builds a conjunction of range checks from `bounds`.
///
/// For every key `on_right[i]` that has an entry at index `i` in `bounds`,
/// emits `(key >= min) AND (key <= max)`. The per-key checks are then chained
/// left to right with `AND`. Returns `None` when no key has bounds.
fn create_bounds_predicate(
    on_right: &[PhysicalExprRef],
    bounds: &PartitionBounds,
) -> Option<Arc<dyn PhysicalExpr>> {
    on_right
        .iter()
        .enumerate()
        .filter_map(|(key_idx, key_expr)| {
            // Keys without recorded bounds contribute nothing.
            let limits = bounds.get_column_bounds(key_idx)?;
            let lower = Arc::new(BinaryExpr::new(
                Arc::clone(key_expr),
                Operator::GtEq,
                lit(limits.min.clone()),
            )) as Arc<dyn PhysicalExpr>;
            let upper = Arc::new(BinaryExpr::new(
                Arc::clone(key_expr),
                Operator::LtEq,
                lit(limits.max.clone()),
            )) as Arc<dyn PhysicalExpr>;
            Some(Arc::new(BinaryExpr::new(lower, Operator::And, upper))
                as Arc<dyn PhysicalExpr>)
        })
        // `reduce` yields `None` for an empty sequence, which is exactly the
        // "no bounds available" result.
        .reduce(|conjunction, range_check| {
            Arc::new(BinaryExpr::new(conjunction, Operator::And, range_check))
                as Arc<dyn PhysicalExpr>
        })
}
/// Collects build data reported through `report_build_data` and, once the
/// expected number of reports has arrived, updates a shared dynamic filter.
pub(crate) struct SharedBuildAccumulator {
/// Build data reported so far, guarded by a mutex.
inner: Mutex<AccumulatedBuildData>,
/// Barrier sized to the expected number of `report_build_data` calls; the
/// caller that the barrier designates as leader builds and publishes the filter.
barrier: Barrier,
/// Filter expression that receives the generated predicate and is marked
/// complete by the barrier leader.
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
/// Join key expressions that the generated predicates are written against.
on_right: Vec<PhysicalExprRef>,
/// Random state passed to the `"hash_repartition"` `HashExpr` that routes a
/// row to its partition's filter in partitioned mode.
repartition_random_state: SeededRandomState,
/// Schema of the right child, used to resolve data types of `on_right`.
probe_schema: Arc<Schema>,
}
/// How membership in the build side is expressed in the pushed-down filter.
#[derive(Clone)]
pub(crate) enum PushdownStrategy {
/// Values to test against; `create_membership_predicate` turns this into an `InListExpr`.
InList(ArrayRef),
/// Hash map to probe; `create_membership_predicate` turns this into a `HashTableLookupExpr`.
Map(Arc<Map>),
/// No membership predicate is produced. In partitioned mode such a partition
/// also gets no branch in the routing `CASE` expression.
Empty,
}
/// Payload handed to `SharedBuildAccumulator::report_build_data`.
///
/// The variant must match the mode the accumulator was created with;
/// otherwise `report_build_data` returns an internal error.
pub(crate) enum PartitionBuildData {
/// Data for one partition in partitioned mode.
Partitioned {
/// Index of the slot this report is stored in.
partition_id: usize,
/// Membership strategy for this partition.
pushdown: PushdownStrategy,
/// Per-column bounds for this partition.
bounds: PartitionBounds,
},
/// Data in collect-left mode; only the first report is stored, later ones are ignored.
CollectLeft {
/// Membership strategy for the build data.
pushdown: PushdownStrategy,
/// Per-column bounds for the build data.
bounds: PartitionBounds,
},
}
/// Bounds and membership strategy stored by the accumulator for one report.
#[derive(Clone)]
struct PartitionData {
/// Per-column bounds used for the range part of the filter.
bounds: PartitionBounds,
/// Strategy used for the membership part of the filter.
pushdown: PushdownStrategy,
}
/// Internal state of `SharedBuildAccumulator`, one variant per supported partition mode.
enum AccumulatedBuildData {
/// Partitioned mode: one slot per partition, indexed by `partition_id`.
Partitioned {
/// `None` until the partition with that index has reported.
partitions: Vec<Option<PartitionData>>,
},
/// Collect-left mode: a single shared entry.
CollectLeft {
/// `None` until the first report arrives; later reports do not overwrite it.
data: Option<PartitionData>,
},
}
impl SharedBuildAccumulator {
pub(crate) fn new_from_partition_mode(
partition_mode: PartitionMode,
left_child: &dyn ExecutionPlan,
right_child: &dyn ExecutionPlan,
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
on_right: Vec<PhysicalExprRef>,
repartition_random_state: SeededRandomState,
) -> Self {
let expected_calls = match partition_mode {
PartitionMode::CollectLeft => {
right_child.output_partitioning().partition_count()
}
PartitionMode::Partitioned => {
left_child.output_partitioning().partition_count()
}
PartitionMode::Auto => unreachable!(
"PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"
),
};
let mode_data = match partition_mode {
PartitionMode::Partitioned => AccumulatedBuildData::Partitioned {
partitions: vec![
None;
left_child.output_partitioning().partition_count()
],
},
PartitionMode::CollectLeft => {
AccumulatedBuildData::CollectLeft { data: None }
}
PartitionMode::Auto => unreachable!(
"PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"
),
};
Self {
inner: Mutex::new(mode_data),
barrier: Barrier::new(expected_calls),
dynamic_filter,
on_right,
repartition_random_state,
probe_schema: right_child.schema(),
}
}
pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> {
{
let mut guard = self.inner.lock();
match (data, &mut *guard) {
(
PartitionBuildData::Partitioned {
partition_id,
pushdown,
bounds,
},
AccumulatedBuildData::Partitioned { partitions },
) => {
partitions[partition_id] = Some(PartitionData { pushdown, bounds });
}
(
PartitionBuildData::CollectLeft { pushdown, bounds },
AccumulatedBuildData::CollectLeft { data },
) => {
if data.is_none() {
*data = Some(PartitionData { pushdown, bounds });
}
}
_ => {
return datafusion_common::internal_err!(
"Build data mode mismatch in report_build_data"
);
}
}
}
if self.barrier.wait().await.is_leader() {
let inner = self.inner.lock();
match &*inner {
AccumulatedBuildData::CollectLeft { data } => {
if let Some(partition_data) = data {
let membership_expr = create_membership_predicate(
&self.on_right,
partition_data.pushdown.clone(),
&HASH_JOIN_SEED,
self.probe_schema.as_ref(),
)?;
let bounds_expr = create_bounds_predicate(
&self.on_right,
&partition_data.bounds,
);
if let Some(filter_expr) = match (membership_expr, bounds_expr) {
(Some(membership), Some(bounds)) => {
Some(Arc::new(BinaryExpr::new(
bounds,
Operator::And,
membership,
))
as Arc<dyn PhysicalExpr>)
}
(Some(membership), None) => {
Some(membership)
}
(None, Some(bounds)) => {
Some(bounds)
}
(None, None) => {
None
}
} {
self.dynamic_filter.update(filter_expr)?;
}
}
}
AccumulatedBuildData::Partitioned { partitions } => {
let partition_data: Vec<_> =
partitions.iter().filter_map(|p| p.as_ref()).collect();
if !partition_data.is_empty() {
let num_partitions = partition_data.len();
let routing_hash_expr = Arc::new(HashExpr::new(
self.on_right.clone(),
self.repartition_random_state.clone(),
"hash_repartition".to_string(),
))
as Arc<dyn PhysicalExpr>;
let modulo_expr = Arc::new(BinaryExpr::new(
routing_hash_expr,
Operator::Modulo,
lit(ScalarValue::UInt64(Some(num_partitions as u64))),
))
as Arc<dyn PhysicalExpr>;
let when_then_branches: Vec<(
Arc<dyn PhysicalExpr>,
Arc<dyn PhysicalExpr>,
)> = partitions
.iter()
.enumerate()
.filter_map(|(partition_id, partition_opt)| {
partition_opt.as_ref().and_then(|partition| {
match &partition.pushdown {
PushdownStrategy::Empty => None,
_ => Some((partition_id, partition)),
}
})
})
.map(|(partition_id, partition)| -> Result<_> {
let when_expr =
lit(ScalarValue::UInt64(Some(partition_id as u64)));
let membership_expr = create_membership_predicate(
&self.on_right,
partition.pushdown.clone(),
&HASH_JOIN_SEED,
self.probe_schema.as_ref(),
)?;
let bounds_expr = create_bounds_predicate(
&self.on_right,
&partition.bounds,
);
let then_expr = match (membership_expr, bounds_expr) {
(Some(membership), Some(bounds)) => {
Arc::new(BinaryExpr::new(
bounds,
Operator::And,
membership,
))
as Arc<dyn PhysicalExpr>
}
(Some(membership), None) => {
membership
}
(None, Some(bounds)) => {
bounds
}
(None, None) => {
lit(true)
}
};
Ok((when_expr, then_expr))
})
.collect::<Result<Vec<_>>>()?;
let filter_expr = if when_then_branches.is_empty() {
lit(false)
} else if when_then_branches.len() == 1 {
Arc::clone(&when_then_branches[0].1)
} else {
Arc::new(CaseExpr::try_new(
Some(modulo_expr),
when_then_branches,
Some(lit(false)), )?) as Arc<dyn PhysicalExpr>
};
self.dynamic_filter.update(filter_expr)?;
}
}
}
self.dynamic_filter.mark_complete();
}
Ok(())
}
}
impl fmt::Debug for SharedBuildAccumulator {
    /// Writes only the type name; none of the fields are printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SharedBuildAccumulator")
    }
}