use std::fmt::{self, Display};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use super::expr_refers;
use crate::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
/// A set of [`PhysicalSortExpr`]s backed by an [`IndexSet`].
///
/// The type dereferences to the wrapped `IndexSet`, so the set API can be
/// used directly on a `Dependencies` value.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Dependencies {
/// The wrapped set of sort expressions.
sort_exprs: IndexSet<PhysicalSortExpr>,
}
/// Renders the set as a bracketed, comma-separated list of its sort
/// expressions, e.g. `[x, y]`; an empty set renders as `[]`.
impl Display for Dependencies {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[")?;
        for (position, dep) in self.sort_exprs.iter().enumerate() {
            // Only elements after the first are preceded by a separator.
            if position == 0 {
                write!(f, "{dep}")?;
            } else {
                write!(f, ", {dep}")?;
            }
        }
        write!(f, "]")
    }
}
impl Dependencies {
    /// Creates a `Dependencies` set by collecting the given sort expressions
    /// into the underlying `IndexSet`.
    pub fn new(sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>) -> Self {
        let sort_exprs = sort_exprs.into_iter().collect();
        Self { sort_exprs }
    }
}
/// Gives read-only access to the wrapped `IndexSet`.
impl Deref for Dependencies {
    type Target = IndexSet<PhysicalSortExpr>;

    fn deref(&self) -> &Self::Target {
        let Self { sort_exprs } = self;
        sort_exprs
    }
}
/// Gives mutable access to the wrapped `IndexSet`.
impl DerefMut for Dependencies {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { sort_exprs } = self;
        sort_exprs
    }
}
/// Consumes the set and yields its sort expressions by value, using the
/// wrapped `IndexSet`'s owning iterator.
impl IntoIterator for Dependencies {
    type Item = PhysicalSortExpr;
    type IntoIter = <IndexSet<PhysicalSortExpr> as IntoIterator>::IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        let Self { sort_exprs } = self;
        sort_exprs.into_iter()
    }
}
/// Bookkeeping used while enumerating orderings from a [`DependencyMap`].
///
/// For each target sort expression it records which dependencies have
/// already been expanded, so `construct_orderings` expands every
/// (target, dependency) pair at most once.
pub struct DependencyEnumerator<'a> {
/// Maps a target sort expression to the dependencies already visited for it.
seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>,
}
impl<'a> DependencyEnumerator<'a> {
    /// Creates an enumerator that has not visited any dependency yet.
    pub fn new() -> Self {
        let seen = IndexMap::new();
        Self { seen }
    }

    /// Records that `dep` is being expanded for `target`.
    ///
    /// Returns `true` when the pair was not recorded before, `false` when it
    /// had already been seen.
    fn insert(
        &mut self,
        target: &'a PhysicalSortExpr,
        dep: &'a PhysicalSortExpr,
    ) -> bool {
        let visited = self.seen.entry(target).or_default();
        visited.insert(dep)
    }

    /// Builds the orderings that end in the target of `referred_sort_expr`,
    /// recursively prefixing them with the orderings of its dependencies.
    ///
    /// A node without dependencies yields a single one-element ordering made
    /// of its target. Otherwise each dependency is expanded recursively and
    /// the target is appended to every resulting ordering. A (target,
    /// dependency) pair that was already expanded contributes nothing.
    ///
    /// # Panics
    ///
    /// Panics if `referred_sort_expr` is not a key of `dependency_map`, or if
    /// its node has no target.
    pub fn construct_orderings(
        &mut self,
        referred_sort_expr: &'a PhysicalSortExpr,
        dependency_map: &'a DependencyMap,
    ) -> Vec<LexOrdering> {
        let node = dependency_map
            .get(referred_sort_expr)
            .expect("`referred_sort_expr` should be inside `dependency_map`");
        let target = node.target.as_ref().unwrap();
        if node.dependencies.is_empty() {
            return vec![[target.clone()].into()];
        }

        let mut collected = Vec::new();
        for dep in node.dependencies.iter() {
            // Skip pairs that were already expanded.
            if !self.insert(target, dep) {
                continue;
            }
            let mut prefixes = self.construct_orderings(dep, dependency_map);
            for prefix in &mut prefixes {
                prefix.push(target.clone());
            }
            collected.extend(prefixes);
        }
        collected
    }
}
/// Maps each sort expression to its [`DependencyNode`], i.e. its optional
/// target sort expression and the set of sort expressions it depends on.
///
/// Dereferences (immutably) to the wrapped `IndexMap`.
#[derive(Debug, Default)]
pub struct DependencyMap {
/// The wrapped map from sort expression to its node.
map: IndexMap<PhysicalSortExpr, DependencyNode>,
}
impl DependencyMap {
    /// Registers `dependency` (if any) for `sort_expr`.
    ///
    /// When `sort_expr` has no node yet, one is created with
    /// `target_sort_expr` as its target and no dependencies. When a node
    /// already exists its target is left untouched and `target_sort_expr` is
    /// ignored.
    pub fn insert(
        &mut self,
        sort_expr: PhysicalSortExpr,
        target_sort_expr: Option<PhysicalSortExpr>,
        dependency: Option<PhysicalSortExpr>,
    ) {
        let node = self.map.entry(sort_expr).or_insert_with(|| DependencyNode {
            target: target_sort_expr,
            dependencies: Dependencies::default(),
        });
        if let Some(dep) = dependency {
            node.dependencies.insert(dep);
        }
    }
}
/// Gives read-only access to the wrapped `IndexMap`.
impl Deref for DependencyMap {
    type Target = IndexMap<PhysicalSortExpr, DependencyNode>;

    fn deref(&self) -> &Self::Target {
        let Self { map } = self;
        map
    }
}
/// Prints a `DependencyMap: {` header, one `sort_expr --> node` line per
/// entry, and a closing `}` line.
impl Display for DependencyMap {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "DependencyMap: {{")?;
        self.map
            .iter()
            .try_for_each(|(sort_expr, node)| writeln!(f, " {sort_expr} --> {node}"))?;
        writeln!(f, "}}")
    }
}
/// The value stored in a [`DependencyMap`] for one sort expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DependencyNode {
/// Optional target sort expression associated with this node.
pub(crate) target: Option<PhysicalSortExpr>,
/// Sort expressions this node depends on.
pub(crate) dependencies: Dependencies,
}
/// Renders the node as `(target: T, dependencies: [..])`, omitting the
/// `target` part when there is none.
impl Display for DependencyNode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.target {
            Some(target) => write!(f, "(target: {target}, ")?,
            None => write!(f, "(")?,
        }
        // NOTE: `Dependencies` prints its own surrounding brackets, so the
        // dependency list appears double-bracketed in the output.
        write!(f, "dependencies: [{}])", self.dependencies)
    }
}
pub fn referred_dependencies(
dependency_map: &DependencyMap,
source: &Arc<dyn PhysicalExpr>,
) -> Vec<Dependencies> {
let mut expr_to_sort_exprs = IndexMap::<_, Dependencies>::new();
for sort_expr in dependency_map
.keys()
.filter(|sort_expr| expr_refers(source, &sort_expr.expr))
{
let key = Arc::clone(&sort_expr.expr);
expr_to_sort_exprs
.entry(key)
.or_default()
.insert(sort_expr.clone());
}
expr_to_sort_exprs
.into_values()
.multi_cartesian_product()
.map(Dependencies::new)
.collect()
}
/// Builds the orderings contributed by every dependency of
/// `relevant_sort_expr`, sharing a single [`DependencyEnumerator`] across
/// them.
///
/// # Panics
///
/// Panics if `relevant_sort_expr` is not a key of `dependency_map`.
pub fn construct_prefix_orderings(
    relevant_sort_expr: &PhysicalSortExpr,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let mut dep_enumerator = DependencyEnumerator::new();
    let node = dependency_map
        .get(relevant_sort_expr)
        .expect("no relevant sort expr found");

    let mut prefixes = Vec::new();
    for dep in node.dependencies.iter() {
        prefixes.extend(dep_enumerator.construct_orderings(dep, dependency_map));
    }
    prefixes
}
/// Generates orderings by combining the prefix orderings of every dependency.
///
/// For each dependency the prefix orderings are computed with
/// `construct_prefix_orderings`; dependencies without any prefix are
/// dropped. One prefix is then picked per remaining dependency (cartesian
/// product), and for every such pick all permutations of the chosen prefixes
/// are concatenated into a single ordering.
pub fn generate_dependency_orderings(
    dependencies: &Dependencies,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let prefix_groups = dependencies.iter().filter_map(|dep| {
        let prefixes = construct_prefix_orderings(dep, dependency_map);
        if prefixes.is_empty() {
            None
        } else {
            Some(prefixes)
        }
    });

    let mut orderings = Vec::new();
    for selection in prefix_groups.multi_cartesian_product() {
        let count = selection.len();
        for arrangement in selection.into_iter().permutations(count) {
            // Concatenate the chosen prefixes in this arrangement's order.
            let merged = arrangement.into_iter().reduce(|mut acc, ordering| {
                acc.extend(ordering);
                acc
            });
            if let Some(ordering) = merged {
                orderings.push(ordering);
            }
        }
    }
    orderings
}
#[cfg(test)]
mod tests {
use std::ops::Not;
use std::sync::Arc;
use super::*;
use crate::equivalence::tests::{
convert_to_sort_reqs, create_test_params, create_test_schema, parse_sort_expr,
};
use crate::equivalence::{ProjectionMapping, convert_to_sort_exprs};
use crate::expressions::{BinaryExpr, CastExpr, Column, col};
use crate::projection::tests::output_schema;
use crate::{ConstExpr, EquivalenceProperties, ScalarFunctionExpr};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraint, Constraints, Result};
use datafusion_expr::Operator;
use datafusion_expr::sort_properties::SortProperties;
use datafusion_functions::string::concat;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{
LexRequirement, PhysicalSortRequirement,
};
/// Projecting column `a` under four different aliases must place all four
/// output columns into a single equivalence class.
#[test]
fn project_equivalence_properties_test() -> Result<()> {
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
        Field::new("c", DataType::Int64, true),
    ]));
    let input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
    let col_a = col("a", &input_schema)?;
    // a as a1, a as a2, a as a3, a as a4
    let proj_exprs = vec![
        (Arc::clone(&col_a), "a1".to_string()),
        (Arc::clone(&col_a), "a2".to_string()),
        (Arc::clone(&col_a), "a3".to_string()),
        (Arc::clone(&col_a), "a4".to_string()),
    ];
    // `output_schema` only borrows the mapping, so the same mapping is reused
    // for the projection below instead of being rebuilt.
    let projection_mapping = ProjectionMapping::try_new(proj_exprs, &input_schema)?;
    let out_schema = output_schema(&projection_mapping, &input_schema)?;
    let col_a1 = &col("a1", &out_schema)?;
    let col_a2 = &col("a2", &out_schema)?;
    let col_a3 = &col("a3", &out_schema)?;
    let col_a4 = &col("a4", &out_schema)?;
    let out_properties = input_properties.project(&projection_mapping, out_schema);
    // There must be exactly one equivalence class, containing a1..a4.
    assert_eq!(out_properties.eq_group().len(), 1);
    let eq_class = out_properties.eq_group().iter().next().unwrap();
    assert_eq!(eq_class.len(), 4);
    assert!(eq_class.contains(col_a1));
    assert!(eq_class.contains(col_a2));
    assert!(eq_class.contains(col_a3));
    assert!(eq_class.contains(col_a4));
    Ok(())
}
/// Registers two orderings over the same four columns, projects them through
/// an identity projection, and checks the rendered output properties.
#[test]
fn project_equivalence_properties_test_multi() -> Result<()> {
let input_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
Field::new("d", DataType::Int64, true),
]));
let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
// First ordering: [a, b, c, d]
input_properties.add_ordering([
parse_sort_expr("a", &input_schema),
parse_sort_expr("b", &input_schema),
parse_sort_expr("c", &input_schema),
parse_sort_expr("d", &input_schema),
]);
// Second ordering: [a, c, b, d]
input_properties.add_ordering([
parse_sort_expr("a", &input_schema),
parse_sort_expr("c", &input_schema),
parse_sort_expr("b", &input_schema), parse_sort_expr("d", &input_schema),
]);
// Identity projection: every column is projected under its own name.
let proj_exprs = vec![
(col("a", &input_schema)?, "a".to_string()),
(col("b", &input_schema)?, "b".to_string()),
(col("c", &input_schema)?, "c".to_string()),
(col("d", &input_schema)?, "d".to_string()),
];
let projection_mapping = ProjectionMapping::try_new(proj_exprs, &input_schema)?;
let out_properties = input_properties.project(&projection_mapping, input_schema);
// The comparison is made against the `Display` output of the projected properties.
assert_eq!(
out_properties.to_string(),
"order: [[a@0 ASC, c@2 ASC, b@1 ASC, d@3 ASC], [a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC]]"
);
Ok(())
}
/// The orderings `[b]` and `[c]` must yield the same ordering equivalence
/// class whether or not the equality `a = c` was registered beforehand.
#[test]
fn test_normalize_ordering_equivalence_classes() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let col_a_expr = col("a", &schema)?;
let col_b_expr = col("b", &schema)?;
let col_c_expr = col("c", &schema)?;
// Properties under test: `a = c` is registered before the orderings are added.
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
eq_properties.add_equal_conditions(col_a_expr, Arc::clone(&col_c_expr))?;
eq_properties.add_orderings([
vec![PhysicalSortExpr::new_default(Arc::clone(&col_b_expr))],
vec![PhysicalSortExpr::new_default(Arc::clone(&col_c_expr))],
]);
// Reference properties: same orderings, but no equality registered.
let mut expected_eqs = EquivalenceProperties::new(Arc::new(schema));
expected_eqs.add_orderings([
vec![PhysicalSortExpr::new_default(col_b_expr)],
vec![PhysicalSortExpr::new_default(col_c_expr)],
]);
assert!(eq_properties.oeq_class().eq(expected_eqs.oeq_class()));
Ok(())
}
/// Exercises `find_longest_permutation` for the required columns `[b, a]`
/// against three different sets of known orderings.
#[test]
fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
let sort_options = SortOptions::default();
// Negation of the default sort options.
let sort_options_not = SortOptions::default().not();
// Scenario 1: the only known ordering is [b (negated options), a].
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]);
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let required_columns = [Arc::clone(&col_b), Arc::clone(&col_a)];
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_ordering([
PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
]);
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
// Both required columns are matched, in the order they were requested.
assert_eq!(idxs, vec![0, 1]);
assert_eq!(
result,
vec![
PhysicalSortExpr::new(col_b, sort_options_not),
PhysicalSortExpr::new(col_a, sort_options),
]
);
// Scenario 2: an additional, unrelated ordering [c] is also known.
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let required_columns = [Arc::clone(&col_b), Arc::clone(&col_a)];
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_orderings([
vec![PhysicalSortExpr::new(
Arc::new(Column::new("c", 2)),
sort_options,
)],
vec![
PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
],
]);
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
// The expected result is the same as in scenario 1.
assert_eq!(idxs, vec![0, 1]);
assert_eq!(
result,
vec![
PhysicalSortExpr::new(col_b, sort_options_not),
PhysicalSortExpr::new(col_a, sort_options),
]
);
// Scenario 3: the known ordering is [b, c, a], so `c` sits between the
// two required columns.
let required_columns = [
Arc::new(Column::new("b", 1)) as _,
Arc::new(Column::new("a", 0)) as _,
];
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_ordering([
PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
PhysicalSortExpr::new(Arc::new(Column::new("c", 2)), sort_options),
PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
]);
let (_, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
// Only the first required column (index 0, i.e. `b`) is expected to match.
assert_eq!(idxs, vec![0]);
Ok(())
}
/// Checks the sort properties reported by `get_expr_properties` for several
/// expressions, given `b = a` and the orderings `[b]` and `[d]`.
#[test]
fn test_update_properties() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
]);
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let col_c = col("c", &schema)?;
let col_d = col("d", &schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
// b = a
eq_properties.add_equal_conditions(Arc::clone(&col_b), Arc::clone(&col_a))?;
// Known orderings: [b ASC] and [d ASC]
eq_properties.add_orderings([
vec![PhysicalSortExpr::new(Arc::clone(&col_b), option_asc)],
vec![PhysicalSortExpr::new(Arc::clone(&col_d), option_asc)],
]);
// (expression, expected sort properties) pairs.
let test_cases = vec![
// d + b
(
Arc::new(BinaryExpr::new(col_d, Operator::Plus, Arc::clone(&col_b))) as _,
SortProperties::Ordered(option_asc),
),
// b
(col_b, SortProperties::Ordered(option_asc)),
// a
(Arc::clone(&col_a), SortProperties::Ordered(option_asc)),
// a + c
(
Arc::new(BinaryExpr::new(col_a, Operator::Plus, col_c)),
SortProperties::Unordered,
),
];
for (expr, expected) in test_cases {
// The leading sort expressions are only collected for the failure message.
let leading_orderings = eq_properties
.oeq_class()
.iter()
.map(|ordering| ordering.first().clone())
.collect::<Vec<_>>();
let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr));
let err_msg = format!(
"expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}",
expr, expected, expr_props.sort_properties
);
assert_eq!(expr_props.sort_properties, expected, "{err_msg}");
}
Ok(())
}
/// Table-driven test of `find_longest_permutation` on the shared fixture
/// returned by `create_test_params`, extended with the ordering
/// `[d ASC, h DESC]`.
// NOTE(review): the expected results depend on the equalities and orderings
// set up inside `create_test_params`, which is defined outside this file.
#[test]
fn test_find_longest_permutation() -> Result<()> {
let (test_schema, mut eq_properties) = create_test_params()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
let col_h = &col("h", &test_schema)?;
// a + d
let a_plus_d = Arc::new(BinaryExpr::new(
Arc::clone(col_a),
Operator::Plus,
Arc::clone(col_d),
)) as _;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
// Additional ordering on top of the fixture: [d ASC, h DESC]
eq_properties.add_ordering([
PhysicalSortExpr::new(Arc::clone(col_d), option_asc),
PhysicalSortExpr::new(Arc::clone(col_h), option_desc),
]);
// Each case is (requested expressions, expected longest permutation).
let test_cases = vec![
(vec![col_a], vec![(col_a, option_asc)]),
(vec![col_c], vec![(col_c, option_asc)]),
(
vec![col_d, col_e, col_b],
vec![
(col_d, option_asc),
(col_e, option_desc),
(col_b, option_asc),
],
),
// `b` on its own is expected to produce no ordering.
(vec![col_b], vec![]),
(vec![col_d], vec![(col_d, option_asc)]),
(vec![&a_plus_d], vec![(&a_plus_d, option_asc)]),
(
vec![col_b, col_d],
vec![(col_d, option_asc), (col_b, option_asc)],
),
(
vec![col_c, col_e],
vec![(col_c, option_asc), (col_e, option_desc)],
),
(
vec![col_d, col_h, col_e, col_f, col_b],
vec![
(col_d, option_asc),
(col_e, option_desc),
(col_h, option_desc),
(col_f, option_asc),
(col_b, option_asc),
],
),
(
vec![col_e, col_d, col_h, col_f, col_b],
vec![
(col_e, option_desc),
(col_d, option_asc),
(col_h, option_desc),
(col_f, option_asc),
(col_b, option_asc),
],
),
(
vec![col_e, col_d, col_b, col_h, col_f],
vec![
(col_e, option_desc),
(col_d, option_asc),
(col_b, option_asc),
(col_h, option_desc),
(col_f, option_asc),
],
),
];
for (exprs, expected) in test_cases {
let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
let expected = convert_to_sort_exprs(&expected);
// Only the resulting sort expressions are compared; the indices are ignored.
let (actual, _) = eq_properties.find_longest_permutation(&exprs)?;
assert_eq!(actual, expected);
}
Ok(())
}
/// After `h` is registered as a constant, requesting `[h]` from
/// `find_longest_permutation` must return `h` with the default sort options.
#[test]
fn test_find_longest_permutation2() -> Result<()> {
    let (test_schema, mut eq_properties) = create_test_params()?;
    let col_h = &col("h", &test_schema)?;

    // Mark `h` as a constant expression.
    let constants = vec![ConstExpr::from(Arc::clone(col_h))];
    eq_properties.add_constants(constants)?;

    let requested = vec![Arc::clone(col_h)];
    let expected_pairs = vec![(col_h, SortOptions::default())];
    let expected = convert_to_sort_exprs(&expected_pairs);

    let (actual, _) = eq_properties.find_longest_permutation(&requested)?;
    assert_eq!(actual, expected);
    Ok(())
}
/// Table-driven test of `normalize_sort_requirements` on the shared fixture
/// returned by `create_test_params`.
// NOTE(review): several cases expect `c` to normalize to `a`; presumably the
// fixture registers `a = c` - confirm against `create_test_params`.
#[test]
fn test_normalize_sort_reqs() -> Result<()> {
let (test_schema, eq_properties) = create_test_params()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
// Each entry is (input requirement, expected normalized requirement).
// A `None` option means the requirement does not fix the sort options.
let requirements = vec![
(
vec![(col_a, Some(option_asc))],
vec![(col_a, Some(option_asc))],
),
(
vec![(col_a, Some(option_desc))],
vec![(col_a, Some(option_desc))],
),
(vec![(col_a, None)], vec![(col_a, None)]),
// `c` is expected to be rewritten to `a`.
(
vec![(col_c, Some(option_asc))],
vec![(col_a, Some(option_asc))],
),
(vec![(col_c, None)], vec![(col_a, None)]),
(
vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
),
(
vec![(col_d, None), (col_b, None)],
vec![(col_d, None), (col_b, None)],
),
(
vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
),
(
vec![(col_e, Some(option_desc)), (col_f, None)],
vec![(col_e, Some(option_desc)), (col_f, None)],
),
(
vec![(col_e, None), (col_f, None)],
vec![(col_e, None), (col_f, None)],
),
];
for (reqs, expected_normalized) in requirements.into_iter() {
let req = convert_to_sort_reqs(&reqs);
let expected_normalized = convert_to_sort_reqs(&expected_normalized);
assert_eq!(
eq_properties.normalize_sort_requirements(req).unwrap(),
expected_normalized
);
}
Ok(())
}
/// Checks `normalize_sort_requirements` for single-column requirements on
/// the shared fixture returned by `create_test_params`.
// NOTE(review): the cases expect `c` to normalize to `a`; presumably the
// fixture registers `a = c` - confirm against `create_test_params`.
#[test]
fn test_schema_normalize_sort_requirement_with_equivalence() -> Result<()> {
let option1 = SortOptions {
descending: false,
nulls_first: false,
};
let (test_schema, eq_properties) = create_test_params()?;
let col_a = &col("a", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
// Each entry is (input requirement, expected normalized requirement).
let test_cases = vec![
(vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
(vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
(vec![(col_c, None)], vec![(col_a, None)]),
(vec![(col_d, Some(option1))], vec![(col_d, Some(option1))]),
];
for (reqs, expected) in test_cases.into_iter() {
let reqs = convert_to_sort_reqs(&reqs);
let expected = convert_to_sort_reqs(&expected);
// `reqs` is cloned because it is still needed for the failure message below.
let normalized = eq_properties
.normalize_sort_requirements(reqs.clone())
.unwrap();
assert!(
expected.eq(&normalized),
"error in test: reqs: {reqs:?}, expected: {expected:?}, normalized: {normalized:?}"
);
}
Ok(())
}
/// With a base ordering `[a, b, c]`, checks whether sorting on `c` alone is
/// satisfied once `cast(c as Date32) = a` is known and `b` is (or is not) a
/// constant. Every case is evaluated twice, registering equalities and
/// constants in both possible orders.
#[test]
fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Date32, true),
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
]));
// Base ordering shared by all cases: [a, b, c]
let mut base_properties = EquivalenceProperties::new(Arc::clone(&schema));
base_properties.reorder(
["a", "b", "c"]
.into_iter()
.map(|c| PhysicalSortExpr::new_default(col(c, schema.as_ref()).unwrap())),
)?;
struct TestCase {
// Label used in the assertion failure message.
name: &'static str,
// Expressions registered as constants.
constants: Vec<Arc<dyn PhysicalExpr>>,
// Pairs of expressions registered as equal.
equal_conditions: Vec<[Arc<dyn PhysicalExpr>; 2]>,
// Columns of the ordering whose satisfaction is checked.
sort_columns: &'static [&'static str],
// Expected result of `ordering_satisfy`.
should_satisfy_ordering: bool,
}
let col_a = col("a", schema.as_ref())?;
let col_b = col("b", schema.as_ref())?;
let col_c = col("c", schema.as_ref())?;
// cast(c as Date32)
let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None)) as _;
let cases = vec![
// b is constant and cast(c) = a.
TestCase {
name: "(a, b, c) -> (c)",
constants: vec![Arc::clone(&col_b)],
equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]],
sort_columns: &["c"],
should_satisfy_ordering: true,
},
// Same as the previous case with the sides of the equality swapped.
TestCase {
name: "(a, b, c) -> (c)",
constants: vec![col_b],
equal_conditions: vec![[Arc::clone(&col_a), Arc::clone(&cast_c)]],
sort_columns: &["c"],
should_satisfy_ordering: true,
},
TestCase {
name: "not ordered because (b) is not constant",
constants: vec![],
equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]],
sort_columns: &["c"],
should_satisfy_ordering: false,
},
];
for case in cases {
for properties in [
// Variant 1: equalities are registered first, then constants.
{
let mut properties = base_properties.clone();
for [left, right] in case.equal_conditions.clone() {
properties.add_equal_conditions(left, right)?
}
properties.add_constants(
case.constants.iter().cloned().map(ConstExpr::from),
)?;
properties
},
// Variant 2: constants are registered first, then equalities.
{
let mut properties = base_properties.clone();
properties.add_constants(
case.constants.iter().cloned().map(ConstExpr::from),
)?;
for [left, right] in case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
properties
},
] {
let sort = case
.sort_columns
.iter()
.map(|&name| col(name, &schema).map(PhysicalSortExpr::new_default))
.collect::<Result<Vec<_>>>()?;
assert_eq!(
properties.ordering_satisfy(sort)?,
case.should_satisfy_ordering,
"failed test '{}'",
case.name
);
}
}
Ok(())
}
/// Given the ordering `[c, a, b]` and the equality `c = concat(a, b)`, the
/// ordering equivalence class is expected to consist of `[c]` and `[a, b]`.
#[test]
fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Utf8, false),
]));
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let col_c = col("c", &schema)?;
// concat(a, b)
let a_concat_b: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
"concat",
concat(),
vec![Arc::clone(&col_a), Arc::clone(&col_b)],
Field::new("f", DataType::Utf8, true).into(),
Arc::new(ConfigOptions::default()),
));
let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
// Known ordering: [c ASC, a ASC, b ASC]
eq_properties.add_ordering([
PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
]);
// c = concat(a, b)
eq_properties.add_equal_conditions(Arc::clone(&col_c), a_concat_b)?;
let orderings = eq_properties.oeq_class();
let expected_ordering1 = [PhysicalSortExpr::new_default(col_c).asc()].into();
let expected_ordering2 = [
PhysicalSortExpr::new_default(col_a).asc(),
PhysicalSortExpr::new_default(col_b).asc(),
]
.into();
assert_eq!(orderings.len(), 2);
assert!(orderings.contains(&expected_ordering1));
assert!(orderings.contains(&expected_ordering2));
Ok(())
}
/// Given the ordering `[c, a, b]` and the equality `c = a * b`, the ordering
/// equivalence class is expected to still contain only the initial ordering.
#[test]
fn test_ordering_equivalence_with_non_lex_monotonic_multiply() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let col_c = col("c", &schema)?;
// a * b
let a_times_b = Arc::new(BinaryExpr::new(
Arc::clone(&col_a),
Operator::Multiply,
Arc::clone(&col_b),
)) as _;
let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
// Known ordering: [c ASC, a ASC, b ASC]
let initial_ordering: LexOrdering = [
PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
PhysicalSortExpr::new_default(col_a).asc(),
PhysicalSortExpr::new_default(col_b).asc(),
]
.into();
eq_properties.add_ordering(initial_ordering.clone());
// c = a * b
eq_properties.add_equal_conditions(col_c, a_times_b)?;
let orderings = eq_properties.oeq_class();
// The initial ordering must be the only ordering present.
assert_eq!(orderings.len(), 1);
assert!(orderings.contains(&initial_ordering));
Ok(())
}
/// Given the ordering `[concat(a, b), a, b]` and the equality
/// `c = concat(a, b)`, the ordering equivalence class is expected to consist
/// of `[concat(a, b)]` and `[a, b]`.
#[test]
fn test_ordering_equivalence_with_concat_equality() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Utf8, false),
]));
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let col_c = col("c", &schema)?;
// concat(a, b)
let a_concat_b = Arc::new(ScalarFunctionExpr::new(
"concat",
concat(),
vec![Arc::clone(&col_a), Arc::clone(&col_b)],
Field::new("f", DataType::Utf8, true).into(),
Arc::new(ConfigOptions::default()),
)) as _;
let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
// Known ordering: [concat(a, b) ASC, a ASC, b ASC]
eq_properties.add_ordering([
PhysicalSortExpr::new_default(Arc::clone(&a_concat_b)).asc(),
PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
]);
// c = concat(a, b)
eq_properties.add_equal_conditions(col_c, Arc::clone(&a_concat_b))?;
let orderings = eq_properties.oeq_class();
let expected_ordering1 = [PhysicalSortExpr::new_default(a_concat_b).asc()].into();
let expected_ordering2 = [
PhysicalSortExpr::new_default(col_a).asc(),
PhysicalSortExpr::new_default(col_b).asc(),
]
.into();
assert_eq!(orderings.len(), 2);
assert!(orderings.contains(&expected_ordering1));
assert!(orderings.contains(&expected_ordering2));
Ok(())
}
/// Checks `requirements_compatible` on properties without any equivalences,
/// using the option-less requirements `[a]`, `[a, b]` and `[c]`.
#[test]
fn test_requirements_compatible() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]));
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let col_c = col("c", &schema)?;
let eq_properties = EquivalenceProperties::new(schema);
// Requirement [a]
let lex_a: LexRequirement =
[PhysicalSortRequirement::new(Arc::clone(&col_a), None)].into();
// Requirement [a, b]
let lex_a_b: LexRequirement = [
PhysicalSortRequirement::new(col_a, None),
PhysicalSortRequirement::new(col_b, None),
]
.into();
// Requirement [c]
let lex_c = [PhysicalSortRequirement::new(col_c, None)].into();
// ([a], [a]) is expected to be compatible.
assert!(eq_properties.requirements_compatible(lex_a.clone(), lex_a.clone()));
// ([a], [a, b]) is expected to be incompatible.
assert!(!eq_properties.requirements_compatible(lex_a.clone(), lex_a_b.clone()));
// ([a, b], [a]) is expected to be compatible.
assert!(eq_properties.requirements_compatible(lex_a_b, lex_a.clone()));
// ([c], [a]) is expected to be incompatible.
assert!(!eq_properties.requirements_compatible(lex_c, lex_a));
Ok(())
}
/// With `a` registered as a constant, reordering to `[a, b]` must report a
/// change and leave a single two-element ordering whose expressions are `a`
/// followed by `b`.
#[test]
fn test_with_reorder_constant_filtering() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

    // `a` is a constant.
    let constant_a = ConstExpr::from(Arc::clone(&col_a));
    eq_properties.add_constants([constant_a])?;

    // Requested order: [a, b]
    let sort_exprs: Vec<_> = [&col_a, &col_b]
        .into_iter()
        .map(|expr| PhysicalSortExpr::new_default(Arc::clone(expr)))
        .collect();
    let changed = eq_properties.reorder(sort_exprs)?;
    assert!(changed);

    assert_eq!(eq_properties.oeq_class().len(), 1);
    let ordering = eq_properties.oeq_class().iter().next().unwrap();
    assert_eq!(ordering.len(), 2);
    for (position, expected_expr) in [&col_a, &col_b].into_iter().enumerate() {
        assert!(ordering[position].expr.eq(expected_expr));
    }
    Ok(())
}
/// Reordering to `[a ASC]` when `[a ASC, b DESC, c ASC]` is already known
/// must report no change and keep the full existing ordering.
#[test]
fn test_with_reorder_preserve_suffix() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    let asc = SortOptions::default();
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };

    // Existing ordering: [a ASC, b DESC, c ASC]
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    eq_properties.add_ordering([
        PhysicalSortExpr::new(Arc::clone(&col_a), asc),
        PhysicalSortExpr::new(Arc::clone(&col_b), desc),
        PhysicalSortExpr::new(Arc::clone(&col_c), asc),
    ]);

    // Requested order is a prefix of the existing ordering.
    let new_order = vec![PhysicalSortExpr::new(Arc::clone(&col_a), asc)];
    let changed = eq_properties.reorder(new_order)?;
    assert!(!changed);

    assert_eq!(eq_properties.oeq_class().len(), 1);
    let ordering = eq_properties.oeq_class().iter().next().unwrap();
    assert_eq!(ordering.len(), 3);
    let expected = [(&col_a, asc), (&col_b, desc), (&col_c, asc)];
    for (position, (expr, options)) in expected.into_iter().enumerate() {
        assert!(ordering[position].expr.eq(expr));
        assert!(ordering[position].options.eq(&options));
    }
    Ok(())
}
/// With `a = b` and the known ordering `[a, c]`, reordering to `[b]` must
/// report no change and keep a two-element ordering that leads with `a` or
/// `b` and ends with `c`.
#[test]
fn test_with_reorder_equivalent_expressions() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

    // a = b
    eq_properties.add_equal_conditions(Arc::clone(&col_a), Arc::clone(&col_b))?;
    // Existing ordering: [a, c]
    eq_properties.add_ordering([
        PhysicalSortExpr::new_default(Arc::clone(&col_a)),
        PhysicalSortExpr::new_default(Arc::clone(&col_c)),
    ]);

    // Requested order: [b]
    let new_order = vec![PhysicalSortExpr::new_default(Arc::clone(&col_b))];
    let changed = eq_properties.reorder(new_order)?;
    assert!(!changed);

    assert_eq!(eq_properties.oeq_class().len(), 1);
    let asc = SortOptions::default();
    let ordering = eq_properties.oeq_class().iter().next().unwrap();
    assert_eq!(ordering.len(), 2);
    // Either member of the equivalence class may lead the ordering.
    let leading = &ordering[0];
    assert!([&col_a, &col_b]
        .into_iter()
        .any(|candidate| leading.expr.eq(candidate)));
    assert!(leading.options.eq(&asc));
    let trailing = &ordering[1];
    assert!(trailing.expr.eq(&col_c));
    assert!(trailing.options.eq(&asc));
    Ok(())
}
/// Reordering to `[a DESC]` when `[a ASC, b DESC]` is known must report a
/// change and leave exactly the requested ordering.
#[test]
fn test_with_reorder_incompatible_prefix() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let asc = SortOptions::default();
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };

    // Existing ordering: [a ASC, b DESC]
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    eq_properties.add_ordering([
        PhysicalSortExpr::new(Arc::clone(&col_a), asc),
        PhysicalSortExpr::new(Arc::clone(&col_b), desc),
    ]);

    // Requested order uses the opposite direction on `a`.
    let new_order = vec![PhysicalSortExpr::new(Arc::clone(&col_a), desc)];
    let changed = eq_properties.reorder(new_order.clone())?;
    assert!(changed);

    let orderings = eq_properties.oeq_class();
    assert_eq!(orderings.len(), 1);
    let ordering = orderings.iter().next().unwrap();
    assert_eq!(ordering.to_vec(), new_order);
    Ok(())
}
/// With `c` constant, `b = d`, and the orderings `[d, a]` and `[e]`,
/// reordering to `[b, c]` must report no change and leave the ordering
/// equivalence class exactly as it was.
#[test]
fn test_with_reorder_comprehensive() -> Result<()> {
let schema = create_test_schema()?;
let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let col_c = col("c", &schema)?;
let col_d = col("d", &schema)?;
let col_e = col("e", &schema)?;
// `c` is a constant.
eq_properties.add_constants([ConstExpr::from(Arc::clone(&col_c))])?;
// b = d
eq_properties.add_equal_conditions(Arc::clone(&col_b), Arc::clone(&col_d))?;
// Known orderings: [d, a] and [e]
eq_properties.add_orderings([
vec![
PhysicalSortExpr::new_default(Arc::clone(&col_d)),
PhysicalSortExpr::new_default(Arc::clone(&col_a)),
],
vec![PhysicalSortExpr::new_default(Arc::clone(&col_e))],
]);
// Requested order: [b, c]
let new_order = vec![
PhysicalSortExpr::new_default(Arc::clone(&col_b)),
PhysicalSortExpr::new_default(Arc::clone(&col_c)),
];
// Snapshot the orderings so they can be compared after the reorder.
let old_orderings = eq_properties.oeq_class().clone();
let change = eq_properties.reorder(new_order)?;
assert!(!change);
assert_eq!(eq_properties.oeq_class, old_orderings);
Ok(())
}
/// Table-driven test of `ordering_satisfy` in the presence of primary-key
/// and unique constraints.
///
/// For every case the "satisfied" orderings must not be satisfied on empty
/// properties, must be satisfied after the base ordering and constraints are
/// added, and the "unsatisfied" orderings must remain unsatisfied.
#[test]
fn test_ordering_satisfaction_with_key_constraints() -> Result<()> {
// Schema used by the primary-key cases: all columns nullable.
let pk_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
]));
// Schema used by the unique cases: `a` and `b` non-nullable, `c` and `d` nullable.
let unique_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
]));
// Each case is (name, schema, constraints, base ordering,
// orderings expected to be satisfied, orderings expected not to be satisfied).
let test_cases = vec![
(
"single column primary key",
&pk_schema,
vec![Constraint::PrimaryKey(vec![0])],
vec!["a"], vec![vec!["a", "b"], vec!["a", "c", "d"]],
vec![vec!["b", "a"], vec!["c", "a"]],
),
(
"single column unique",
&unique_schema,
vec![Constraint::Unique(vec![0])],
vec!["a"], vec![vec!["a", "b"], vec!["a", "c", "d"]],
vec![vec!["b", "a"], vec!["c", "a"]],
),
(
"multi-column primary key",
&pk_schema,
vec![Constraint::PrimaryKey(vec![0, 1])],
vec!["a", "b"], vec![vec!["a", "b", "c"], vec!["a", "b", "d"]],
vec![vec!["b", "a"], vec!["a", "c", "b"]],
),
(
"multi-column unique",
&unique_schema,
vec![Constraint::Unique(vec![0, 1])],
vec!["a", "b"], vec![vec!["a", "b", "c"], vec!["a", "b", "d"]],
vec![vec!["b", "a"], vec!["c", "a", "b"]],
),
// The unique constraint covers the nullable columns `c` and `d`;
// no extended ordering is expected to be satisfied here.
(
"nullable unique",
&unique_schema,
vec![Constraint::Unique(vec![2, 3])],
vec!["c", "d"], vec![],
vec![vec!["c", "d", "a"]],
),
(
"ordering with arbitrary column unique",
&unique_schema,
vec![Constraint::Unique(vec![0, 1])],
vec!["a", "c", "b"], vec![vec!["a", "c", "b", "d"]],
vec![vec!["a", "b", "d"]],
),
(
"ordering with arbitrary column pk",
&pk_schema,
vec![Constraint::PrimaryKey(vec![0, 1])],
vec!["a", "c", "b"], vec![vec!["a", "c", "b", "d"]],
vec![vec!["a", "b", "d"]],
),
(
"ordering with arbitrary column pk complex",
&pk_schema,
vec![Constraint::PrimaryKey(vec![3, 1])],
vec!["b", "a", "d"], vec![vec!["b", "a", "d", "c"]],
vec![vec!["b", "c", "d", "a"], vec!["b", "a", "c", "d"]],
),
];
for (
name,
schema,
constraints,
base_order,
satisfied_orders,
unsatisfied_orders,
) in test_cases
{
let mut eq_properties = EquivalenceProperties::new(Arc::clone(schema));
// Turn the column-name lists into default-option sort expressions.
let satisfied_orderings: Vec<_> = satisfied_orders
.iter()
.map(|cols| {
cols.iter()
.map(|col_name| {
PhysicalSortExpr::new_default(col(col_name, schema).unwrap())
})
.collect::<Vec<_>>()
})
.collect();
let unsatisfied_orderings: Vec<_> = unsatisfied_orders
.iter()
.map(|cols| {
cols.iter()
.map(|col_name| {
PhysicalSortExpr::new_default(col(col_name, schema).unwrap())
})
.collect::<Vec<_>>()
})
.collect();
// Before any ordering or constraint is added, nothing may be satisfied.
for ordering in satisfied_orderings.clone() {
let err_msg = format!(
"{name}: ordering {ordering:?} should not be satisfied before adding constraints",
);
assert!(!eq_properties.ordering_satisfy(ordering)?, "{err_msg}");
}
// Add the base ordering, then attach the constraints.
let base_ordering = base_order.iter().map(|col_name| PhysicalSortExpr {
expr: col(col_name, schema).unwrap(),
options: SortOptions::default(),
});
eq_properties.add_ordering(base_ordering);
eq_properties =
eq_properties.with_constraints(Constraints::new_unverified(constraints));
for ordering in satisfied_orderings {
let err_msg = format!(
"{name}: ordering {ordering:?} should be satisfied after adding constraints",
);
assert!(eq_properties.ordering_satisfy(ordering)?, "{err_msg}");
}
for ordering in unsatisfied_orderings {
let err_msg = format!(
"{name}: ordering {ordering:?} should not be satisfied after adding constraints",
);
assert!(!eq_properties.ordering_satisfy(ordering)?, "{err_msg}");
}
}
Ok(())
}
}