use crate::{
EquivalenceProperties, PhysicalExpr, equivalence::ProjectionMapping,
expressions::UnKnownColumn, physical_exprs_equal,
};
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
/// Describes how a plan's output rows are split across partitions.
///
/// Every variant carries the number of output partitions, which
/// `Partitioning::partition_count` returns.
#[derive(Debug, Clone)]
pub enum Partitioning {
    /// Round-robin partitioning into the given number of partitions.
    /// Presumably whole record batches are handed out in turn (inferred from
    /// the name; confirm with the producing operator).
    RoundRobinBatch(usize),
    /// Hash partitioning on the listed key expressions into the given number
    /// of partitions. It satisfies a `Distribution::HashPartitioned`
    /// requirement on the same (or equivalent) keys.
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    /// Row placement is not described. It only satisfies a requirement when
    /// the requirement is unspecified or the partition count is 1.
    UnknownPartitioning(usize),
}
impl Display for Partitioning {
    /// Renders the partitioning as `RoundRobinBatch(n)`, `Hash([k1, k2], n)`
    /// or `UnknownPartitioning(n)`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Partitioning::RoundRobinBatch(size) => write!(f, "RoundRobinBatch({size})"),
            Partitioning::Hash(phy_exprs, size) => {
                // Stream the comma-separated key list directly into the
                // formatter rather than building an intermediate String.
                f.write_str("Hash([")?;
                for (position, expr) in phy_exprs.iter().enumerate() {
                    if position > 0 {
                        f.write_str(", ")?;
                    }
                    write!(f, "{expr}")?;
                }
                write!(f, "], {size})")
            }
            Partitioning::UnknownPartitioning(size) => {
                write!(f, "UnknownPartitioning({size})")
            }
        }
    }
}
/// Outcome of checking a [`Partitioning`] against a [`Distribution`]
/// requirement, as returned by `Partitioning::satisfaction`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitioningSatisfaction {
    /// The partitioning does not meet the requirement.
    NotSatisfied,
    /// The requirement is met outright. Either it is unspecified, the output
    /// has a single partition, or the hash keys match (possibly after
    /// normalizing through equivalence groups).
    Exact,
    /// Only reported when subset matching is requested. The partitioning's
    /// hash keys are a non-empty, strictly smaller set of the required keys.
    Subset,
}
impl PartitioningSatisfaction {
    /// Returns `true` for every outcome except [`Self::NotSatisfied`], so both
    /// exact and subset matches count as satisfied.
    pub fn is_satisfied(&self) -> bool {
        !matches!(self, Self::NotSatisfied)
    }

    /// Returns `true` only when the match was a [`Self::Subset`] match.
    pub fn is_subset(&self) -> bool {
        matches!(self, Self::Subset)
    }
}
impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
}
}
fn is_subset_partitioning(
subset_exprs: &[Arc<dyn PhysicalExpr>],
superset_exprs: &[Arc<dyn PhysicalExpr>],
) -> bool {
if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() {
return false;
}
subset_exprs.iter().all(|subset_expr| {
superset_exprs
.iter()
.any(|superset_expr| subset_expr.eq(superset_expr))
})
}
#[deprecated(since = "52.0.0", note = "Use satisfaction instead")]
pub fn satisfy(
&self,
required: &Distribution,
eq_properties: &EquivalenceProperties,
) -> bool {
self.satisfaction(required, eq_properties, false)
== PartitioningSatisfaction::Exact
}
pub fn satisfaction(
&self,
required: &Distribution,
eq_properties: &EquivalenceProperties,
allow_subset: bool,
) -> PartitioningSatisfaction {
match required {
Distribution::UnspecifiedDistribution => PartitioningSatisfaction::Exact,
Distribution::SinglePartition if self.partition_count() == 1 => {
PartitioningSatisfaction::Exact
}
Distribution::HashPartitioned(_) if self.partition_count() == 1 => {
PartitioningSatisfaction::Exact
}
Distribution::HashPartitioned(required_exprs) => match self {
Partitioning::Hash(partition_exprs, _) => {
if partition_exprs.is_empty() || required_exprs.is_empty() {
return PartitioningSatisfaction::NotSatisfied;
}
if physical_exprs_equal(required_exprs, partition_exprs) {
return PartitioningSatisfaction::Exact;
}
let eq_groups = eq_properties.eq_group();
if !eq_groups.is_empty() {
let normalized_required_exprs = required_exprs
.iter()
.map(|e| eq_groups.normalize_expr(Arc::clone(e)))
.collect::<Vec<_>>();
let normalized_partition_exprs = partition_exprs
.iter()
.map(|e| eq_groups.normalize_expr(Arc::clone(e)))
.collect::<Vec<_>>();
if physical_exprs_equal(
&normalized_required_exprs,
&normalized_partition_exprs,
) {
return PartitioningSatisfaction::Exact;
}
if allow_subset
&& Self::is_subset_partitioning(
&normalized_partition_exprs,
&normalized_required_exprs,
)
{
return PartitioningSatisfaction::Subset;
}
} else if allow_subset
&& Self::is_subset_partitioning(partition_exprs, required_exprs)
{
return PartitioningSatisfaction::Subset;
}
PartitioningSatisfaction::NotSatisfied
}
_ => PartitioningSatisfaction::NotSatisfied,
},
_ => PartitioningSatisfaction::NotSatisfied,
}
}
pub fn project(
&self,
mapping: &ProjectionMapping,
input_eq_properties: &EquivalenceProperties,
) -> Self {
if let Partitioning::Hash(exprs, part) = self {
let normalized_exprs = input_eq_properties
.project_expressions(exprs, mapping)
.zip(exprs)
.map(|(proj_expr, expr)| {
proj_expr.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
})
})
.collect();
Partitioning::Hash(normalized_exprs, *part)
} else {
self.clone()
}
}
}
impl PartialEq for Partitioning {
    /// Two round-robin partitionings are equal when their counts match. Two
    /// hash partitionings are equal when their key lists are equal (per
    /// `physical_exprs_equal`) and their counts match.
    ///
    /// Every other pairing is unequal, including `UnknownPartitioning`
    /// compared with itself. That is why this type implements `PartialEq`
    /// but not `Eq`.
    fn eq(&self, other: &Partitioning) -> bool {
        match (self, other) {
            (Self::RoundRobinBatch(lhs_count), Self::RoundRobinBatch(rhs_count)) => {
                lhs_count == rhs_count
            }
            (Self::Hash(lhs_exprs, lhs_count), Self::Hash(rhs_exprs, rhs_count)) => {
                physical_exprs_equal(lhs_exprs, rhs_exprs) && lhs_count == rhs_count
            }
            _ => false,
        }
    }
}
/// A requirement on how an operator's input rows must be distributed across
/// partitions. It is checked against a [`Partitioning`] via
/// `Partitioning::satisfaction`.
#[derive(Debug, Clone)]
pub enum Distribution {
    /// No requirement. Any partitioning satisfies it.
    UnspecifiedDistribution,
    /// All rows must be in one partition. Satisfied only when the partition
    /// count is 1.
    SinglePartition,
    /// Rows must be hash-partitioned on these expressions. Satisfied by a
    /// `Partitioning::Hash` on matching (or equivalent) keys, or by any
    /// single-partition output.
    HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
impl Distribution {
    /// Builds a [`Partitioning`] for this requirement. Consumes `self`.
    ///
    /// - `HashPartitioned(keys)` becomes `Hash(keys, partition_count)`.
    /// - `SinglePartition` becomes `UnknownPartitioning(1)`; `partition_count`
    ///   is ignored.
    /// - `UnspecifiedDistribution` becomes
    ///   `UnknownPartitioning(partition_count)`.
    pub fn create_partitioning(self, partition_count: usize) -> Partitioning {
        match self {
            Self::HashPartitioned(hash_keys) => Partitioning::Hash(hash_keys, partition_count),
            Self::SinglePartition => Partitioning::UnknownPartitioning(1),
            Self::UnspecifiedDistribution => {
                Partitioning::UnknownPartitioning(partition_count)
            }
        }
    }
}
impl Display for Distribution {
    /// Renders the requirement as `Unspecified`, `SinglePartition`, or
    /// `HashPartitioned[<keys>]`, where `<keys>` is the text produced by
    /// `format_physical_expr_list` for the hash expressions.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Distribution::UnspecifiedDistribution => write!(f, "Unspecified"),
            Distribution::SinglePartition => write!(f, "SinglePartition"),
            Distribution::HashPartitioned(exprs) => {
                // The closing `]` balances the opening `[`. A stray `)` used to
                // follow it and produced output such as `HashPartitioned[...])`.
                write!(f, "HashPartitioned[{}]", format_physical_expr_list(exprs))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::Column;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;

    /// One satisfaction scenario: a description, the actual partitioning, the
    /// required distribution, and the expected outcome with subset matching
    /// enabled and disabled, in that order.
    type SatisfactionCase = (
        &'static str,
        Partitioning,
        Distribution,
        PartitioningSatisfaction,
        PartitioningSatisfaction,
    );

    /// Builds a schema of non-nullable `Int64` columns with the given names.
    fn int64_schema(names: &[&str]) -> Arc<Schema> {
        let fields = names
            .iter()
            .map(|name| Field::new(*name, DataType::Int64, false))
            .collect::<Vec<_>>();
        Arc::new(Schema::new(fields))
    }

    /// Resolves `name` against `schema` as a column expression.
    fn column(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        Ok(Arc::new(Column::new_with_schema(name, schema)?))
    }

    /// Checks every case with `allow_subset` both enabled and disabled.
    fn assert_satisfaction_cases(
        cases: Vec<SatisfactionCase>,
        eq_properties: &EquivalenceProperties,
    ) {
        for (desc, partition, required, expected_with_subset, expected_without_subset) in
            cases
        {
            let result = partition.satisfaction(&required, eq_properties, true);
            assert_eq!(
                result, expected_with_subset,
                "Failed for {desc} with subset enabled"
            );
            let result = partition.satisfaction(&required, eq_properties, false);
            assert_eq!(
                result, expected_without_subset,
                "Failed for {desc} with subset disabled"
            );
        }
    }

    #[test]
    fn partitioning_satisfy_distribution() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("column_1", DataType::Int64, false),
            Field::new("column_2", DataType::Utf8, false),
        ]));
        let col_1 = column("column_1", &schema)?;
        let col_2 = column("column_2", &schema)?;
        let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> =
            vec![Arc::clone(&col_1), Arc::clone(&col_2)];
        let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![col_2, col_1];
        let distribution_types = vec![
            Distribution::UnspecifiedDistribution,
            Distribution::SinglePartition,
            Distribution::HashPartitioned(partition_exprs1.clone()),
        ];
        let single_partition = Partitioning::UnknownPartitioning(1);
        let unspecified_partition = Partitioning::UnknownPartitioning(10);
        let round_robin_partition = Partitioning::RoundRobinBatch(10);
        let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
        let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);
        let eq_properties = EquivalenceProperties::new(schema);
        for distribution in distribution_types {
            let result = (
                single_partition
                    .satisfaction(&distribution, &eq_properties, true)
                    .is_satisfied(),
                unspecified_partition
                    .satisfaction(&distribution, &eq_properties, true)
                    .is_satisfied(),
                round_robin_partition
                    .satisfaction(&distribution, &eq_properties, true)
                    .is_satisfied(),
                hash_partition1
                    .satisfaction(&distribution, &eq_properties, true)
                    .is_satisfied(),
                hash_partition2
                    .satisfaction(&distribution, &eq_properties, true)
                    .is_satisfied(),
            );
            match distribution {
                Distribution::UnspecifiedDistribution => {
                    assert_eq!(result, (true, true, true, true, true))
                }
                Distribution::SinglePartition => {
                    assert_eq!(result, (true, false, false, false, false))
                }
                Distribution::HashPartitioned(_) => {
                    assert_eq!(result, (true, false, false, true, false))
                }
            }
        }
        Ok(())
    }

    #[test]
    fn test_partitioning_satisfy_by_subset() -> Result<()> {
        let schema = int64_schema(&["a", "b", "c"]);
        let col_a = column("a", &schema)?;
        let col_b = column("b", &schema)?;
        let col_c = column("c", &schema)?;
        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        let test_cases: Vec<SatisfactionCase> = vec![
            (
                "Hash([a]) vs Hash([a, b])",
                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_a),
                    Arc::clone(&col_b),
                ]),
                PartitioningSatisfaction::Subset,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([a]) vs Hash([a, b, c])",
                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_a),
                    Arc::clone(&col_b),
                    Arc::clone(&col_c),
                ]),
                PartitioningSatisfaction::Subset,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([a, b]) vs Hash([a, b, c])",
                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_a),
                    Arc::clone(&col_b),
                    Arc::clone(&col_c),
                ]),
                PartitioningSatisfaction::Subset,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([b]) vs Hash([a, b, c])",
                Partitioning::Hash(vec![Arc::clone(&col_b)], 4),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_a),
                    Arc::clone(&col_b),
                    Arc::clone(&col_c),
                ]),
                PartitioningSatisfaction::Subset,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                // Key order must not matter for subset detection.
                "Hash([b, a]) vs Hash([a, b, c])",
                Partitioning::Hash(vec![Arc::clone(&col_b), Arc::clone(&col_a)], 4),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_a),
                    Arc::clone(&col_b),
                    Arc::clone(&col_c),
                ]),
                PartitioningSatisfaction::Subset,
                PartitioningSatisfaction::NotSatisfied,
            ),
        ];
        assert_satisfaction_cases(test_cases, &eq_properties);
        Ok(())
    }

    #[test]
    fn test_partitioning_current_superset() -> Result<()> {
        let schema = int64_schema(&["a", "b", "c"]);
        let col_a = column("a", &schema)?;
        let col_b = column("b", &schema)?;
        let col_c = column("c", &schema)?;
        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        let test_cases: Vec<SatisfactionCase> = vec![
            (
                "Hash([a, b]) vs Hash([a])",
                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([a, b, c]) vs Hash([a])",
                Partitioning::Hash(
                    vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
                    4,
                ),
                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([a, b, c]) vs Hash([a, b])",
                Partitioning::Hash(
                    vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
                    4,
                ),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_a),
                    Arc::clone(&col_b),
                ]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
        ];
        assert_satisfaction_cases(test_cases, &eq_properties);
        Ok(())
    }

    #[test]
    fn test_partitioning_partial_overlap() -> Result<()> {
        let schema = int64_schema(&["a", "b", "c"]);
        let col_a = column("a", &schema)?;
        let col_b = column("b", &schema)?;
        let col_c = column("c", &schema)?;
        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        let test_cases: Vec<SatisfactionCase> = vec![(
            "Partial overlap: Hash([a, c]) vs Hash([a, b])",
            Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_c)], 4),
            Distribution::HashPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)]),
            PartitioningSatisfaction::NotSatisfied,
            PartitioningSatisfaction::NotSatisfied,
        )];
        assert_satisfaction_cases(test_cases, &eq_properties);
        Ok(())
    }

    #[test]
    fn test_partitioning_no_overlap() -> Result<()> {
        let schema = int64_schema(&["a", "b", "c"]);
        let col_a = column("a", &schema)?;
        let col_b = column("b", &schema)?;
        let col_c = column("c", &schema)?;
        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        let test_cases: Vec<SatisfactionCase> = vec![
            (
                "Hash([a]) vs Hash([b, c])",
                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_b),
                    Arc::clone(&col_c),
                ]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([a, b]) vs Hash([c])",
                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
                Distribution::HashPartitioned(vec![Arc::clone(&col_c)]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
        ];
        assert_satisfaction_cases(test_cases, &eq_properties);
        Ok(())
    }

    #[test]
    fn test_partitioning_exact_match() -> Result<()> {
        let schema = int64_schema(&["a", "b"]);
        let col_a = column("a", &schema)?;
        let col_b = column("b", &schema)?;
        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        let test_cases: Vec<SatisfactionCase> = vec![
            (
                "Hash([a, b]) vs Hash([a, b])",
                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_a),
                    Arc::clone(&col_b),
                ]),
                PartitioningSatisfaction::Exact,
                PartitioningSatisfaction::Exact,
            ),
            (
                "Hash([a]) vs Hash([a])",
                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
                PartitioningSatisfaction::Exact,
                PartitioningSatisfaction::Exact,
            ),
        ];
        assert_satisfaction_cases(test_cases, &eq_properties);
        Ok(())
    }

    #[test]
    fn test_partitioning_unknown() -> Result<()> {
        let schema = int64_schema(&["a", "b"]);
        let col_a = column("a", &schema)?;
        let col_b = column("b", &schema)?;
        let unknown: Arc<dyn PhysicalExpr> = Arc::new(UnKnownColumn::new("dropped"));
        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        let test_cases: Vec<SatisfactionCase> = vec![
            (
                "Hash([unknown]) vs Hash([a, b])",
                Partitioning::Hash(vec![Arc::clone(&unknown)], 4),
                Distribution::HashPartitioned(vec![
                    Arc::clone(&col_a),
                    Arc::clone(&col_b),
                ]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([a, b]) vs Hash([unknown])",
                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
                Distribution::HashPartitioned(vec![Arc::clone(&unknown)]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([unknown]) vs Hash([unknown])",
                Partitioning::Hash(vec![Arc::clone(&unknown)], 4),
                Distribution::HashPartitioned(vec![Arc::clone(&unknown)]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
        ];
        assert_satisfaction_cases(test_cases, &eq_properties);
        Ok(())
    }

    #[test]
    fn test_partitioning_empty_hash() -> Result<()> {
        let schema = int64_schema(&["a"]);
        let col_a = column("a", &schema)?;
        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        let test_cases: Vec<SatisfactionCase> = vec![
            (
                "Hash([]) vs Hash([a])",
                Partitioning::Hash(vec![], 4),
                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([a]) vs Hash([])",
                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
                Distribution::HashPartitioned(vec![]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
            (
                "Hash([]) vs Hash([])",
                Partitioning::Hash(vec![], 4),
                Distribution::HashPartitioned(vec![]),
                PartitioningSatisfaction::NotSatisfied,
                PartitioningSatisfaction::NotSatisfied,
            ),
        ];
        assert_satisfaction_cases(test_cases, &eq_properties);
        Ok(())
    }
}