use std::fmt::Display;
use std::ops::Deref;
use std::sync::Arc;
use std::vec::IntoIter;
use crate::expressions::with_new_schema;
use crate::{LexOrdering, PhysicalExpr, add_offset_to_physical_sort_exprs};
use arrow::compute::SortOptions;
use arrow::datatypes::SchemaRef;
use datafusion_common::{HashSet, Result};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
/// A collection of alternative lexicographic orderings ([`LexOrdering`]s).
///
/// The constructors and mutators that add orderings (`new`, `extend`,
/// `add_orderings` and `From<Vec<LexOrdering>>`) run `remove_redundant_entries`
/// afterwards, which trims or drops orderings that overlap with one another.
/// `join_suffix`, `add_offset` and `with_new_schema` rewrite the stored
/// orderings without running that pass.
///
/// The type dereferences to `[LexOrdering]`, so slice methods such as `len`
/// and `iter` are available on it directly.
///
/// NOTE(review): presumably all stored orderings are expected to hold for the
/// same data simultaneously (see `output_ordering`) — confirm with callers.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct OrderingEquivalenceClass {
// The orderings currently tracked by this class.
orderings: Vec<LexOrdering>,
}
impl OrderingEquivalenceClass {
/// Removes every ordering from this class, leaving it empty.
pub fn clear(&mut self) {
self.orderings.clear();
}
pub fn new(
orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
) -> Self {
let mut result = Self {
orderings: orderings.into_iter().filter_map(LexOrdering::new).collect(),
};
result.remove_redundant_entries();
result
}
/// Appends the given orderings to this class and then removes redundant
/// entries from the combined set.
pub fn extend(&mut self, orderings: impl IntoIterator<Item = LexOrdering>) {
self.orderings.extend(orderings);
// The new entries may overlap with existing ones (or with each other),
// so re-normalize the whole collection.
self.remove_redundant_entries();
}
/// Adds orderings given as raw collections of sort expressions.
///
/// Each inner collection is converted with [`LexOrdering::new`]; collections
/// for which it yields `None` are skipped. The converted orderings are
/// appended and the class is re-normalized, exactly as `extend` does.
pub fn add_orderings(
    &mut self,
    sort_exprs: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
) {
    let new_orderings = sort_exprs.into_iter().filter_map(LexOrdering::new);
    // `extend` appends to `self.orderings` and then removes redundant entries.
    self.extend(new_orderings);
}
/// Normalizes the stored orderings by resolving pairwise overlaps until a
/// complete sweep over all pairs changes nothing.
///
/// For every pair of positions `idx < ordering_idx`, `resolve_overlap` is
/// tried in both directions. An overlap either trims an ordering in place or
/// marks it for removal. For example (taken from this file's tests),
/// `[a, b]` next to `[a, b, c]` is removed entirely, while `[a, b]` next to
/// `[b]` is trimmed to `[a]`.
///
/// Removal uses `swap_remove`, so the relative order of the remaining
/// orderings is not preserved.
fn remove_redundant_entries(&mut self) {
// `work` records whether the last sweep found any overlap; a trim can
// expose new overlaps, so sweeps repeat until none is found.
let mut work = true;
while work {
work = false;
let mut idx = 0;
'outer: while idx < self.orderings.len() {
let mut ordering_idx = idx + 1;
while ordering_idx < self.orderings.len() {
// Does a suffix of `orderings[idx]` overlap a prefix of `orderings[ordering_idx]`?
if let Some(remove) = self.resolve_overlap(idx, ordering_idx) {
work = true;
if remove {
// The last element is swapped into `idx`; rescan the same
// position without advancing.
self.orderings.swap_remove(idx);
continue 'outer;
}
}
// Same check in the opposite direction.
if let Some(remove) = self.resolve_overlap(ordering_idx, idx) {
work = true;
if remove {
// The last element is swapped into `ordering_idx`; examine
// it next without advancing.
self.orderings.swap_remove(ordering_idx);
continue;
}
}
ordering_idx += 1;
}
idx += 1;
}
}
}
/// Looks for the shortest non-empty suffix of `orderings[idx]` that equals a
/// prefix of `orderings[pre_idx]` and, if found, cuts that suffix off
/// `orderings[idx]`.
///
/// Returns `None` when there is no overlap. Otherwise returns
/// `Some(remove)`, where `remove` is the negation of what
/// `LexOrdering::truncate` reported. `remove_redundant_entries` deletes
/// `orderings[idx]` when `remove` is `true`.
///
/// NOTE(review): presumably `truncate` returns `false` only when asked to
/// shrink to zero length, i.e. when the whole of `orderings[idx]` is a prefix
/// of `orderings[pre_idx]` — confirm against `LexOrdering::truncate`.
fn resolve_overlap(&mut self, idx: usize, pre_idx: usize) -> Option<bool> {
    let length = self.orderings[idx].len();
    let max_overlap = length.min(self.orderings[pre_idx].len());
    // Smallest overlap wins, matching an ascending scan over 1..=max_overlap.
    let overlap = (1..=max_overlap).find(|&size| {
        self.orderings[idx][length - size..] == self.orderings[pre_idx][..size]
    })?;
    let truncated = self.orderings[idx].truncate(length - overlap);
    Some(!truncated)
}
/// Concatenates all stored orderings, in storage order, into a single
/// [`LexOrdering`].
///
/// Returns `None` when the class holds no orderings.
pub fn output_ordering(&self) -> Option<LexOrdering> {
    let mut remaining = self.orderings.iter().cloned();
    // The first ordering seeds the result; the rest are appended to it.
    let mut combined = remaining.next()?;
    for ordering in remaining {
        combined.extend(ordering);
    }
    Some(combined)
}
/// Appends the orderings of `other` as suffixes to the orderings of `self`,
/// producing one result per (`self` ordering, `other` ordering) combination.
///
/// The existing orderings are first replicated `other.len()` times (they are
/// kept once, unchanged, if `other` is empty). Then the `i`-th replica group
/// gets the `i`-th ordering of `other` appended to each of its members.
/// No redundancy removal is performed on the result.
pub fn join_suffix(mut self, other: &Self) -> Self {
    let n_ordering = self.orderings.len();
    let n_cross = n_ordering.max(other.len() * n_ordering);
    // Replicate the current entries so there is one copy per suffix.
    self.orderings = self.orderings.into_iter().cycle().take(n_cross).collect();
    // `chunks_mut` panics on a zero chunk size; with no orderings there is
    // nothing to append to anyway.
    if n_ordering > 0 {
        let groups = self.orderings.chunks_mut(n_ordering);
        for (group, suffix) in groups.zip(other.iter()) {
            for ordering in group {
                ordering.extend(suffix.iter().cloned());
            }
        }
    }
    self
}
/// Applies `add_offset_to_physical_sort_exprs` with the given `offset` to
/// every stored ordering, replacing the stored orderings with the results.
///
/// # Errors
///
/// Returns the first error produced by `add_offset_to_physical_sort_exprs`.
/// The stored orderings are moved out before processing, so on error the
/// class retains only the orderings that were converted before the failure.
pub fn add_offset(&mut self, offset: isize) -> Result<()> {
    for ordering in std::mem::take(&mut self.orderings) {
        let shifted = add_offset_to_physical_sort_exprs(ordering, offset)?;
        // `LexOrdering::new` may yield `None`, in which case nothing is stored.
        if let Some(rebuilt) = LexOrdering::new(shifted) {
            self.orderings.push(rebuilt);
        }
    }
    Ok(())
}
/// Rewrites the expression of every sort expression in every ordering via
/// `crate::expressions::with_new_schema`, using the given `schema`.
///
/// NOTE(review): presumably `schema` must be column-compatible with the
/// schema the expressions were built against — confirm against the helper.
///
/// # Errors
///
/// Returns the first error reported while rewriting an expression.
///
/// # Panics
///
/// Panics if `LexOrdering::new` returns `None` for a rewritten ordering.
/// This assumes each rewritten ordering has at least one sort expression,
/// just like the ordering it came from (presumably never empty).
pub fn with_new_schema(mut self, schema: &SchemaRef) -> Result<Self> {
    let current = std::mem::take(&mut self.orderings);
    let mut rewritten = Vec::with_capacity(current.len());
    for ordering in current {
        let mut sort_exprs = Vec::new();
        for mut sort_expr in ordering {
            sort_expr.expr = with_new_schema(sort_expr.expr, schema)?;
            sort_exprs.push(sort_expr);
        }
        rewritten.push(LexOrdering::new(sort_exprs).unwrap());
    }
    self.orderings = rewritten;
    Ok(self)
}
/// Returns the sort options of the first stored ordering whose leading
/// (first) sort expression equals `expr`, or `None` if no ordering starts
/// with `expr`.
pub fn get_options(&self, expr: &Arc<dyn PhysicalExpr>) -> Option<SortOptions> {
    self.iter()
        .map(|ordering| &ordering[0])
        .find(|leading| leading.expr.eq(expr))
        .map(|leading| leading.options)
}
/// Checks whether `expr` leads stored orderings with "opposite" sort options.
///
/// Only the leading sort expression of each ordering is inspected. The
/// result is `true` when the orderings led by `expr` cover both members of
/// at least one of these pairs of `(descending, nulls_first)` options:
/// - ascending with nulls last, and descending with nulls first, or
/// - ascending with nulls first, and descending with nulls last.
///
/// NOTE(review): presumably an expression that is sorted in both directions
/// at once can only be constant within that scope, hence the name — confirm
/// with callers.
pub fn is_expr_partial_const(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
    // Option combinations still to be observed for each pair; a pair is
    // satisfied once its set has been emptied.
    let mut unseen_first_pair = HashSet::from([(false, false), (true, true)]);
    let mut unseen_second_pair = HashSet::from([(false, true), (true, false)]);

    let matching_leads = self
        .iter()
        .map(|ordering| ordering.first())
        .filter(|leading| leading.expr.eq(expr));
    for leading in matching_leads {
        let observed = (leading.options.descending, leading.options.nulls_first);
        unseen_first_pair.remove(&observed);
        unseen_second_pair.remove(&observed);
    }

    unseen_first_pair.is_empty() || unseen_second_pair.is_empty()
}
}
/// Exposes the stored orderings as a read-only slice, so slice methods
/// (`len`, `iter`, indexing, ...) can be called on the class directly.
impl Deref for OrderingEquivalenceClass {
    type Target = [LexOrdering];

    fn deref(&self) -> &Self::Target {
        &self.orderings
    }
}
impl From<Vec<LexOrdering>> for OrderingEquivalenceClass {
fn from(orderings: Vec<LexOrdering>) -> Self {
let mut result = Self { orderings };
result.remove_redundant_entries();
result
}
}
/// Consumes the class and yields its orderings by value, in storage order.
impl IntoIterator for OrderingEquivalenceClass {
    type Item = LexOrdering;
    type IntoIter = IntoIter<Self::Item>;

    fn into_iter(self) -> Self::IntoIter {
        let Self { orderings } = self;
        orderings.into_iter()
    }
}
/// Formats the class as a bracketed, comma-separated list in which every
/// ordering is itself wrapped in brackets, e.g. `[[...], [...]]`.
impl Display for OrderingEquivalenceClass {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[")?;
        for (position, ordering) in self.orderings.iter().enumerate() {
            // Only entries after the first are preceded by a separator.
            if position == 0 {
                write!(f, "[{ordering}]")?;
            } else {
                write!(f, ", [{ordering}]")?;
            }
        }
        write!(f, "]")
    }
}
impl From<OrderingEquivalenceClass> for Vec<LexOrdering> {
fn from(oeq_class: OrderingEquivalenceClass) -> Self {
oeq_class.orderings
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::equivalence::tests::create_test_schema;
use crate::equivalence::{
EquivalenceClass, EquivalenceGroup, EquivalenceProperties,
OrderingEquivalenceClass, convert_to_orderings, convert_to_sort_exprs,
};
use crate::expressions::{BinaryExpr, Column, col};
use crate::utils::tests::TestScalarUDF;
use crate::{
AcrossPartitions, ConstExpr, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
ScalarFunctionExpr,
};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{Operator, ScalarUDF};
/// A finer ordering (`[a, b]`) must satisfy a cruder requirement (`[a]`),
/// but not the other way around.
#[test]
fn test_ordering_satisfy() -> Result<()> {
    let fields = vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
    ];
    let input_schema = Arc::new(Schema::new(fields));

    // Sort expression on the named column with default sort options.
    let sort_on = |name: &str, index: usize| PhysicalSortExpr {
        expr: Arc::new(Column::new(name, index)),
        options: SortOptions::default(),
    };
    let crude = vec![sort_on("a", 0)];
    let finer = vec![sort_on("a", 0), sort_on("b", 1)];

    // Data ordered by the finer ordering satisfies the crude requirement.
    let eq_properties_finer = EquivalenceProperties::new_with_orderings(
        Arc::clone(&input_schema),
        [finer.clone()],
    );
    let crude_is_satisfied = eq_properties_finer.ordering_satisfy(crude.clone())?;
    assert!(crude_is_satisfied);

    // Data ordered only by the crude ordering does not satisfy the finer one.
    let eq_properties_crude =
        EquivalenceProperties::new_with_orderings(Arc::clone(&input_schema), [crude]);
    let finer_is_satisfied = eq_properties_crude.ordering_satisfy(finer)?;
    assert!(!finer_is_satisfied);

    Ok(())
}
// Table-driven check of `EquivalenceProperties::ordering_satisfy` when known
// orderings, an equivalence class and constant expressions are combined.
//
// Each test case is a tuple of:
//   1. the known orderings,
//   2. the equivalence classes (here always `a = f`),
//   3. the constant expressions (here always `e`),
//   4. the required ordering,
//   5. whether the requirement is expected to be satisfied.
#[test]
fn test_ordering_satisfy_with_equivalence2() -> Result<()> {
let test_schema = create_test_schema()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
// NOTE(review): presumably `TestScalarUDF` behaves like an
// order-preserving `floor` — confirm in `crate::utils::tests`.
let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));
// test_fun(a)
let floor_a = Arc::new(ScalarFunctionExpr::try_new(
Arc::clone(&test_fun),
vec![Arc::clone(col_a)],
&test_schema,
Arc::new(ConfigOptions::default()),
)?) as PhysicalExprRef;
// test_fun(f)
let floor_f = Arc::new(ScalarFunctionExpr::try_new(
Arc::clone(&test_fun),
vec![Arc::clone(col_f)],
&test_schema,
Arc::new(ConfigOptions::default()),
)?) as PhysicalExprRef;
// NOTE(review): `exp_a` is built exactly like `floor_a` (same UDF, same
// argument), so the case using it duplicates the `floor_a` case before it.
let exp_a = Arc::new(ScalarFunctionExpr::try_new(
Arc::clone(&test_fun),
vec![Arc::clone(col_a)],
&test_schema,
Arc::new(ConfigOptions::default()),
)?) as PhysicalExprRef;
// a + b
let a_plus_b = Arc::new(BinaryExpr::new(
Arc::clone(col_a),
Operator::Plus,
Arc::clone(col_b),
)) as Arc<dyn PhysicalExpr>;
// Ascending, nulls last; used for every sort expression below.
let options = SortOptions {
descending: false,
nulls_first: false,
};
let test_cases = vec![
// Orderings [a, d, b] and [c]; requirement [a, b]: not satisfied.
(
vec![
vec![(col_a, options), (col_d, options), (col_b, options)],
vec![(col_c, options)],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(col_a, options), (col_b, options)],
false,
),
// Orderings [a, c, b] and [d]; requirement [test_fun(a)]: satisfied.
(
vec![
vec![(col_a, options), (col_c, options), (col_b, options)],
vec![(col_d, options)],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(&floor_a, options)],
true,
),
// Same orderings; requirement [test_fun(f)] (note a = f): satisfied.
(
vec![
vec![(col_a, options), (col_c, options), (col_b, options)],
vec![(col_d, options)],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(&floor_f, options)],
true,
),
// Orderings [a, c, b] and [d]; requirement [a, c, a + b]: satisfied.
(
vec![
vec![(col_a, options), (col_c, options), (col_b, options)],
vec![(col_d, options)],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(col_a, options), (col_c, options), (&a_plus_b, options)],
true,
),
// Ordering [a, b, c, d]; requirement [test_fun(a), a + b]: not satisfied.
(
vec![
vec![
(col_a, options),
(col_b, options),
(col_c, options),
(col_d, options),
],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(&floor_a, options), (&a_plus_b, options)],
false,
),
// Ordering [a, b, c, d]; requirement [exp_a, a + b]: not satisfied.
(
vec![
vec![
(col_a, options),
(col_b, options),
(col_c, options),
(col_d, options),
],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(&exp_a, options), (&a_plus_b, options)],
false,
),
// Orderings [a, d, b] and [c]; requirement [a, d, test_fun(a)]: satisfied.
(
vec![
vec![(col_a, options), (col_d, options), (col_b, options)],
vec![(col_c, options)],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(col_a, options), (col_d, options), (&floor_a, options)],
true,
),
// Orderings [a, c, b] and [d]; requirement [a, test_fun(a), a + b]:
// not satisfied.
(
vec![
vec![(col_a, options), (col_c, options), (col_b, options)],
vec![(col_d, options)],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(col_a, options), (&floor_a, options), (&a_plus_b, options)],
false,
),
// Orderings [a, b, c] and [d]; requirement
// [a, c, test_fun(a), a + b]: not satisfied.
(
vec![
vec![(col_a, options), (col_b, options), (col_c, options)],
vec![(col_d, options)],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![
(col_a, options),
(col_c, options),
(&floor_a, options),
(&a_plus_b, options),
],
false,
),
// Ordering [a, b, c, d]; requirement [a, b, c, test_fun(a)]: satisfied.
(
vec![
vec![
(col_a, options),
(col_b, options),
(col_c, options),
(col_d, options),
],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![
(col_a, options),
(col_b, options),
(col_c, options),
(&floor_a, options),
],
true,
),
// Orderings [d, b] and [c, a]; requirement [c, d, a + b]: satisfied.
(
vec![
vec![(col_d, options), (col_b, options)],
vec![(col_c, options), (col_a, options)],
],
vec![vec![col_a, col_f]],
vec![col_e],
vec![(col_c, options), (col_d, options), (&a_plus_b, options)],
true,
),
];
for (orderings, eq_group, constants, reqs, expected) in test_cases {
// Built up front so a failing assertion reports the full test case.
let err_msg = format!(
"error in test orderings: {orderings:?}, eq_group: {eq_group:?}, constants: {constants:?}, reqs: {reqs:?}, expected: {expected:?}"
);
// Fresh properties per case: orderings, then equivalences, then constants.
let mut eq_properties = EquivalenceProperties::new(Arc::clone(&test_schema));
let orderings = convert_to_orderings(&orderings);
eq_properties.add_orderings(orderings);
let classes = eq_group
.into_iter()
.map(|eq_class| EquivalenceClass::new(eq_class.into_iter().cloned()));
let eq_group = EquivalenceGroup::new(classes);
eq_properties.add_equivalence_group(eq_group)?;
let constants = constants.into_iter().map(|expr| {
ConstExpr::new(Arc::clone(expr), AcrossPartitions::Uniform(None))
});
eq_properties.add_constants(constants)?;
let reqs = convert_to_sort_exprs(&reqs);
assert_eq!(eq_properties.ordering_satisfy(reqs)?, expected, "{err_msg}");
}
Ok(())
}
// Checks `ordering_satisfy` for requirements of various lengths against a
// fixed set of known orderings (`[a]`, `[e]`, `[d, f]`) with `a = c`.
#[test]
fn test_ordering_satisfy_different_lengths() -> Result<()> {
let test_schema = create_test_schema()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
// Ascending, nulls last; used for every sort expression below.
let options = SortOptions {
descending: false,
nulls_first: false,
};
let mut eq_properties = EquivalenceProperties::new(test_schema);
// Columns a and c are declared equal.
eq_properties.add_equal_conditions(Arc::clone(col_a), Arc::clone(col_c))?;
// Known orderings: [a], [e] and [d, f].
let orderings = vec![
vec![(col_a, options)],
vec![(col_e, options)],
vec![(col_d, options), (col_f, options)],
];
let orderings = convert_to_orderings(&orderings);
eq_properties.add_orderings(orderings);
// Each case is (required ordering, whether it is expected to be satisfied).
let test_cases = vec![
// [c, a, e]: satisfied.
(
vec![(col_c, options), (col_a, options), (col_e, options)],
true,
),
// [c, b]: not satisfied (b appears in no known ordering).
(vec![(col_c, options), (col_b, options)], false),
// [c, d]: satisfied.
(vec![(col_c, options), (col_d, options)], true),
// [d, f, b]: not satisfied (b appears in no known ordering).
(
vec![(col_d, options), (col_f, options), (col_b, options)],
false,
),
// [d, f]: satisfied; it is one of the known orderings.
(vec![(col_d, options), (col_f, options)], true),
];
for (reqs, expected) in test_cases {
// Built up front so a failing assertion reports the test case.
let err_msg =
format!("error in test reqs: {reqs:?}, expected: {expected:?}",);
let reqs = convert_to_sort_exprs(&reqs);
assert_eq!(eq_properties.ordering_satisfy(reqs)?, expected, "{err_msg}");
}
Ok(())
}
// Table-driven check of the redundancy removal performed when an
// `OrderingEquivalenceClass` is built from a vector of orderings.
//
// Each test case is a pair of (input orderings, expected orderings). The
// comparison ignores order: the lengths must match and every actual ordering
// must appear in the expected list.
#[test]
fn test_remove_redundant_entries_oeq_class() -> Result<()> {
let schema = create_test_schema()?;
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_d = &col("d", &schema)?;
let col_e = &col("e", &schema)?;
// Ascending, nulls last.
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
// Descending, nulls first.
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
let test_cases = vec![
// A single ordering [a, b] is kept as is.
(
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
],
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
],
),
// [a, b] is a prefix of [a, b, c], so only [a, b, c] remains.
(
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
vec![
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
),
// [a] is a prefix of both other orderings and is dropped;
// [a, b DESC] and [a, c] are kept.
(
vec![
vec![(col_a, option_asc), (col_b, option_desc)],
vec![(col_a, option_asc)],
vec![(col_a, option_asc), (col_c, option_asc)],
],
vec![
vec![(col_a, option_asc), (col_b, option_desc)],
vec![(col_a, option_asc), (col_c, option_asc)],
],
),
// [a, b] and [a] are both prefixes of [a, b, c]; only [a, b, c] remains.
(
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
vec![(col_a, option_asc)],
],
vec![
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
),
// Empty input yields an empty class.
(
vec![],
vec![],
),
// The suffix [b] of [a, b] matches the ordering [b], so [a, b] is
// trimmed to [a].
(
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![(col_b, option_asc)],
],
vec![
vec![(col_a, option_asc)],
vec![(col_b, option_asc)],
],
),
// [d, b, c]: suffix [c] matches the start of [c, a], then suffix [b]
// matches the start of [b, a]; it ends up as [d].
(
vec![
vec![(col_b, option_asc), (col_a, option_asc)],
vec![(col_c, option_asc), (col_a, option_asc)],
vec![
(col_d, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
vec![
vec![(col_b, option_asc), (col_a, option_asc)],
vec![(col_c, option_asc), (col_a, option_asc)],
vec![(col_d, option_asc)],
],
),
// [d, b, e, c, a]: suffix [c, a] equals the second ordering and the
// remaining suffix [b, e] equals the first; it ends up as [d].
(
vec![
vec![(col_b, option_asc), (col_e, option_asc)],
vec![(col_c, option_asc), (col_a, option_asc)],
vec![
(col_d, option_asc),
(col_b, option_asc),
(col_e, option_asc),
(col_c, option_asc),
(col_a, option_asc),
],
],
vec![
vec![(col_b, option_asc), (col_e, option_asc)],
vec![(col_c, option_asc), (col_a, option_asc)],
vec![(col_d, option_asc)],
],
),
// [d, a, b]: suffix [a, b] matches the start of [a, b, c], leaving [d].
// [a, b, c] is untouched because `b` is not at its end.
(
vec![
vec![(col_b, option_asc)],
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
vec![
(col_d, option_asc),
(col_a, option_asc),
(col_b, option_asc),
],
],
vec![
vec![(col_b, option_asc)],
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
vec![(col_d, option_asc)],
],
),
];
for (orderings, expected) in test_cases {
let orderings = convert_to_orderings(&orderings);
let expected = convert_to_orderings(&expected);
// `From<Vec<LexOrdering>>` runs the redundancy removal under test.
let actual = OrderingEquivalenceClass::from(orderings.clone());
let err_msg = format!(
"orderings: {orderings:?}, expected: {expected:?}, actual :{actual:?}"
);
// Order-insensitive comparison: same size, and every actual entry is expected.
assert_eq!(actual.len(), expected.len(), "{err_msg}");
for elem in actual {
assert!(expected.contains(&elem), "{}", err_msg);
}
}
Ok(())
}
}