use std::fmt::{self, Display};
use std::sync::Arc;
use crate::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use indexmap::IndexSet;
use indexmap::IndexMap;
use itertools::Itertools;
use super::{expr_refers, ExprWrapper};
/// An insertion-ordered, duplicate-free collection of [`PhysicalSortExpr`]s.
///
/// Within this module it is used both as the dependency list stored in a
/// [`DependencyNode`] and as one combination returned by
/// [`referred_dependencies`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Dependencies {
    /// Member sort expressions, kept in the order they were first inserted.
    inner: IndexSet<PhysicalSortExpr>,
}
impl Display for Dependencies {
    /// Renders the set as `[e1, e2, ...]`; an empty set renders as `[]`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[")?;
        for (position, sort_expr) in self.inner.iter().enumerate() {
            // Every element except the first is preceded by a separator.
            if position > 0 {
                write!(f, ", ")?;
            }
            write!(f, "{}", sort_expr)?;
        }
        write!(f, "]")
    }
}
impl Dependencies {
fn new() -> Self {
Self {
inner: IndexSet::new(),
}
}
pub fn new_from_iter(iter: impl IntoIterator<Item = PhysicalSortExpr>) -> Self {
Self {
inner: iter.into_iter().collect(),
}
}
pub fn insert(&mut self, sort_expr: PhysicalSortExpr) {
self.inner.insert(sort_expr);
}
pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> + Clone {
self.inner.iter()
}
pub fn into_inner(self) -> IndexSet<PhysicalSortExpr> {
self.inner
}
fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
/// Bookkeeping used while enumerating orderings from a [`DependencyMap`].
///
/// It remembers which `(target, dependency)` pairs have already been expanded
/// so that the recursive enumeration does not expand the same pair twice.
///
/// `Default` yields the same empty state as `DependencyEnumerator::new()`.
#[derive(Debug, Default)]
pub struct DependencyEnumerator<'a> {
    /// Maps a target sort expression to the dependencies already expanded for it.
    seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>,
}
impl<'a> DependencyEnumerator<'a> {
    /// Creates an enumerator that has not expanded any dependency yet.
    pub fn new() -> Self {
        Self {
            seen: IndexMap::new(),
        }
    }

    /// Records that `dep` is being expanded for `target`.
    ///
    /// Returns `true` if the pair was not recorded before, `false` otherwise.
    fn insert(
        &mut self,
        target: &'a PhysicalSortExpr,
        dep: &'a PhysicalSortExpr,
    ) -> bool {
        self.seen.entry(target).or_default().insert(dep)
    }

    /// Builds the orderings that end with the target of `referred_sort_expr`.
    ///
    /// * If the node of `referred_sort_expr` has no dependencies, the result
    ///   is a single ordering containing only the node's target expression.
    /// * Otherwise each dependency is expanded recursively and the node's
    ///   target expression is appended to every ordering obtained that way.
    ///   A `(target, dependency)` pair that was already expanded by this
    ///   enumerator contributes no orderings.
    ///
    /// # Panics
    ///
    /// Panics if `referred_sort_expr` is not a key of `dependency_map`, or if
    /// its node has no `target_sort_expr`.
    pub fn construct_orderings(
        &mut self,
        referred_sort_expr: &'a PhysicalSortExpr,
        dependency_map: &'a DependencyMap,
    ) -> Vec<LexOrdering> {
        let node = dependency_map
            .get(referred_sort_expr)
            .expect("`referred_sort_expr` should be inside `dependency_map`");
        // Every node visited here is required to carry a target expression;
        // a missing one is a caller bug, so fail with a descriptive message.
        let target_sort_expr = node
            .target_sort_expr
            .as_ref()
            .expect("node of `referred_sort_expr` should have a `target_sort_expr`");

        if node.dependencies.is_empty() {
            return vec![LexOrdering::new(vec![target_sort_expr.clone()])];
        }

        node.dependencies
            .iter()
            .flat_map(|dep| {
                // Skip pairs that were already expanded to avoid repeating
                // (or endlessly recursing into) the same work.
                if !self.insert(target_sort_expr, dep) {
                    return Vec::new();
                }
                let mut orderings = self.construct_orderings(dep, dependency_map);
                for ordering in orderings.iter_mut() {
                    ordering.push(target_sort_expr.clone());
                }
                orderings
            })
            .collect()
    }
}
/// Maps a sort expression to its [`DependencyNode`], preserving insertion order.
///
/// `Default` yields the same empty map as `DependencyMap::new()`.
#[derive(Debug, Default)]
pub struct DependencyMap {
    /// Sort expression --> node describing its target and its dependencies.
    inner: IndexMap<PhysicalSortExpr, DependencyNode>,
}
impl DependencyMap {
    /// Creates an empty dependency map.
    pub fn new() -> Self {
        let inner = IndexMap::new();
        Self { inner }
    }

    /// Registers `dependency` (if any) for `sort_expr`.
    ///
    /// When `sort_expr` has no node yet, one is created with the given
    /// `target_sort_expr` and no dependencies. When a node already exists it
    /// is kept as is and `target_sort_expr` is ignored; only the dependency
    /// is added.
    pub fn insert(
        &mut self,
        sort_expr: &PhysicalSortExpr,
        target_sort_expr: Option<&PhysicalSortExpr>,
        dependency: Option<&PhysicalSortExpr>,
    ) {
        let node = self
            .inner
            .entry(sort_expr.clone())
            .or_insert_with(|| DependencyNode {
                target_sort_expr: target_sort_expr.cloned(),
                dependencies: Dependencies::new(),
            });
        node.insert_dependency(dependency);
    }

    /// Iterates over `(sort expression, node)` pairs in insertion order.
    pub fn iter(&self) -> impl Iterator<Item = (&PhysicalSortExpr, &DependencyNode)> {
        let Self { inner } = self;
        inner.iter()
    }

    /// Iterates over the sort expressions that have a node in this map.
    pub fn sort_exprs(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
        let Self { inner } = self;
        inner.keys()
    }

    /// Looks up the node registered for `sort_expr`, if there is one.
    pub fn get(&self, sort_expr: &PhysicalSortExpr) -> Option<&DependencyNode> {
        let Self { inner } = self;
        inner.get(sort_expr)
    }
}
impl Display for DependencyMap {
    /// Writes a header line, one `key --> node` line per entry, and a closing
    /// brace line.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "DependencyMap: {{")?;
        self.inner
            .iter()
            .try_for_each(|(sort_expr, node)| writeln!(f, " {sort_expr} --> {node}"))?;
        writeln!(f, "}}")
    }
}
/// The value stored for each sort expression in a [`DependencyMap`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DependencyNode {
    /// The expression that `DependencyEnumerator::construct_orderings` emits
    /// into the orderings it builds for this node. It may be absent.
    // NOTE(review): presumably the projected form of the key expression, with
    // `None` meaning "not projected" — confirm against the code that fills the map.
    pub target_sort_expr: Option<PhysicalSortExpr>,
    /// The sort expressions recorded as dependencies of this node.
    pub dependencies: Dependencies,
}
impl DependencyNode {
    /// Adds a copy of `dependency` to this node's dependencies; `None` is a
    /// no-op.
    fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
        // An `Option` iterates over zero or one element, so extending the set
        // with it inserts the dependency exactly when one was supplied.
        self.dependencies.inner.extend(dependency.cloned());
    }
}
impl Display for DependencyNode {
    /// Renders the node as `(target: <expr>, dependencies: [<deps>])`, or as
    /// `(dependencies: [<deps>])` when there is no target expression.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "(")?;
        if let Some(target) = &self.target_sort_expr {
            write!(f, "target: {}, ", target)?;
        }
        // `Dependencies` already prints its own surrounding brackets, so none
        // are added here; doing so would render a misleading `[[...]]`.
        write!(f, "dependencies: {})", self.dependencies)
    }
}
pub fn referred_dependencies(
dependency_map: &DependencyMap,
source: &Arc<dyn PhysicalExpr>,
) -> Vec<Dependencies> {
let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
for sort_expr in dependency_map
.sort_exprs()
.filter(|sort_expr| expr_refers(source, &sort_expr.expr))
{
let key = ExprWrapper(Arc::clone(&sort_expr.expr));
expr_to_sort_exprs
.entry(key)
.or_default()
.insert(sort_expr.clone());
}
let dependencies = expr_to_sort_exprs
.into_values()
.map(Dependencies::into_inner)
.collect::<Vec<_>>();
dependencies
.iter()
.multi_cartesian_product()
.map(|referred_deps| {
Dependencies::new_from_iter(referred_deps.into_iter().cloned())
})
.collect()
}
/// Collects the orderings produced for every dependency of
/// `relevant_sort_expr`.
///
/// A single [`DependencyEnumerator`] is shared across all dependencies, and
/// the orderings are concatenated in dependency order.
///
/// # Panics
///
/// Panics if `relevant_sort_expr` is not a key of `dependency_map`.
pub fn construct_prefix_orderings(
    relevant_sort_expr: &PhysicalSortExpr,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let mut enumerator = DependencyEnumerator::new();
    let node = dependency_map
        .get(relevant_sort_expr)
        .expect("no relevant sort expr found");

    let mut prefixes = Vec::new();
    for dep in node.dependencies.iter() {
        prefixes.extend(enumerator.construct_orderings(dep, dependency_map));
    }
    prefixes
}
/// Generates the orderings obtainable from the prefixes of `dependencies`.
///
/// For every dependency the prefix orderings are computed with
/// [`construct_prefix_orderings`]; dependencies without any prefix are
/// dropped. If nothing remains, a single empty ordering is returned.
/// Otherwise one prefix is picked per remaining dependency (cartesian
/// product), and for each pick every permutation of the chosen prefixes is
/// concatenated into one ordering.
///
/// # Panics
///
/// Panics if a dependency is not a key of `dependency_map`.
pub fn generate_dependency_orderings(
    dependencies: &Dependencies,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    // One non-empty list of candidate prefixes per dependency.
    let relevant_prefixes: Vec<Vec<LexOrdering>> = dependencies
        .iter()
        .map(|dep| construct_prefix_orderings(dep, dependency_map))
        .filter(|prefixes| !prefixes.is_empty())
        .collect();

    if relevant_prefixes.is_empty() {
        return vec![LexOrdering::default()];
    }

    let mut orderings = Vec::new();
    for chosen in relevant_prefixes.into_iter().multi_cartesian_product() {
        let count = chosen.len();
        // The relative order of the chosen prefixes is free, so emit them all.
        for arrangement in chosen.iter().permutations(count) {
            let combined: LexOrdering = arrangement
                .into_iter()
                .flat_map(|ordering| ordering.clone())
                .collect();
            orderings.push(combined);
        }
    }
    orderings
}
#[cfg(test)]
mod tests {
use std::ops::Not;
use std::sync::Arc;
use super::*;
use crate::equivalence::tests::{
convert_to_sort_exprs, convert_to_sort_reqs, create_test_params,
create_test_schema, output_schema, parse_sort_expr,
};
use crate::equivalence::ProjectionMapping;
use crate::expressions::{col, BinaryExpr, CastExpr, Column};
use crate::{ConstExpr, EquivalenceProperties, ScalarFunctionExpr};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion_common::{Constraint, Constraints, Result};
use datafusion_expr::sort_properties::SortProperties;
use datafusion_expr::Operator;
use datafusion_functions::string::concat;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// Projecting column `a` under four different aliases must put all four
/// output columns into a single equivalence class.
#[test]
fn project_equivalence_properties_test() -> Result<()> {
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
        Field::new("c", DataType::Int64, true),
    ]));
    let input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
    let col_a = col("a", &input_schema)?;
    // a as a1, a as a2, a as a3, a as a4
    let proj_exprs = vec![
        (Arc::clone(&col_a), "a1".to_string()),
        (Arc::clone(&col_a), "a2".to_string()),
        (Arc::clone(&col_a), "a3".to_string()),
        (Arc::clone(&col_a), "a4".to_string()),
    ];
    // One mapping serves both to derive the output schema and to project the
    // properties; it is only borrowed by `output_schema`.
    let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
    let out_schema = output_schema(&projection_mapping, &input_schema)?;
    let col_a1 = &col("a1", &out_schema)?;
    let col_a2 = &col("a2", &out_schema)?;
    let col_a3 = &col("a3", &out_schema)?;
    let col_a4 = &col("a4", &out_schema)?;
    let out_properties = input_properties.project(&projection_mapping, out_schema);
    // Exactly one equivalence class, containing all four aliases.
    assert_eq!(out_properties.eq_group().len(), 1);
    let eq_class = out_properties.eq_group().iter().next().unwrap();
    assert_eq!(eq_class.len(), 4);
    assert!(eq_class.contains(col_a1));
    assert!(eq_class.contains(col_a2));
    assert!(eq_class.contains(col_a3));
    assert!(eq_class.contains(col_a4));
    Ok(())
}
/// Projects properties holding two orderings through an identity projection
/// of `a, b, c, d` and checks the string form of the projected properties.
#[test]
fn project_equivalence_properties_test_multi() -> Result<()> {
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
        Field::new("c", DataType::Int64, true),
        Field::new("d", DataType::Int64, true),
    ]));
    let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
    // First ordering: a, b, c, d
    input_properties.add_new_ordering(LexOrdering::new(vec![
        parse_sort_expr("a", &input_schema),
        parse_sort_expr("b", &input_schema),
        parse_sort_expr("c", &input_schema),
        parse_sort_expr("d", &input_schema),
    ]));
    // Second ordering: a, c, b, d
    input_properties.add_new_ordering(LexOrdering::new(vec![
        parse_sort_expr("a", &input_schema),
        parse_sort_expr("c", &input_schema),
        parse_sort_expr("b", &input_schema), parse_sort_expr("d", &input_schema),
    ]));
    // Identity projection: every column keeps its name.
    let proj_exprs = vec![
        (col("a", &input_schema)?, "a".to_string()),
        (col("b", &input_schema)?, "b".to_string()),
        (col("c", &input_schema)?, "c".to_string()),
        (col("d", &input_schema)?, "d".to_string()),
    ];
    let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
    let out_properties = input_properties.project(&projection_mapping, input_schema);
    // Both orderings must be present in the output, in the order asserted below.
    assert_eq!(
        out_properties.to_string(),
        "order: [[a@0 ASC, c@2 ASC, b@1 ASC, d@3 ASC], [a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC]]"
    );
    Ok(())
}
/// With `a` and `c` declared equal, adding the orderings `[b]` and `[c]` must
/// produce the same ordering equivalence class as adding them to fresh
/// properties that carry no equality condition.
#[test]
fn test_normalize_ordering_equivalence_classes() -> Result<()> {
    let sort_options = SortOptions::default();
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]);
    let col_a_expr = col("a", &schema)?;
    let col_b_expr = col("b", &schema)?;
    let col_c_expr = col("c", &schema)?;
    let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
    // a = c
    eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr)?;
    let others = vec![
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::clone(&col_b_expr),
            options: sort_options,
        }]),
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::clone(&col_c_expr),
            options: sort_options,
        }]),
    ];
    eq_properties.add_new_orderings(others);
    // Reference properties: same orderings, but without the equality a = c.
    let mut expected_eqs = EquivalenceProperties::new(Arc::new(schema));
    expected_eqs.add_new_orderings([
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::clone(&col_b_expr),
            options: sort_options,
        }]),
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::clone(&col_c_expr),
            options: sort_options,
        }]),
    ]);
    let oeq_class = eq_properties.oeq_class().clone();
    let expected = expected_eqs.oeq_class();
    assert!(oeq_class.eq(expected));
    Ok(())
}
/// Exercises `find_longest_permutation` for the required columns `[b, a]`
/// against three different sets of known orderings, checking both the
/// returned ordering and the returned indices.
#[test]
fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
    let sort_options = SortOptions::default();
    // The negation of the default sort options.
    let sort_options_not = SortOptions::default().not();
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]);
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let required_columns = [Arc::clone(col_b), Arc::clone(col_a)];
    // Scenario 1: the only known ordering is [b (negated options), a].
    let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
    eq_properties.add_new_orderings([LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::new(Column::new("b", 1)),
            options: sort_options_not,
        },
        PhysicalSortExpr {
            expr: Arc::new(Column::new("a", 0)),
            options: sort_options,
        },
    ])]);
    let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
    assert_eq!(idxs, vec![0, 1]);
    assert_eq!(
        result,
        LexOrdering::new(vec![
            PhysicalSortExpr {
                expr: Arc::clone(col_b),
                options: sort_options_not
            },
            PhysicalSortExpr {
                expr: Arc::clone(col_a),
                options: sort_options
            }
        ])
    );
    // Scenario 2: an additional, unrelated ordering [c] is also known; the
    // expected result is the same as in scenario 1.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]);
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let required_columns = [Arc::clone(col_b), Arc::clone(col_a)];
    let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
    eq_properties.add_new_orderings([
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::new(Column::new("c", 2)),
            options: sort_options,
        }]),
        LexOrdering::new(vec![
            PhysicalSortExpr {
                expr: Arc::new(Column::new("b", 1)),
                options: sort_options_not,
            },
            PhysicalSortExpr {
                expr: Arc::new(Column::new("a", 0)),
                options: sort_options,
            },
        ]),
    ]);
    let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
    assert_eq!(idxs, vec![0, 1]);
    assert_eq!(
        result,
        LexOrdering::new(vec![
            PhysicalSortExpr {
                expr: Arc::clone(col_b),
                options: sort_options_not
            },
            PhysicalSortExpr {
                expr: Arc::clone(col_a),
                options: sort_options
            }
        ])
    );
    // Scenario 3: the known ordering is [b, c, a]; `c` sits between the two
    // required columns, so only index 0 (`b`) is expected to match.
    let required_columns = [
        Arc::new(Column::new("b", 1)) as _,
        Arc::new(Column::new("a", 0)) as _,
    ];
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]);
    let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
    eq_properties.add_new_orderings([LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::new(Column::new("b", 1)),
            options: sort_options_not,
        },
        PhysicalSortExpr {
            expr: Arc::new(Column::new("c", 2)),
            options: sort_options,
        },
        PhysicalSortExpr {
            expr: Arc::new(Column::new("a", 0)),
            options: sort_options,
        },
    ])]);
    let (_, idxs) = eq_properties.find_longest_permutation(&required_columns);
    assert_eq!(idxs, vec![0]);
    Ok(())
}
/// Checks the sort properties reported by `get_expr_properties` for several
/// expressions, given `b = a` and the orderings `[b ASC]` and `[d ASC]`.
#[test]
fn test_update_properties() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
    ]);
    let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let col_c = &col("c", &schema)?;
    let col_d = &col("d", &schema)?;
    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    // b = a
    eq_properties.add_equal_conditions(col_b, col_a)?;
    // Known orderings: [b ASC] and [d ASC]
    eq_properties.add_new_orderings(vec![
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::clone(col_b),
            options: option_asc,
        }]),
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::clone(col_d),
            options: option_asc,
        }]),
    ]);
    // Each case is (expression, expected sort properties).
    let test_cases = vec![
        // d + b
        (
            Arc::new(BinaryExpr::new(
                Arc::clone(col_d),
                Operator::Plus,
                Arc::clone(col_b),
            )) as Arc<dyn PhysicalExpr>,
            SortProperties::Ordered(option_asc),
        ),
        // b
        (Arc::clone(col_b), SortProperties::Ordered(option_asc)),
        // a
        (Arc::clone(col_a), SortProperties::Ordered(option_asc)),
        // a + c
        (
            Arc::new(BinaryExpr::new(
                Arc::clone(col_a),
                Operator::Plus,
                Arc::clone(col_c),
            )),
            SortProperties::Unordered,
        ),
    ];
    for (expr, expected) in test_cases {
        // Only gathered to make the failure message more informative.
        let leading_orderings = eq_properties
            .oeq_class()
            .iter()
            .flat_map(|ordering| ordering.first().cloned())
            .collect::<Vec<_>>();
        let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr));
        let err_msg = format!(
            "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}",
            expr, expected, expr_props.sort_properties
        );
        assert_eq!(expr_props.sort_properties, expected, "{}", err_msg);
    }
    Ok(())
}
/// Table-driven check of `find_longest_permutation` on the shared test
/// fixture, extended with the ordering `[d ASC, h DESC]`.
///
/// The properties the fixture starts with come from `create_test_params`,
/// which is defined outside this file.
#[test]
fn test_find_longest_permutation() -> Result<()> {
    let (test_schema, mut eq_properties) = create_test_params()?;
    let col_a = &col("a", &test_schema)?;
    let col_b = &col("b", &test_schema)?;
    let col_c = &col("c", &test_schema)?;
    let col_d = &col("d", &test_schema)?;
    let col_e = &col("e", &test_schema)?;
    let col_f = &col("f", &test_schema)?;
    let col_h = &col("h", &test_schema)?;
    // a + d
    let a_plus_d = Arc::new(BinaryExpr::new(
        Arc::clone(col_a),
        Operator::Plus,
        Arc::clone(col_d),
    )) as Arc<dyn PhysicalExpr>;
    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let option_desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    // Additional ordering: [d ASC, h DESC]
    eq_properties.add_new_orderings([LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(col_d),
            options: option_asc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(col_h),
            options: option_desc,
        },
    ])]);
    // Each case is (input expressions, expected longest permutation).
    let test_cases = vec![
        (vec![col_a], vec![(col_a, option_asc)]),
        (vec![col_c], vec![(col_c, option_asc)]),
        (
            vec![col_d, col_e, col_b],
            vec![
                (col_d, option_asc),
                (col_e, option_desc),
                (col_b, option_asc),
            ],
        ),
        // `b` alone yields no ordering.
        (vec![col_b], vec![]),
        (vec![col_d], vec![(col_d, option_asc)]),
        (vec![&a_plus_d], vec![(&a_plus_d, option_asc)]),
        (
            vec![col_b, col_d],
            vec![(col_d, option_asc), (col_b, option_asc)],
        ),
        (
            vec![col_c, col_e],
            vec![(col_c, option_asc), (col_e, option_desc)],
        ),
        (
            vec![col_d, col_h, col_e, col_f, col_b],
            vec![
                (col_d, option_asc),
                (col_e, option_desc),
                (col_h, option_desc),
                (col_f, option_asc),
                (col_b, option_asc),
            ],
        ),
        (
            vec![col_e, col_d, col_h, col_f, col_b],
            vec![
                (col_e, option_desc),
                (col_d, option_asc),
                (col_h, option_desc),
                (col_f, option_asc),
                (col_b, option_asc),
            ],
        ),
        (
            vec![col_e, col_d, col_b, col_h, col_f],
            vec![
                (col_e, option_desc),
                (col_d, option_asc),
                (col_b, option_asc),
                (col_h, option_desc),
                (col_f, option_asc),
            ],
        ),
    ];
    for (exprs, expected) in test_cases {
        let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
        let expected = convert_to_sort_exprs(&expected);
        let (actual, _) = eq_properties.find_longest_permutation(&exprs);
        assert_eq!(actual, expected);
    }
    Ok(())
}
/// When `h` is declared constant, `find_longest_permutation` for `[h]` is
/// expected to return `h` with the default sort options.
#[test]
fn test_find_longest_permutation2() -> Result<()> {
    let (test_schema, mut eq_properties) = create_test_params()?;
    let col_h = &col("h", &test_schema)?;
    // Mark `h` as a constant expression.
    eq_properties = eq_properties.with_constants(vec![ConstExpr::from(col_h)]);
    // Each case is (input expressions, expected longest permutation).
    let test_cases = vec![
        (vec![col_h], vec![(col_h, SortOptions::default())]),
    ];
    for (exprs, expected) in test_cases {
        let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
        let expected = convert_to_sort_exprs(&expected);
        let (actual, _) = eq_properties.find_longest_permutation(&exprs);
        assert_eq!(actual, expected);
    }
    Ok(())
}
/// Table-driven check of `get_finer_requirement` on properties that carry no
/// orderings or equalities of their own.
#[test]
fn test_get_finer() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let col_c = &col("c", &schema)?;
    let eq_properties = EquivalenceProperties::new(schema);
    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let option_desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    // Each case is (lhs requirement, rhs requirement, expected finer
    // requirement); `None` as the expectation means no finer one exists.
    let tests_cases = vec![
        // rhs is longer and leaves the options of `a` unspecified.
        (
            vec![(col_a, Some(option_asc))],
            vec![(col_a, None), (col_b, Some(option_asc))],
            Some(vec![(col_a, Some(option_asc)), (col_b, Some(option_asc))]),
        ),
        // rhs is a prefix of lhs, so lhs is the finer one.
        (
            vec![
                (col_a, Some(option_asc)),
                (col_b, Some(option_asc)),
                (col_c, Some(option_asc)),
            ],
            vec![(col_a, Some(option_asc)), (col_b, Some(option_asc))],
            Some(vec![
                (col_a, Some(option_asc)),
                (col_b, Some(option_asc)),
                (col_c, Some(option_asc)),
            ]),
        ),
        // The options of `b` conflict, so there is no finer requirement.
        (
            vec![(col_a, Some(option_asc)), (col_b, Some(option_asc))],
            vec![(col_a, Some(option_asc)), (col_b, Some(option_desc))],
            None,
        ),
    ];
    for (lhs, rhs, expected) in tests_cases {
        let lhs = convert_to_sort_reqs(&lhs);
        let rhs = convert_to_sort_reqs(&rhs);
        let expected = expected.map(|expected| convert_to_sort_reqs(&expected));
        let finer = eq_properties.get_finer_requirement(&lhs, &rhs);
        assert_eq!(finer, expected)
    }
    Ok(())
}
/// Table-driven check of `normalize_sort_requirements` on the shared test
/// fixture.
///
/// Per the expectations below, requirements on `c` normalize to requirements
/// on `a`, while all other listed requirements are returned unchanged.
#[test]
fn test_normalize_sort_reqs() -> Result<()> {
    let (test_schema, eq_properties) = create_test_params()?;
    let col_a = &col("a", &test_schema)?;
    let col_b = &col("b", &test_schema)?;
    let col_c = &col("c", &test_schema)?;
    let col_d = &col("d", &test_schema)?;
    let col_e = &col("e", &test_schema)?;
    let col_f = &col("f", &test_schema)?;
    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let option_desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    // Each case is (requirement, expected normalized requirement).
    let requirements = vec![
        (
            vec![(col_a, Some(option_asc))],
            vec![(col_a, Some(option_asc))],
        ),
        (
            vec![(col_a, Some(option_desc))],
            vec![(col_a, Some(option_desc))],
        ),
        (vec![(col_a, None)], vec![(col_a, None)]),
        // `c` is expected to be rewritten to `a`.
        (
            vec![(col_c, Some(option_asc))],
            vec![(col_a, Some(option_asc))],
        ),
        (vec![(col_c, None)], vec![(col_a, None)]),
        (
            vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
            vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
        ),
        (
            vec![(col_d, None), (col_b, None)],
            vec![(col_d, None), (col_b, None)],
        ),
        (
            vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
            vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
        ),
        (
            vec![(col_e, Some(option_desc)), (col_f, None)],
            vec![(col_e, Some(option_desc)), (col_f, None)],
        ),
        (
            vec![(col_e, None), (col_f, None)],
            vec![(col_e, None), (col_f, None)],
        ),
    ];
    for (reqs, expected_normalized) in requirements.into_iter() {
        let req = convert_to_sort_reqs(&reqs);
        let expected_normalized = convert_to_sort_reqs(&expected_normalized);
        assert_eq!(
            eq_properties.normalize_sort_requirements(&req),
            expected_normalized
        );
    }
    Ok(())
}
/// Checks `normalize_sort_requirements` on single-column requirements using
/// the shared test fixture: `c` is expected to normalize to `a`, while `a`
/// and `d` are expected to stay as they are.
#[test]
fn test_schema_normalize_sort_requirement_with_equivalence() -> Result<()> {
    let option1 = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let (test_schema, eq_properties) = create_test_params()?;
    let col_a = &col("a", &test_schema)?;
    let col_c = &col("c", &test_schema)?;
    let col_d = &col("d", &test_schema)?;
    // Each case is (requirement, expected normalized requirement).
    let test_cases = vec![
        (vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
        (vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
        (vec![(col_c, None)], vec![(col_a, None)]),
        (vec![(col_d, Some(option1))], vec![(col_d, Some(option1))]),
    ];
    for (reqs, expected) in test_cases.into_iter() {
        let reqs = convert_to_sort_reqs(&reqs);
        let expected = convert_to_sort_reqs(&expected);
        let normalized = eq_properties.normalize_sort_requirements(&reqs);
        assert!(
            expected.eq(&normalized),
            "error in test: reqs: {reqs:?}, expected: {expected:?}, normalized: {normalized:?}"
        );
    }
    Ok(())
}
/// Starting from data ordered by `(a, b, c)`, checks whether an ordering on
/// `c` alone is satisfied once `CAST(c AS Date32)` is declared equal to `a`
/// and, depending on the case, `b` is declared constant.
///
/// Every case is evaluated twice: once adding the equalities before the
/// constants and once in the opposite order.
#[test]
fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Date32, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
    ]));
    // Base ordering: a, b, c (all ascending, nulls first).
    let base_properties = EquivalenceProperties::new(Arc::clone(&schema))
        .with_reorder(LexOrdering::new(
            ["a", "b", "c"]
                .into_iter()
                .map(|c| {
                    col(c, schema.as_ref()).map(|expr| PhysicalSortExpr {
                        expr,
                        options: SortOptions {
                            descending: false,
                            nulls_first: true,
                        },
                    })
                })
                .collect::<Result<Vec<_>>>()?,
        ));
    // Description of a single scenario.
    struct TestCase {
        // Label used in the assertion failure message.
        name: &'static str,
        // Expressions to declare constant.
        constants: Vec<Arc<dyn PhysicalExpr>>,
        // Pairs of expressions to declare equal.
        equal_conditions: Vec<[Arc<dyn PhysicalExpr>; 2]>,
        // Column names of the ordering to check.
        sort_columns: &'static [&'static str],
        // Expected result of `ordering_satisfy`.
        should_satisfy_ordering: bool,
    }
    let col_a = col("a", schema.as_ref())?;
    let col_b = col("b", schema.as_ref())?;
    let col_c = col("c", schema.as_ref())?;
    // CAST(c AS Date32)
    let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None));
    let cases = vec![
        // b constant, CAST(c) = a
        TestCase {
            name: "(a, b, c) -> (c)",
            constants: vec![Arc::clone(&col_b)],
            equal_conditions: vec![[
                Arc::clone(&cast_c) as Arc<dyn PhysicalExpr>,
                Arc::clone(&col_a),
            ]],
            sort_columns: &["c"],
            should_satisfy_ordering: true,
        },
        // Same as the previous case with the sides of the equality swapped.
        TestCase {
            name: "(a, b, c) -> (c)",
            constants: vec![col_b],
            equal_conditions: vec![[
                Arc::clone(&col_a),
                Arc::clone(&cast_c) as Arc<dyn PhysicalExpr>,
            ]],
            sort_columns: &["c"],
            should_satisfy_ordering: true,
        },
        // Without the constant `b`, the ordering on `c` must not be satisfied.
        TestCase {
            name: "not ordered because (b) is not constant",
            constants: vec![],
            equal_conditions: vec![[
                Arc::clone(&cast_c) as Arc<dyn PhysicalExpr>,
                Arc::clone(&col_a),
            ]],
            sort_columns: &["c"],
            should_satisfy_ordering: false,
        },
    ];
    for case in cases {
        for properties in [
            // Variant 1: equalities first, then constants.
            {
                let mut properties = base_properties.clone();
                for [left, right] in &case.equal_conditions {
                    properties.add_equal_conditions(left, right)?
                }
                properties.with_constants(
                    case.constants.iter().cloned().map(ConstExpr::from),
                )
            },
            // Variant 2: constants first, then equalities.
            {
                let mut properties = base_properties.clone().with_constants(
                    case.constants.iter().cloned().map(ConstExpr::from),
                );
                for [left, right] in &case.equal_conditions {
                    properties.add_equal_conditions(left, right)?
                }
                properties
            },
        ] {
            // Ordering under test, built with the default sort options.
            let sort = case
                .sort_columns
                .iter()
                .map(|&name| {
                    col(name, &schema).map(|col| PhysicalSortExpr {
                        expr: col,
                        options: SortOptions::default(),
                    })
                })
                .collect::<Result<LexOrdering>>()?;
            assert_eq!(
                properties.ordering_satisfy(sort.as_ref()),
                case.should_satisfy_ordering,
                "failed test '{}'",
                case.name
            );
        }
    }
    Ok(())
}
/// Given the ordering `[c, a, b]` and the equality `c = concat(a, b)`, the
/// ordering equivalence class is expected to consist of exactly `[c]` and
/// `[a, b]`.
#[test]
fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Utf8, false),
    ]));
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    // concat(a, b)
    let a_concat_b: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
        "concat",
        concat(),
        vec![Arc::clone(&col_a), Arc::clone(&col_b)],
        DataType::Utf8,
    ));
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    // Initial ordering: [c ASC, a ASC, b ASC]
    eq_properties.add_new_ordering(LexOrdering::from(vec![
        PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
        PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
        PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    ]));
    // c = concat(a, b)
    eq_properties.add_equal_conditions(&col_c, &a_concat_b)?;
    let orderings = eq_properties.oeq_class();
    let expected_ordering1 =
        LexOrdering::from(vec![
            PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc()
        ]);
    let expected_ordering2 = LexOrdering::from(vec![
        PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
        PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    ]);
    assert_eq!(orderings.len(), 2);
    assert!(orderings.contains(&expected_ordering1));
    assert!(orderings.contains(&expected_ordering2));
    Ok(())
}
/// Given the ordering `[c, a, b]` and the equality `c = a * b`, the ordering
/// equivalence class is expected to still contain only the initial ordering
/// (in contrast to the `concat` variant of this test).
#[test]
fn test_ordering_equivalence_with_non_lex_monotonic_multiply() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
    ]));
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    // a * b
    let a_times_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(&col_a),
        Operator::Multiply,
        Arc::clone(&col_b),
    ));
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    // Initial ordering: [c ASC, a ASC, b ASC]
    let initial_ordering = LexOrdering::from(vec![
        PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
        PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
        PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    ]);
    eq_properties.add_new_ordering(initial_ordering.clone());
    // c = a * b
    eq_properties.add_equal_conditions(&col_c, &a_times_b)?;
    let orderings = eq_properties.oeq_class();
    assert_eq!(orderings.len(), 1);
    assert!(orderings.contains(&initial_ordering));
    Ok(())
}
/// Given the ordering `[concat(a, b), a, b]` and the equality
/// `c = concat(a, b)`, the ordering equivalence class is expected to consist
/// of exactly `[concat(a, b)]` and `[a, b]`.
#[test]
fn test_ordering_equivalence_with_concat_equality() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Utf8, false),
    ]));
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    // concat(a, b)
    let a_concat_b: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
        "concat",
        concat(),
        vec![Arc::clone(&col_a), Arc::clone(&col_b)],
        DataType::Utf8,
    ));
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    // Initial ordering: [concat(a, b) ASC, a ASC, b ASC]
    eq_properties.add_new_ordering(LexOrdering::from(vec![
        PhysicalSortExpr::new_default(Arc::clone(&a_concat_b)).asc(),
        PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
        PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    ]));
    // c = concat(a, b)
    eq_properties.add_equal_conditions(&col_c, &a_concat_b)?;
    let orderings = eq_properties.oeq_class();
    let expected_ordering1 = LexOrdering::from(vec![PhysicalSortExpr::new_default(
        Arc::clone(&a_concat_b),
    )
    .asc()]);
    let expected_ordering2 = LexOrdering::from(vec![
        PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
        PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    ]);
    assert_eq!(orderings.len(), 2);
    assert!(orderings.contains(&expected_ordering1));
    assert!(orderings.contains(&expected_ordering2));
    Ok(())
}
/// With `a` declared constant, `with_reorder([a, b])` is expected to produce
/// a single ordering that contains only `b`.
#[test]
fn test_with_reorder_constant_filtering() -> Result<()> {
    let schema = create_test_schema()?;
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    // Mark `a` as a constant expression.
    eq_properties = eq_properties.with_constants([ConstExpr::from(&col_a)]);
    // Requested ordering: [a, b]
    let sort_exprs = LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(&col_a),
            options: SortOptions::default(),
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_b),
            options: SortOptions::default(),
        },
    ]);
    let result = eq_properties.with_reorder(sort_exprs);
    // The constant `a` must have been dropped from the ordering.
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = result.oeq_class().iter().next().unwrap();
    assert_eq!(ordering.len(), 1);
    assert!(ordering[0].expr.eq(&col_b));
    Ok(())
}
/// With the existing ordering `[a ASC, b DESC, c ASC]`, reordering by
/// `[a ASC]` is expected to keep the full three-column ordering.
#[test]
fn test_with_reorder_preserve_suffix() -> Result<()> {
    let schema = create_test_schema()?;
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    let asc = SortOptions::default();
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    // Existing ordering: [a ASC, b DESC, c ASC]
    eq_properties.add_new_orderings([LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(&col_a),
            options: asc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_b),
            options: desc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_c),
            options: asc,
        },
    ])]);
    // Requested ordering: [a ASC], a prefix of the existing one.
    let new_order = LexOrdering::new(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_a),
        options: asc,
    }]);
    let result = eq_properties.with_reorder(new_order);
    // The suffix [b DESC, c ASC] must still be present after `a`.
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = result.oeq_class().iter().next().unwrap();
    assert_eq!(ordering.len(), 3);
    assert!(ordering[0].expr.eq(&col_a));
    assert!(ordering[0].options.eq(&asc));
    assert!(ordering[1].expr.eq(&col_b));
    assert!(ordering[1].options.eq(&desc));
    assert!(ordering[2].expr.eq(&col_c));
    assert!(ordering[2].options.eq(&asc));
    Ok(())
}
/// With `a = b` and the existing ordering `[a ASC, c ASC]`, reordering by
/// `[b ASC]` is expected to yield `[b ASC, c ASC]`.
#[test]
fn test_with_reorder_equivalent_expressions() -> Result<()> {
    let schema = create_test_schema()?;
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    // a = b
    eq_properties.add_equal_conditions(&col_a, &col_b)?;
    let asc = SortOptions::default();
    // Existing ordering: [a ASC, c ASC]
    eq_properties.add_new_orderings([LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(&col_a),
            options: asc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_c),
            options: asc,
        },
    ])]);
    // Requested ordering: [b ASC], where `b` is equivalent to `a`.
    let new_order = LexOrdering::new(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_b),
        options: asc,
    }]);
    let result = eq_properties.with_reorder(new_order);
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = result.oeq_class().iter().next().unwrap();
    assert_eq!(ordering.len(), 2);
    assert!(ordering[0].expr.eq(&col_b));
    assert!(ordering[0].options.eq(&asc));
    assert!(ordering[1].expr.eq(&col_c));
    assert!(ordering[1].options.eq(&asc));
    Ok(())
}
/// With the existing ordering `[a ASC, b DESC]`, reordering by `[a DESC]` is
/// expected to yield exactly the requested ordering, with no suffix carried
/// over.
#[test]
fn test_with_reorder_incompatible_prefix() -> Result<()> {
    let schema = create_test_schema()?;
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let asc = SortOptions::default();
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    // Existing ordering: [a ASC, b DESC]
    eq_properties.add_new_orderings([LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(&col_a),
            options: asc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_b),
            options: desc,
        },
    ])]);
    // Requested ordering: [a DESC], whose options differ from the existing one.
    let new_order = LexOrdering::new(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_a),
        options: desc,
    }]);
    let result = eq_properties.with_reorder(new_order.clone());
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = result.oeq_class().iter().next().unwrap();
    assert_eq!(ordering, &new_order);
    Ok(())
}
/// Combines a constant (`c`), an equality (`b = d`) and two existing
/// orderings (`[d, a]` and `[e]`), then reorders by `[b, c]`.
///
/// The expected outcome is a single two-column ordering whose first column is
/// `b` or `d` and whose second column is `a`.
#[test]
fn test_with_reorder_comprehensive() -> Result<()> {
    let schema = create_test_schema()?;
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    let col_d = col("d", &schema)?;
    let col_e = col("e", &schema)?;
    let asc = SortOptions::default();
    // Mark `c` as a constant expression.
    eq_properties = eq_properties.with_constants([ConstExpr::from(&col_c)]);
    // b = d
    eq_properties.add_equal_conditions(&col_b, &col_d)?;
    // Existing orderings: [d ASC, a ASC] and [e ASC]
    eq_properties.add_new_orderings([
        LexOrdering::new(vec![
            PhysicalSortExpr {
                expr: Arc::clone(&col_d),
                options: asc,
            },
            PhysicalSortExpr {
                expr: Arc::clone(&col_a),
                options: asc,
            },
        ]),
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::clone(&col_e),
            options: asc,
        }]),
    ]);
    // Requested ordering: [b ASC, c ASC]
    let new_order = LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(&col_b),
            options: asc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_c),
            options: asc,
        },
    ]);
    let result = eq_properties.with_reorder(new_order);
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = result.oeq_class().iter().next().unwrap();
    assert_eq!(ordering.len(), 2);
    // Either member of the equivalence class {b, d} is accepted here.
    assert!(
        ordering[0].expr.eq(&col_b) || ordering[0].expr.eq(&col_d),
        "Expected b or d as first expression, got {:?}",
        ordering[0].expr
    );
    assert!(ordering[0].options.eq(&asc));
    assert!(ordering[1].expr.eq(&col_a));
    assert!(ordering[1].options.eq(&asc));
    Ok(())
}
/// Table-driven check of how primary-key and unique constraints affect
/// `ordering_satisfy`.
///
/// For every case the test asserts that
/// 1. none of the "satisfied" orderings holds on fresh properties,
/// 2. all of them hold once the base ordering and the constraints are added,
/// 3. none of the "unsatisfied" orderings holds even then.
#[test]
fn test_ordering_satisfaction_with_key_constraints() -> Result<()> {
    // Schema used by the primary-key cases: every column is nullable.
    let pk_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
    ]));
    // Schema used by the unique cases: `a` and `b` are non-nullable, `c` and
    // `d` are nullable.
    let unique_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
    ]));
    // Each case is (name, schema, constraints, base ordering, orderings
    // expected to be satisfied, orderings expected to be unsatisfied).
    // Constraints refer to columns by index into the schema.
    let test_cases = vec![
        (
            "single column primary key",
            &pk_schema,
            vec![Constraint::PrimaryKey(vec![0])],
            vec!["a"], vec![vec!["a", "b"], vec!["a", "c", "d"]],
            vec![vec!["b", "a"], vec!["c", "a"]],
        ),
        (
            "single column unique",
            &unique_schema,
            vec![Constraint::Unique(vec![0])],
            vec!["a"], vec![vec!["a", "b"], vec!["a", "c", "d"]],
            vec![vec!["b", "a"], vec!["c", "a"]],
        ),
        (
            "multi-column primary key",
            &pk_schema,
            vec![Constraint::PrimaryKey(vec![0, 1])],
            vec!["a", "b"], vec![vec!["a", "b", "c"], vec!["a", "b", "d"]],
            vec![vec!["b", "a"], vec!["a", "c", "b"]],
        ),
        (
            "multi-column unique",
            &unique_schema,
            vec![Constraint::Unique(vec![0, 1])],
            vec!["a", "b"], vec![vec!["a", "b", "c"], vec!["a", "b", "d"]],
            vec![vec!["b", "a"], vec!["c", "a", "b"]],
        ),
        // Unique constraint over the nullable columns `c` and `d`: no
        // extended ordering is expected to be satisfied.
        (
            "nullable unique",
            &unique_schema,
            vec![Constraint::Unique(vec![2, 3])],
            vec!["c", "d"], vec![],
            vec![vec!["c", "d", "a"]],
        ),
        (
            "ordering with arbitrary column unique",
            &unique_schema,
            vec![Constraint::Unique(vec![0, 1])],
            vec!["a", "c", "b"], vec![vec!["a", "c", "b", "d"]],
            vec![vec!["a", "b", "d"]],
        ),
        (
            "ordering with arbitrary column pk",
            &pk_schema,
            vec![Constraint::PrimaryKey(vec![0, 1])],
            vec!["a", "c", "b"], vec![vec!["a", "c", "b", "d"]],
            vec![vec!["a", "b", "d"]],
        ),
        (
            "ordering with arbitrary column pk complex",
            &pk_schema,
            vec![Constraint::PrimaryKey(vec![3, 1])],
            vec!["b", "a", "d"], vec![vec!["b", "a", "d", "c"]],
            vec![vec!["b", "c", "d", "a"], vec!["b", "a", "c", "d"]],
        ),
    ];
    for (
        name,
        schema,
        constraints,
        base_order,
        satisfied_orders,
        unsatisfied_orders,
    ) in test_cases
    {
        let mut eq_properties = EquivalenceProperties::new(Arc::clone(schema));
        // Turn the column-name lists into orderings with default sort options.
        let base_ordering = LexOrdering::new(
            base_order
                .iter()
                .map(|col_name| PhysicalSortExpr {
                    expr: col(col_name, schema).unwrap(),
                    options: SortOptions::default(),
                })
                .collect(),
        );
        let satisfied_orderings: Vec<LexOrdering> = satisfied_orders
            .iter()
            .map(|cols| {
                LexOrdering::new(
                    cols.iter()
                        .map(|col_name| PhysicalSortExpr {
                            expr: col(col_name, schema).unwrap(),
                            options: SortOptions::default(),
                        })
                        .collect(),
                )
            })
            .collect();
        let unsatisfied_orderings: Vec<LexOrdering> = unsatisfied_orders
            .iter()
            .map(|cols| {
                LexOrdering::new(
                    cols.iter()
                        .map(|col_name| PhysicalSortExpr {
                            expr: col(col_name, schema).unwrap(),
                            options: SortOptions::default(),
                        })
                        .collect(),
                )
            })
            .collect();
        // Step 1: nothing is satisfied before any ordering or constraint exists.
        for ordering in &satisfied_orderings {
            assert!(
                !eq_properties.ordering_satisfy(ordering),
                "{}: ordering {:?} should not be satisfied before adding constraints",
                name,
                ordering
            );
        }
        eq_properties.add_new_ordering(base_ordering);
        eq_properties =
            eq_properties.with_constraints(Constraints::new_unverified(constraints));
        // Step 2: with base ordering and constraints, these must be satisfied.
        for ordering in &satisfied_orderings {
            assert!(
                eq_properties.ordering_satisfy(ordering),
                "{}: ordering {:?} should be satisfied after adding constraints",
                name,
                ordering
            );
        }
        // Step 3: these must remain unsatisfied.
        for ordering in &unsatisfied_orderings {
            assert!(
                !eq_properties.ordering_satisfy(ordering),
                "{}: ordering {:?} should not be satisfied after adding constraints",
                name,
                ordering
            );
        }
    }
    Ok(())
}
}