use crate::{
equivalence::ProjectionMapping, expressions::UnKnownColumn, physical_exprs_equal,
EquivalenceProperties, PhysicalExpr,
};
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
/// Describes how the output of a physical operator is split into partitions.
///
/// Every variant carries the number of output partitions, which
/// [`Partitioning::partition_count`] returns.
#[derive(Debug, Clone)]
pub enum Partitioning {
    /// Round-robin style allocation of batches (per the variant name); the
    /// `usize` is the number of partitions.
    RoundRobinBatch(usize),
    /// Rows are partitioned by the listed expressions; the `usize` is the
    /// number of partitions.
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    /// The partitioning scheme is not known; only the number of partitions
    /// (the `usize`) is recorded.
    UnknownPartitioning(usize),
}
/// Renders `RoundRobinBatch(n)`, `Hash([e1, e2, ...], n)` or
/// `UnknownPartitioning(n)`.
impl Display for Partitioning {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Partitioning::RoundRobinBatch(count) => write!(f, "RoundRobinBatch({count})"),
            Partitioning::Hash(exprs, count) => {
                // Stream each expression straight into the formatter, separated
                // by ", ", instead of collecting an intermediate joined String.
                f.write_str("Hash([")?;
                for (position, expr) in exprs.iter().enumerate() {
                    if position > 0 {
                        f.write_str(", ")?;
                    }
                    write!(f, "{expr}")?;
                }
                write!(f, "], {count})")
            }
            Partitioning::UnknownPartitioning(count) => {
                write!(f, "UnknownPartitioning({count})")
            }
        }
    }
}
impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
}
}
pub fn satisfy(
&self,
required: &Distribution,
eq_properties: &EquivalenceProperties,
) -> bool {
match required {
Distribution::UnspecifiedDistribution => true,
Distribution::SinglePartition if self.partition_count() == 1 => true,
Distribution::HashPartitioned(_) if self.partition_count() == 1 => true,
Distribution::HashPartitioned(required_exprs) => {
match self {
Partitioning::Hash(partition_exprs, _) => {
let fast_match =
physical_exprs_equal(required_exprs, partition_exprs);
if !fast_match {
let eq_groups = eq_properties.eq_group();
if !eq_groups.is_empty() {
let normalized_required_exprs = required_exprs
.iter()
.map(|e| eq_groups.normalize_expr(Arc::clone(e)))
.collect::<Vec<_>>();
let normalized_partition_exprs = partition_exprs
.iter()
.map(|e| eq_groups.normalize_expr(Arc::clone(e)))
.collect::<Vec<_>>();
return physical_exprs_equal(
&normalized_required_exprs,
&normalized_partition_exprs,
);
}
}
fast_match
}
_ => false,
}
}
_ => false,
}
}
pub fn project(
&self,
mapping: &ProjectionMapping,
input_eq_properties: &EquivalenceProperties,
) -> Self {
if let Partitioning::Hash(exprs, part) = self {
let normalized_exprs = input_eq_properties
.project_expressions(exprs, mapping)
.zip(exprs)
.map(|(proj_expr, expr)| {
proj_expr.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
})
})
.collect();
Partitioning::Hash(normalized_exprs, *part)
} else {
self.clone()
}
}
}
/// Structural equality for partitionings.
///
/// `RoundRobinBatch` values are equal when their partition counts match.
/// `Hash` values are equal when their expression lists are equal (per
/// `physical_exprs_equal`) and their partition counts match. Every other pair
/// is unequal, including two `UnknownPartitioning` values, which never compare
/// equal, not even to themselves.
impl PartialEq for Partitioning {
    fn eq(&self, other: &Partitioning) -> bool {
        match (self, other) {
            (
                Partitioning::RoundRobinBatch(lhs_count),
                Partitioning::RoundRobinBatch(rhs_count),
            ) => lhs_count == rhs_count,
            (
                Partitioning::Hash(lhs_exprs, lhs_count),
                Partitioning::Hash(rhs_exprs, rhs_count),
            ) => physical_exprs_equal(lhs_exprs, rhs_exprs) && lhs_count == rhs_count,
            _ => false,
        }
    }
}
/// A requirement on how an operator's input must be partitioned.
///
/// A [`Partitioning`] is checked against it with [`Partitioning::satisfy`].
#[derive(Debug, Clone)]
pub enum Distribution {
    /// No requirement; `Partitioning::satisfy` accepts any partitioning.
    UnspecifiedDistribution,
    /// All data must be in one partition; `Partitioning::satisfy` accepts only
    /// partitionings whose partition count is 1.
    SinglePartition,
    /// Data must be hash-partitioned on these expressions (order-sensitive).
    /// A single-partition partitioning also satisfies this requirement.
    HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
impl Distribution {
pub fn create_partitioning(self, partition_count: usize) -> Partitioning {
match self {
Distribution::UnspecifiedDistribution => {
Partitioning::UnknownPartitioning(partition_count)
}
Distribution::SinglePartition => Partitioning::UnknownPartitioning(1),
Distribution::HashPartitioned(expr) => {
Partitioning::Hash(expr, partition_count)
}
}
}
}
/// Human-readable rendering of a distribution requirement:
/// `Unspecified`, `SinglePartition`, or `HashPartitioned[<expr list>]`, where
/// `<expr list>` is produced by `format_physical_expr_list`.
impl Display for Distribution {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Distribution::UnspecifiedDistribution => write!(f, "Unspecified"),
            Distribution::SinglePartition => write!(f, "SinglePartition"),
            Distribution::HashPartitioned(exprs) => {
                // The previous format string ended in an unmatched `)`
                // ("HashPartitioned[{}])"), which produced unbalanced
                // delimiters in plan/error output.
                write!(f, "HashPartitioned[{}]", format_physical_expr_list(exprs))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::Column;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;

    /// Checks `Partitioning::satisfy` for every combination of five sample
    /// partitionings against the three `Distribution` variants, with an
    /// equivalence-free `EquivalenceProperties`.
    #[test]
    fn partitioning_satisfy_distribution() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("column_1", DataType::Int64, false),
            Field::new("column_2", DataType::Utf8, false),
        ]));
        let column = |name: &str| -> Arc<dyn PhysicalExpr> {
            Arc::new(Column::new_with_schema(name, &schema).unwrap())
        };
        let exprs_1_2 = vec![column("column_1"), column("column_2")];
        // Same columns in reverse order: hash partitioning is order-sensitive.
        let exprs_2_1 = vec![column("column_2"), column("column_1")];

        // Order: single, unspecified(10), round-robin(10), hash(1,2), hash(2,1).
        let partitionings = [
            Partitioning::UnknownPartitioning(1),
            Partitioning::UnknownPartitioning(10),
            Partitioning::RoundRobinBatch(10),
            Partitioning::Hash(exprs_1_2.clone(), 10),
            Partitioning::Hash(exprs_2_1, 10),
        ];
        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

        let cases = [
            (
                Distribution::UnspecifiedDistribution,
                [true, true, true, true, true],
            ),
            (
                Distribution::SinglePartition,
                [true, false, false, false, false],
            ),
            (
                Distribution::HashPartitioned(exprs_1_2),
                [true, false, false, true, false],
            ),
        ];

        for (distribution, expected) in cases {
            let actual: Vec<bool> = partitionings
                .iter()
                .map(|partitioning| partitioning.satisfy(&distribution, &eq_properties))
                .collect();
            assert_eq!(actual, expected, "unexpected result for {distribution:?}");
        }
        Ok(())
    }
}