use std::hash::{Hash, Hasher};
use std::sync::Arc;
use super::ordering::collapse_lex_ordering;
use crate::equivalence::class::const_exprs_contains;
use crate::equivalence::{
collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass,
ProjectionMapping,
};
use crate::expressions::Literal;
use crate::{
physical_exprs_contains, ConstExpr, LexOrdering, LexOrderingRef, LexRequirement,
LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
PhysicalSortRequirement,
};
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, JoinSide, JoinType, Result};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_physical_expr_common::expressions::column::Column;
use datafusion_physical_expr_common::expressions::CastExpr;
use datafusion_physical_expr_common::physical_expr::with_new_schema;
use datafusion_physical_expr_common::utils::ExprPropertiesNode;
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
/// Tracks what is known about the data produced by an operator:
/// expression equivalences, valid sort orderings, and constant expressions.
/// Used to reason about sort requirements without physically re-sorting.
#[derive(Debug, Clone)]
pub struct EquivalenceProperties {
    /// Groups of expressions known to be equal to one another.
    pub eq_group: EquivalenceGroup,
    /// Lexicographic orderings the data is known to satisfy.
    pub oeq_class: OrderingEquivalenceClass,
    /// Expressions known to have a constant value.
    pub constants: Vec<ConstExpr>,
    // Schema of the relation these properties describe.
    schema: SchemaRef,
}
impl EquivalenceProperties {
/// Creates empty properties for the given schema: no equivalences,
/// no orderings, no constants.
pub fn new(schema: SchemaRef) -> Self {
    Self {
        eq_group: EquivalenceGroup::empty(),
        oeq_class: OrderingEquivalenceClass::empty(),
        constants: vec![],
        schema,
    }
}
/// Creates properties for the given schema seeded with the given
/// known-valid orderings.
pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self {
    Self {
        eq_group: EquivalenceGroup::empty(),
        oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()),
        constants: vec![],
        schema,
    }
}
/// Returns the schema these properties describe.
pub fn schema(&self) -> &SchemaRef {
    &self.schema
}
/// Returns the known ordering equivalence class.
pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
    &self.oeq_class
}
/// Returns the known expression equivalence group.
pub fn eq_group(&self) -> &EquivalenceGroup {
    &self.eq_group
}
/// Returns the known constant expressions.
pub fn constants(&self) -> &[ConstExpr] {
    &self.constants
}
/// Returns the output ordering, with constant expressions removed
/// (constants are trivially ordered and carry no information), or
/// `None` if nothing non-trivial remains.
pub fn output_ordering(&self) -> Option<LexOrdering> {
    let constants = self.constants();
    let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
    // Drop sort keys that are known constants.
    output_ordering
        .retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
    (!output_ordering.is_empty()).then_some(output_ordering)
}
pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
OrderingEquivalenceClass::new(
self.oeq_class
.iter()
.map(|ordering| self.normalize_sort_exprs(ordering))
.collect(),
)
}
/// Merges `other` into `self` (builder style): equivalence groups and
/// orderings are unioned, and `other`'s constants are added.
pub fn extend(mut self, other: Self) -> Self {
    self.eq_group.extend(other.eq_group);
    self.oeq_class.extend(other.oeq_class);
    self.add_constants(other.constants)
}
/// Discards all known orderings (e.g. after an order-destroying operator).
pub fn clear_orderings(&mut self) {
    self.oeq_class.clear();
}
/// Keeps only constants that hold across all partitions, dropping
/// per-partition ones (e.g. when partitions are merged).
pub fn clear_per_partition_constants(&mut self) {
    self.constants.retain(|item| item.across_partitions());
}
/// Unions another ordering equivalence class into this one.
pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) {
    self.oeq_class.extend(other);
}
/// Registers additional known-valid orderings.
pub fn add_new_orderings(
    &mut self,
    orderings: impl IntoIterator<Item = LexOrdering>,
) {
    self.oeq_class.add_new_orderings(orderings);
}
/// Unions another equivalence group into this one.
pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
    self.eq_group.extend(other_eq_group);
}
/// Registers that `left` and `right` are equal, updating the equivalence
/// group. If either side is already a known constant, the other side
/// becomes a constant too (recorded as constant across partitions).
pub fn add_equal_conditions(
    &mut self,
    left: &Arc<dyn PhysicalExpr>,
    right: &Arc<dyn PhysicalExpr>,
) -> Result<()> {
    // Constancy propagates through equality.
    if self.is_expr_constant(left) {
        if !const_exprs_contains(&self.constants, right) {
            self.constants
                .push(ConstExpr::from(right).with_across_partitions(true));
        }
    } else if self.is_expr_constant(right) {
        if !const_exprs_contains(&self.constants, left) {
            self.constants
                .push(ConstExpr::from(left).with_across_partitions(true));
        }
    }
    self.eq_group.add_equal_conditions(left, right);
    // New equalities may expose new valid orderings seeded by `left`.
    // NOTE(review): only `left` is probed here; presumably `right`
    // normalizes to the same class after the merge above — confirm.
    self.discover_new_orderings(left)?;
    Ok(())
}
/// Adds the given constants (builder style). Each expression is
/// normalized against the equivalence group before insertion and
/// duplicates are skipped; afterwards the leading expression of every
/// known ordering is re-examined, since new constants may unlock
/// additional orderings.
pub fn add_constants(
    mut self,
    constants: impl IntoIterator<Item = ConstExpr>,
) -> Self {
    // Split each ConstExpr into its expression and across-partitions flag
    // so the expressions can be normalized in bulk.
    let (const_exprs, across_partition_flags): (
        Vec<Arc<dyn PhysicalExpr>>,
        Vec<bool>,
    ) = constants
        .into_iter()
        .map(|const_expr| {
            let across_partitions = const_expr.across_partitions();
            let expr = const_expr.owned_expr();
            (expr, across_partitions)
        })
        .unzip();
    for (expr, across_partitions) in self
        .eq_group
        .normalize_exprs(const_exprs)
        .into_iter()
        .zip(across_partition_flags)
    {
        // Skip constants that are already registered.
        if !const_exprs_contains(&self.constants, &expr) {
            let const_expr =
                ConstExpr::from(expr).with_across_partitions(across_partitions);
            self.constants.push(const_expr);
        }
    }
    // Discovery failures are non-fatal here: log at debug and keep going.
    for ordering in self.normalized_oeq_class().iter() {
        if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
            log::debug!("error discovering new orderings: {e}");
        }
    }
    self
}
/// Looks for orderings newly implied by `expr`: for each known ordering
/// whose head is `expr`, if some expression equivalent to `expr` is a
/// unary function of the second sort key that preserves the leading sort
/// direction, then the tail of that ordering is itself a valid ordering.
fn discover_new_orderings(&mut self, expr: &Arc<dyn PhysicalExpr>) -> Result<()> {
    let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
    // All expressions equivalent to `expr`, or just `expr` itself when
    // it belongs to no equivalence class.
    let eq_class = self
        .eq_group
        .classes
        .iter()
        .find_map(|class| {
            class
                .contains(&normalized_expr)
                .then(|| class.clone().into_vec())
        })
        .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);
    let mut new_orderings: Vec<LexOrdering> = vec![];
    // Only orderings led by `expr` with at least a second element matter.
    for (ordering, next_expr) in self
        .normalized_oeq_class()
        .iter()
        .filter(|ordering| ordering[0].expr.eq(&normalized_expr))
        .filter_map(|ordering| Some(ordering).zip(ordering.get(1)))
    {
        let leading_ordering = ordering[0].options;
        for equivalent_expr in &eq_class {
            let children = equivalent_expr.children();
            // A unary equivalent expression over the second sort key that
            // is order-preserving makes the tail ordering valid on its own.
            if children.len() == 1
                && children[0].eq(&next_expr.expr)
                && SortProperties::Ordered(leading_ordering)
                    == equivalent_expr
                        .get_properties(&[ExprProperties {
                            sort_properties: SortProperties::Ordered(
                                leading_ordering,
                            ),
                            range: Interval::make_unbounded(
                                &equivalent_expr.data_type(&self.schema)?,
                            )?,
                        }])?
                        .sort_properties
            {
                new_orderings.push(ordering[1..].to_vec());
                break;
            }
        }
    }
    self.oeq_class.add_new_orderings(new_orderings);
    Ok(())
}
/// Replaces all known orderings with the single given ordering
/// (builder style), e.g. after an explicit sort.
/// NOTE(review): previously known orderings are discarded wholesale;
/// equivalences and constants are kept — confirm that is intended for
/// all call sites.
pub fn with_reorder(mut self, sort_exprs: Vec<PhysicalSortExpr>) -> Self {
    self.oeq_class = OrderingEquivalenceClass::new(vec![sort_exprs]);
    self
}
/// Normalizes a lexicographic ordering by round-tripping it through the
/// (requirement-based) normalization machinery.
fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering {
    PhysicalSortRequirement::to_sort_exprs(self.normalize_sort_requirements(
        &PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()),
    ))
}
fn normalize_sort_requirements(
&self,
sort_reqs: LexRequirementRef,
) -> LexRequirement {
let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs);
let mut constant_exprs = vec![];
constant_exprs.extend(
self.constants
.iter()
.map(|const_expr| Arc::clone(const_expr.expr())),
);
let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
collapse_lex_req(
normalized_sort_reqs
.iter()
.filter(|&order| {
!physical_exprs_contains(&constants_normalized, &order.expr)
})
.cloned()
.collect(),
)
}
/// Returns `true` if the data is known to satisfy the given ordering.
pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool {
    self.ordering_satisfy_requirement(&PhysicalSortRequirement::from_sort_exprs(
        given.iter(),
    ))
}
/// Returns `true` if the data is known to satisfy the given sort
/// requirements, checked element by element.
pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) -> bool {
    let mut eq_properties = self.clone();
    let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);
    for normalized_req in normalized_reqs {
        if !eq_properties.ordering_satisfy_single(&normalized_req) {
            return false;
        }
        // Within a satisfied prefix, the leading expression behaves like
        // a constant for the remaining requirements.
        eq_properties = eq_properties
            .add_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
    }
    true
}
/// Returns `true` if a single sort requirement is satisfied: either the
/// expression is provably ordered compatibly with `req`, or it has a
/// single value (singleton), in which case any ordering holds.
fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
    let ExprProperties {
        sort_properties, ..
    } = self.get_expr_properties(Arc::clone(&req.expr));
    match sort_properties {
        SortProperties::Ordered(options) => {
            let sort_expr = PhysicalSortExpr {
                expr: Arc::clone(&req.expr),
                options,
            };
            sort_expr.satisfy(req, self.schema())
        }
        // A single-valued expression satisfies any requirement.
        SortProperties::Singleton => true,
        SortProperties::Unordered => false,
    }
}
/// Returns `true` if satisfying `given` would also satisfy `reference`,
/// i.e. `reference` (after normalization) is a compatible prefix of
/// `given`.
pub fn requirements_compatible(
    &self,
    given: LexRequirementRef,
    reference: LexRequirementRef,
) -> bool {
    let normalized_given = self.normalize_sort_requirements(given);
    let normalized_reference = self.normalize_sort_requirements(reference);
    // `reference` cannot be a prefix of something shorter than itself.
    if normalized_reference.len() > normalized_given.len() {
        return false;
    }
    normalized_reference
        .into_iter()
        .zip(normalized_given)
        .all(|(reference, given)| given.compatible(&reference))
}
/// Returns the finer of two orderings (the one that refines the other),
/// or `None` if neither refines the other. Implemented by converting to
/// requirements and delegating to `get_finer_requirement`.
pub fn get_finer_ordering(
    &self,
    lhs: LexOrderingRef,
    rhs: LexOrderingRef,
) -> Option<LexOrdering> {
    let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
    let rhs = PhysicalSortRequirement::from_sort_exprs(rhs);
    let finer = self.get_finer_requirement(&lhs, &rhs);
    finer.map(PhysicalSortRequirement::to_sort_exprs)
}
/// Returns the finer of two sort requirements if one refines the other,
/// or `None` if they disagree. While comparing, a side with unspecified
/// sort options borrows the options of the other side, so the returned
/// (longer) requirement carries concrete options where possible.
pub fn get_finer_requirement(
    &self,
    req1: LexRequirementRef,
    req2: LexRequirementRef,
) -> Option<LexRequirement> {
    let mut lhs = self.normalize_sort_requirements(req1);
    let mut rhs = self.normalize_sort_requirements(req2);
    // NOTE: the closure mutates lhs/rhs in place while `all` runs; the
    // result below relies on those fills having happened.
    lhs.iter_mut()
        .zip(rhs.iter_mut())
        .all(|(lhs, rhs)| {
            lhs.expr.eq(&rhs.expr)
                && match (lhs.options, rhs.options) {
                    (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt,
                    (Some(options), None) => {
                        // Fill the unspecified side from the specified one.
                        rhs.options = Some(options);
                        true
                    }
                    (None, Some(options)) => {
                        lhs.options = Some(options);
                        true
                    }
                    (None, None) => true,
                }
        })
        .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
}
/// For each sort expression, collects the equivalent sort expressions a
/// projection makes available (currently: widening casts of the same
/// expression), and returns the cartesian product of the alternatives —
/// every ordering obtainable by substituting components.
pub fn substitute_ordering_component(
    &self,
    mapping: &ProjectionMapping,
    sort_expr: &[PhysicalSortExpr],
) -> Result<Vec<Vec<PhysicalSortExpr>>> {
    let new_orderings = sort_expr
        .iter()
        .map(|sort_expr| {
            // Projection sources that mention this sort expression.
            let referring_exprs: Vec<_> = mapping
                .iter()
                .map(|(source, _target)| source)
                .filter(|source| expr_refers(source, &sort_expr.expr))
                .cloned()
                .collect();
            // The original expression is always a valid alternative.
            let mut res = vec![sort_expr.clone()];
            for r_expr in referring_exprs {
                // A cast to a wider type preserves the ordering of the
                // underlying expression, so it is an equivalent sort key.
                if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
                    let expr_type = sort_expr.expr.data_type(&self.schema)?;
                    if cast_expr.expr.eq(&sort_expr.expr)
                        && cast_expr.is_bigger_cast(expr_type)
                    {
                        res.push(PhysicalSortExpr {
                            expr: Arc::clone(&r_expr),
                            options: sort_expr.options,
                        });
                    }
                }
            }
            Ok(res)
        })
        .collect::<Result<Vec<_>>>()?;
    // All combinations of per-component alternatives.
    let res = new_orderings
        .into_iter()
        .multi_cartesian_product()
        .collect::<Vec<_>>();
    Ok(res)
}
/// Rewrites every known ordering through `substitute_ordering_component`,
/// replacing the ordering equivalence class with the expanded set.
pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
    let orderings = &self.oeq_class.orderings;
    let new_order = orderings
        .iter()
        .map(|order| self.substitute_ordering_component(mapping, order))
        .collect::<Result<Vec<_>>>()?;
    let new_order = new_order.into_iter().flatten().collect();
    self.oeq_class = OrderingEquivalenceClass::new(new_order);
    Ok(())
}
/// Projects `expr` through the given projection mapping, returning the
/// projected expression or `None` if it does not survive the projection.
pub fn project_expr(
    &self,
    expr: &Arc<dyn PhysicalExpr>,
    projection_mapping: &ProjectionMapping,
) -> Option<Arc<dyn PhysicalExpr>> {
    self.eq_group.project_expr(projection_mapping, expr)
}
/// Builds a map from each relevant sort expression to its projection
/// target (if any) and the sort expression immediately preceding it in
/// some known ordering (its "dependency"). Traversal of an ordering
/// stops at the first component that does not survive the projection,
/// since later components are meaningless without it.
fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
    let mut dependency_map = IndexMap::new();
    for ordering in self.normalized_oeq_class().iter() {
        for (idx, sort_expr) in ordering.iter().enumerate() {
            let target_sort_expr =
                self.project_expr(&sort_expr.expr, mapping).map(|expr| {
                    PhysicalSortExpr {
                        expr,
                        options: sort_expr.options,
                    }
                });
            let is_projected = target_sort_expr.is_some();
            // Record the entry if the expression is projected, or if some
            // projection source refers to it (it may still matter).
            if is_projected
                || mapping
                    .iter()
                    .any(|(source, _)| expr_refers(source, &sort_expr.expr))
            {
                // The previous component (if any) must be ordered first.
                let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
                dependency_map
                    .entry(sort_expr.clone())
                    .or_insert_with(|| DependencyNode {
                        target_sort_expr: target_sort_expr.clone(),
                        dependencies: IndexSet::new(),
                    })
                    .insert_dependency(dependency);
            }
            if !is_projected {
                // Later components depend on this one; stop here.
                break;
            }
        }
    }
    dependency_map
}
/// Returns a copy of `mapping` whose source expressions have been
/// normalized against the equivalence group (targets are unchanged).
fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
    let map = mapping
        .iter()
        .map(|(source, target)| {
            (
                self.eq_group.normalize_expr(Arc::clone(source)),
                Arc::clone(target),
            )
        })
        .collect();
    ProjectionMapping { map }
}
/// Computes the orderings that hold after applying `mapping`. Two
/// sources of orderings are combined: (1) projection targets whose
/// source expression is provably ordered given its dependencies, each
/// appended to every ordering of those dependencies; (2) projected sort
/// expressions from the dependency map, each appended to its prefix
/// orderings. Results are deduplicated via `collapse_lex_ordering`.
fn projected_orderings(&self, mapping: &ProjectionMapping) -> Vec<LexOrdering> {
    let mapping = self.normalized_mapping(mapping);
    let dependency_map = self.construct_dependency_map(&mapping);
    // Case 1: orderings induced by projection targets.
    let orderings = mapping.iter().flat_map(|(source, target)| {
        referred_dependencies(&dependency_map, source)
            .into_iter()
            .filter_map(|relevant_deps| {
                // Keep only dependency sets under which `source` is ordered.
                if let Ok(SortProperties::Ordered(options)) =
                    get_expr_properties(source, &relevant_deps, &self.schema)
                        .map(|prop| prop.sort_properties)
                {
                    Some((options, relevant_deps))
                } else {
                    None
                }
            })
            .flat_map(|(options, relevant_deps)| {
                let sort_expr = PhysicalSortExpr {
                    expr: Arc::clone(target),
                    options,
                };
                // The target can extend any ordering of its dependencies.
                let mut dependency_orderings =
                    generate_dependency_orderings(&relevant_deps, &dependency_map);
                for ordering in dependency_orderings.iter_mut() {
                    ordering.push(sort_expr.clone());
                }
                dependency_orderings
            })
    });
    // Case 2: orderings carried over from projected sort expressions.
    let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
        let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
        if prefixes.is_empty() {
            // No prefix required: the expression can lead an ordering.
            prefixes = vec![vec![]];
        }
        for ordering in prefixes.iter_mut() {
            if let Some(target) = &node.target_sort_expr {
                ordering.push(target.clone())
            }
        }
        prefixes
    });
    orderings
        .chain(projected_orderings)
        .map(collapse_lex_ordering)
        .collect()
}
/// Computes the constants that hold after applying `mapping`: existing
/// constants that survive the projection, plus any projection target
/// whose source expression is constant.
fn projected_constants(&self, mapping: &ProjectionMapping) -> Vec<ConstExpr> {
    let mut projected_constants = self
        .constants
        .iter()
        .flat_map(|const_expr| {
            const_expr.map(|expr| self.eq_group.project_expr(mapping, expr))
        })
        .collect::<Vec<_>>();
    // Targets of constant sources are constants in the output.
    for (source, target) in mapping.iter() {
        if self.is_expr_constant(source)
            && !const_exprs_contains(&projected_constants, target)
        {
            projected_constants
                .push(ConstExpr::from(target).with_across_partitions(true));
        }
    }
    projected_constants
}
/// Projects these properties through `projection_mapping`, producing the
/// equivalence properties of the projected relation with `output_schema`.
pub fn project(
    &self,
    projection_mapping: &ProjectionMapping,
    output_schema: SchemaRef,
) -> Self {
    Self {
        eq_group: self.eq_group.project(projection_mapping),
        oeq_class: OrderingEquivalenceClass::new(
            self.projected_orderings(projection_mapping),
        ),
        constants: self.projected_constants(projection_mapping),
        schema: output_schema,
    }
}
/// Finds the longest prefix-ordered permutation of `exprs`: repeatedly
/// picks every expression that is currently ordered (or single-valued),
/// appends it to the result, then treats it as a constant so the next
/// round is evaluated relative to the chosen prefix. Returns the sort
/// expressions and the indices of the chosen expressions in `exprs`.
pub fn find_longest_permutation(
    &self,
    exprs: &[Arc<dyn PhysicalExpr>],
) -> (LexOrdering, Vec<usize>) {
    let mut eq_properties = self.clone();
    let mut result = vec![];
    // Indices of expressions not yet placed in the permutation.
    let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
    for _idx in 0..exprs.len() {
        // Collect every remaining expression that is ordered under the
        // current (prefix-augmented) properties.
        let ordered_exprs = search_indices
            .iter()
            .flat_map(|&idx| {
                let ExprProperties {
                    sort_properties, ..
                } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
                match sort_properties {
                    SortProperties::Ordered(options) => Some((
                        PhysicalSortExpr {
                            expr: Arc::clone(&exprs[idx]),
                            options,
                        },
                        idx,
                    )),
                    SortProperties::Singleton => {
                        // Single-valued: any sort options are valid.
                        let options = SortOptions::default();
                        Some((
                            PhysicalSortExpr {
                                expr: Arc::clone(&exprs[idx]),
                                options,
                            },
                            idx,
                        ))
                    }
                    SortProperties::Unordered => None,
                }
            })
            .collect::<Vec<_>>();
        if ordered_exprs.is_empty() {
            // No progress possible; the permutation cannot be extended.
            break;
        }
        // Chosen expressions act as constants for subsequent rounds.
        for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
            eq_properties =
                eq_properties.add_constants(std::iter::once(ConstExpr::from(expr)));
            search_indices.shift_remove(idx);
        }
        result.extend(ordered_exprs);
    }
    result.into_iter().unzip()
}
/// Returns `true` if `expr` is known to be constant: either it (after
/// normalization) is a registered constant, or all of its inputs are.
pub fn is_expr_constant(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
    let normalized_constants = self.eq_group.normalize_exprs(
        self.constants
            .iter()
            .map(|const_expr| Arc::clone(const_expr.expr())),
    );
    let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
    is_constant_recurse(&normalized_constants, &normalized_expr)
}
/// Computes the sort/range properties of `expr` under these equivalence
/// properties via a bottom-up tree traversal; falls back to "unknown"
/// properties if the traversal errors.
pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
    ExprPropertiesNode::new_unknown(expr)
        .transform_up(|expr| update_properties(expr, self))
        .data()
        .map(|node| node.data)
        .unwrap_or(ExprProperties::new_unknown())
}
/// Rewrites these properties against a new schema with identical layout
/// (same field count and matching data types; names may differ, e.g.
/// across union inputs). Errors if the schemas are not aligned.
pub fn with_new_schema(self, schema: SchemaRef) -> Result<Self> {
    // The rewrite is only valid when fields correspond positionally with
    // equal data types.
    let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
        && self
            .schema
            .fields
            .iter()
            .zip(schema.fields.iter())
            .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
    if !schemas_aligned {
        return plan_err!(
            "Cannot rewrite old_schema:{:?} with new schema: {:?}",
            self.schema,
            schema
        );
    }
    // Rebind constants to the new schema, preserving their flags.
    let new_constants = self
        .constants
        .into_iter()
        .map(|const_expr| {
            let across_partitions = const_expr.across_partitions();
            let new_const_expr = with_new_schema(const_expr.owned_expr(), &schema)?;
            Ok(ConstExpr::new(new_const_expr)
                .with_across_partitions(across_partitions))
        })
        .collect::<Result<Vec<_>>>()?;
    // Rebind every sort expression in every ordering.
    let mut new_orderings = vec![];
    for ordering in self.oeq_class.orderings {
        let new_ordering = ordering
            .into_iter()
            .map(|mut sort_expr| {
                sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
                Ok(sort_expr)
            })
            .collect::<Result<_>>()?;
        new_orderings.push(new_ordering);
    }
    // Rebind every member of every equivalence class.
    let mut eq_classes = vec![];
    for eq_class in self.eq_group.classes {
        let new_eq_exprs = eq_class
            .into_vec()
            .into_iter()
            .map(|expr| with_new_schema(expr, &schema))
            .collect::<Result<_>>()?;
        eq_classes.push(EquivalenceClass::new(new_eq_exprs));
    }
    let mut result = EquivalenceProperties::new(schema);
    result.constants = new_constants;
    result.add_new_orderings(new_orderings);
    result.add_equivalence_group(EquivalenceGroup::new(eq_classes));
    Ok(result)
}
}
/// Transform callback for `get_expr_properties`: computes a node's
/// properties from its children (or from scratch for literals/columns),
/// then overrides the sort properties using the equivalence state —
/// constants become `Singleton`, known-ordered expressions `Ordered`.
fn update_properties(
    mut node: ExprPropertiesNode,
    eq_properties: &EquivalenceProperties,
) -> Result<Transformed<ExprPropertiesNode>> {
    if !node.expr.children().is_empty() {
        // Internal node: derive properties from child properties.
        let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
        node.data = node.expr.get_properties(&children_props)?;
    } else if node.expr.as_any().is::<Literal>() {
        node.data = node.expr.get_properties(&[])?;
    } else if node.expr.as_any().is::<Column>() {
        // Columns get an unbounded range of their schema data type.
        node.data.range =
            Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
    }
    // Equivalence-derived overrides take precedence over intrinsic ones.
    let normalized_expr = eq_properties
        .eq_group
        .normalize_expr(Arc::clone(&node.expr));
    if eq_properties.is_expr_constant(&normalized_expr) {
        node.data.sort_properties = SortProperties::Singleton;
    } else if let Some(options) = eq_properties
        .normalized_oeq_class()
        .get_options(&normalized_expr)
    {
        node.data.sort_properties = SortProperties::Ordered(options);
    }
    Ok(Transformed::yes(node))
}
/// Returns `true` if `expr` is constant given `constants`: it is itself
/// a listed constant or a literal, or it has children and every child is
/// (recursively) constant. A leaf that is neither constant nor literal
/// is not constant.
fn is_constant_recurse(
    constants: &[Arc<dyn PhysicalExpr>],
    expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    if physical_exprs_contains(constants, expr) || expr.as_any().is::<Literal>() {
        return true;
    }
    let children = expr.children();
    if children.is_empty() {
        return false;
    }
    children.iter().all(|c| is_constant_recurse(constants, c))
}
/// Returns `true` if `referring_expr` is `referred_expr` or contains it
/// anywhere in its subtree.
fn expr_refers(
    referring_expr: &Arc<dyn PhysicalExpr>,
    referred_expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    if referring_expr.eq(referred_expr) {
        return true;
    }
    referring_expr
        .children()
        .iter()
        .any(|child| expr_refers(child, referred_expr))
}
/// Collects, for each distinct expression that `source` refers to, the
/// sort expressions over it found in the dependency map, and returns the
/// cartesian product across expressions — every combination picking one
/// sort expression per referred expression.
fn referred_dependencies(
    dependency_map: &DependencyMap,
    source: &Arc<dyn PhysicalExpr>,
) -> Vec<Dependencies> {
    // Group relevant sort expressions by their underlying expression.
    let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
    for sort_expr in dependency_map
        .keys()
        .filter(|sort_expr| expr_refers(source, &sort_expr.expr))
    {
        let key = ExprWrapper(Arc::clone(&sort_expr.expr));
        expr_to_sort_exprs
            .entry(key)
            .or_default()
            .insert(sort_expr.clone());
    }
    // One sort expression per group, in every combination.
    expr_to_sort_exprs
        .values()
        .multi_cartesian_product()
        .map(|referred_deps| referred_deps.into_iter().cloned().collect())
        .collect()
}
/// Returns every ordering that must precede `relevant_sort_expr`,
/// obtained by expanding each of its recorded dependencies.
/// NOTE(review): indexing with `dependency_map[...]` panics if the key
/// is absent — callers appear to only pass keys from the map; confirm.
fn construct_prefix_orderings(
    relevant_sort_expr: &PhysicalSortExpr,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    dependency_map[relevant_sort_expr]
        .dependencies
        .iter()
        .flat_map(|dep| construct_orderings(dep, dependency_map))
        .collect()
}
/// Generates all orderings that satisfy every dependency in
/// `dependencies`: takes one prefix ordering per dependency (cartesian
/// product), then concatenates the chosen prefixes in every possible
/// order (permutations). Returns a single empty ordering if no
/// dependency has prefixes.
fn generate_dependency_orderings(
    dependencies: &Dependencies,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    // Dependencies with no prefix orderings impose no constraint.
    let relevant_prefixes = dependencies
        .iter()
        .flat_map(|dep| {
            let prefixes = construct_prefix_orderings(dep, dependency_map);
            (!prefixes.is_empty()).then_some(prefixes)
        })
        .collect::<Vec<_>>();
    if relevant_prefixes.is_empty() {
        // Unconstrained: the empty ordering is the only prefix.
        return vec![vec![]];
    }
    relevant_prefixes
        .into_iter()
        .multi_cartesian_product()
        .flat_map(|prefix_orderings| {
            // Independent prefixes may appear in any relative order.
            prefix_orderings
                .iter()
                .permutations(prefix_orderings.len())
                .map(|prefixes| prefixes.into_iter().flatten().cloned().collect())
                .collect::<Vec<_>>()
        })
        .collect()
}
/// Computes the properties of `expr` assuming the sort expressions in
/// `dependencies` hold: a direct match is `Ordered` with the recorded
/// options, an unmatched column is `Unordered`, a literal is `Singleton`
/// with an exact range, and anything else is derived from its children.
fn get_expr_properties(
    expr: &Arc<dyn PhysicalExpr>,
    dependencies: &Dependencies,
    schema: &SchemaRef,
) -> Result<ExprProperties> {
    if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
        // `expr` is itself one of the assumed sort keys.
        Ok(ExprProperties {
            sort_properties: SortProperties::Ordered(column_order.options),
            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
        })
    } else if expr.as_any().downcast_ref::<Column>().is_some() {
        Ok(ExprProperties {
            sort_properties: SortProperties::Unordered,
            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
        })
    } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
        Ok(ExprProperties {
            sort_properties: SortProperties::Singleton,
            range: Interval::try_new(literal.value().clone(), literal.value().clone())?,
        })
    } else {
        // Recurse into children and combine their properties.
        let child_states = expr
            .children()
            .iter()
            .map(|child| get_expr_properties(child, dependencies, schema))
            .collect::<Result<Vec<_>>>()?;
        expr.get_properties(&child_states)
    }
}
/// Node of the dependency map built in `construct_dependency_map`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DependencyNode {
    // Projected form of the sort expression, if it survives the projection.
    target_sort_expr: Option<PhysicalSortExpr>,
    // Sort expressions that must precede this one in some ordering.
    dependencies: Dependencies,
}
impl DependencyNode {
    /// Records `dependency` as a prerequisite of this node, if present.
    fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
        // `Option::cloned` yields at most one owned element to extend with.
        self.dependencies.extend(dependency.cloned());
    }
}
/// Sort expression -> its projection target and prerequisite sort keys.
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
/// An insertion-ordered set of prerequisite sort expressions.
type Dependencies = IndexSet<PhysicalSortExpr>;
/// Expands `referred_sort_expr` into full orderings: each dependency's
/// orderings, each extended by this node's projection target.
/// NOTE(review): `target_sort_expr` is unwrapped — this assumes every
/// node reached here survives the projection; confirm that invariant
/// holds at the call sites.
fn construct_orderings(
    referred_sort_expr: &PhysicalSortExpr,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let node = &dependency_map[referred_sort_expr];
    let target_sort_expr = node.target_sort_expr.clone().unwrap();
    if node.dependencies.is_empty() {
        // Leaf: the target alone is an ordering.
        vec![vec![target_sort_expr]]
    } else {
        node.dependencies
            .iter()
            .flat_map(|dep| {
                let mut orderings = construct_orderings(dep, dependency_map);
                for ordering in orderings.iter_mut() {
                    ordering.push(target_sort_expr.clone())
                }
                orderings
            })
            .collect()
    }
}
/// Computes the equivalence properties of a join output from the
/// properties of its children, the join type, which child's order is
/// maintained, and (for hash-like joins) which side is the probe side.
pub fn join_equivalence_properties(
    left: EquivalenceProperties,
    right: EquivalenceProperties,
    join_type: &JoinType,
    join_schema: SchemaRef,
    maintains_input_order: &[bool],
    probe_side: Option<JoinSide>,
    on: &[(PhysicalExprRef, PhysicalExprRef)],
) -> EquivalenceProperties {
    let left_size = left.schema.fields.len();
    let mut result = EquivalenceProperties::new(join_schema);
    result.add_equivalence_group(left.eq_group().join(
        right.eq_group(),
        join_type,
        left_size,
        on,
    ));
    let EquivalenceProperties {
        constants: left_constants,
        oeq_class: left_oeq_class,
        ..
    } = left;
    let EquivalenceProperties {
        constants: right_constants,
        oeq_class: mut right_oeq_class,
        ..
    } = right;
    match maintains_input_order {
        [true, false] => {
            // Left order maintained. For an inner join probing the left
            // side, the right ordering survives as a suffix.
            if let (Some(JoinSide::Left), JoinType::Inner) = (probe_side, join_type) {
                updated_right_ordering_equivalence_class(
                    &mut right_oeq_class,
                    join_type,
                    left_size,
                );
                let out_oeq_class = left_oeq_class.join_suffix(&right_oeq_class);
                result.add_ordering_equivalence_class(out_oeq_class);
            } else {
                result.add_ordering_equivalence_class(left_oeq_class);
            }
        }
        [false, true] => {
            // Right order maintained; right column indices must be shifted
            // past the left schema in the joined output.
            updated_right_ordering_equivalence_class(
                &mut right_oeq_class,
                join_type,
                left_size,
            );
            if let (Some(JoinSide::Right), JoinType::Inner) = (probe_side, join_type) {
                let out_oeq_class = right_oeq_class.join_suffix(&left_oeq_class);
                result.add_ordering_equivalence_class(out_oeq_class);
            } else {
                result.add_ordering_equivalence_class(right_oeq_class);
            }
        }
        [false, false] => {}
        [true, true] => unreachable!("Cannot maintain ordering of both sides"),
        _ => unreachable!("Join operators can not have more than two children"),
    }
    // Semi/anti joins emit only one side, so only that side's constants
    // remain meaningful.
    match join_type {
        JoinType::LeftAnti | JoinType::LeftSemi => {
            result = result.add_constants(left_constants);
        }
        JoinType::RightAnti | JoinType::RightSemi => {
            result = result.add_constants(right_constants);
        }
        _ => {}
    }
    result
}
/// Shifts the column indices of the right child's orderings by the left
/// schema width, for join types whose output places right columns after
/// the left columns. Other join types are left untouched.
fn updated_right_ordering_equivalence_class(
    right_oeq_class: &mut OrderingEquivalenceClass,
    join_type: &JoinType,
    left_size: usize,
) {
    match join_type {
        JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
            right_oeq_class.add_offset(left_size);
        }
        _ => {}
    }
}
/// Newtype making `Arc<dyn PhysicalExpr>` usable as an `IndexMap` key by
/// delegating equality and hashing to the wrapped expression.
#[derive(Debug, Clone)]
struct ExprWrapper(Arc<dyn PhysicalExpr>);
impl PartialEq<Self> for ExprWrapper {
    fn eq(&self, other: &Self) -> bool {
        self.0.eq(&other.0)
    }
}
impl Eq for ExprWrapper {}
impl Hash for ExprWrapper {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.hash(state);
    }
}
/// Computes the equivalence properties of the union of two inputs:
/// constants common to both sides (demoted to per-partition), and the
/// longest prefix of each side's orderings that the other side also
/// satisfies. The result uses `lhs`'s schema.
fn calculate_union_binary(
    lhs: EquivalenceProperties,
    mut rhs: EquivalenceProperties,
) -> Result<EquivalenceProperties> {
    // Rebind rhs onto lhs's schema so expressions are comparable.
    if !rhs.schema.eq(&lhs.schema) {
        rhs = rhs.with_new_schema(Arc::clone(&lhs.schema))?;
    }
    // Keep only constants present on both sides; a union interleaves
    // partitions, so across-partition constancy no longer holds.
    let constants = lhs
        .constants()
        .iter()
        .filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr()))
        .map(|const_expr| {
            ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false)
        })
        .collect();
    // For each side's ordering, trim from the back until the other side
    // satisfies it; keep any non-empty common prefix.
    let mut orderings = vec![];
    for mut ordering in lhs.normalized_oeq_class().orderings {
        while !rhs.ordering_satisfy(&ordering) {
            ordering.pop();
        }
        if !ordering.is_empty() {
            orderings.push(ordering);
        }
    }
    for mut ordering in rhs.normalized_oeq_class().orderings {
        while !lhs.ordering_satisfy(&ordering) {
            ordering.pop();
        }
        if !ordering.is_empty() {
            orderings.push(ordering);
        }
    }
    let mut eq_properties = EquivalenceProperties::new(lhs.schema);
    eq_properties.constants = constants;
    eq_properties.add_new_orderings(orderings);
    Ok(eq_properties)
}
pub fn calculate_union(
eqps: Vec<EquivalenceProperties>,
schema: SchemaRef,
) -> Result<EquivalenceProperties> {
let mut init = eqps[0].clone();
if !init.schema.eq(&schema) {
init = init.with_new_schema(schema)?;
}
eqps.into_iter()
.skip(1)
.try_fold(init, calculate_union_binary)
}
#[cfg(test)]
mod tests {
use std::ops::Not;
use super::*;
use crate::equivalence::add_offset_to_expr;
use crate::equivalence::tests::{
convert_to_orderings, convert_to_sort_exprs, convert_to_sort_reqs,
create_random_schema, create_test_params, create_test_schema,
generate_table_for_eq_properties, is_table_same_after_sort, output_schema,
};
use crate::expressions::{col, BinaryExpr, Column};
use crate::utils::tests::TestScalarUDF;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::{Fields, TimeUnit};
use datafusion_common::DFSchema;
use datafusion_expr::{Operator, ScalarUDF};
#[test]
fn project_equivalence_properties_test() -> Result<()> {
let input_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]));
let input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
let col_a = col("a", &input_schema)?;
let proj_exprs = vec![
(Arc::clone(&col_a), "a1".to_string()),
(Arc::clone(&col_a), "a2".to_string()),
(Arc::clone(&col_a), "a3".to_string()),
(Arc::clone(&col_a), "a4".to_string()),
];
let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
let out_schema = output_schema(&projection_mapping, &input_schema)?;
let proj_exprs = vec![
(Arc::clone(&col_a), "a1".to_string()),
(Arc::clone(&col_a), "a2".to_string()),
(Arc::clone(&col_a), "a3".to_string()),
(Arc::clone(&col_a), "a4".to_string()),
];
let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
let col_a1 = &col("a1", &out_schema)?;
let col_a2 = &col("a2", &out_schema)?;
let col_a3 = &col("a3", &out_schema)?;
let col_a4 = &col("a4", &out_schema)?;
let out_properties = input_properties.project(&projection_mapping, out_schema);
assert_eq!(out_properties.eq_group().len(), 1);
let eq_class = &out_properties.eq_group().classes[0];
assert_eq!(eq_class.len(), 4);
assert!(eq_class.contains(col_a1));
assert!(eq_class.contains(col_a2));
assert!(eq_class.contains(col_a3));
assert!(eq_class.contains(col_a4));
Ok(())
}
#[test]
fn test_join_equivalence_properties() -> Result<()> {
let schema = create_test_schema()?;
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let offset = schema.fields.len();
let col_a2 = &add_offset_to_expr(Arc::clone(col_a), offset);
let col_b2 = &add_offset_to_expr(Arc::clone(col_b), offset);
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let test_cases = vec![
(
vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
vec![
vec![(col_a, option_asc), (col_a2, option_asc)],
vec![(col_a, option_asc), (col_b2, option_asc)],
vec![(col_b, option_asc), (col_a2, option_asc)],
vec![(col_b, option_asc), (col_b2, option_asc)],
],
),
(
vec![
vec![(col_a, option_asc)],
vec![(col_b, option_asc)],
vec![(col_c, option_asc)],
],
vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
vec![
vec![(col_a, option_asc), (col_a2, option_asc)],
vec![(col_a, option_asc), (col_b2, option_asc)],
vec![(col_b, option_asc), (col_a2, option_asc)],
vec![(col_b, option_asc), (col_b2, option_asc)],
vec![(col_c, option_asc), (col_a2, option_asc)],
vec![(col_c, option_asc), (col_b2, option_asc)],
],
),
];
for (left_orderings, right_orderings, expected) in test_cases {
let mut left_eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
let mut right_eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
let left_orderings = convert_to_orderings(&left_orderings);
let right_orderings = convert_to_orderings(&right_orderings);
let expected = convert_to_orderings(&expected);
left_eq_properties.add_new_orderings(left_orderings);
right_eq_properties.add_new_orderings(right_orderings);
let join_eq = join_equivalence_properties(
left_eq_properties,
right_eq_properties,
&JoinType::Inner,
Arc::new(Schema::empty()),
&[true, false],
Some(JoinSide::Left),
&[],
);
let orderings = &join_eq.oeq_class.orderings;
let err_msg = format!("expected: {:?}, actual:{:?}", expected, orderings);
assert_eq!(
join_eq.oeq_class.orderings.len(),
expected.len(),
"{}",
err_msg
);
for ordering in orderings {
assert!(
expected.contains(ordering),
"{}, ordering: {:?}",
err_msg,
ordering
);
}
}
Ok(())
}
#[test]
fn test_expr_consists_of_constants() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
]));
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let col_d = col("d", &schema)?;
let b_plus_d = Arc::new(BinaryExpr::new(
Arc::clone(&col_b),
Operator::Plus,
Arc::clone(&col_d),
)) as Arc<dyn PhysicalExpr>;
let constants = vec![Arc::clone(&col_a), Arc::clone(&col_b)];
let expr = Arc::clone(&b_plus_d);
assert!(!is_constant_recurse(&constants, &expr));
let constants = vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_d)];
let expr = Arc::clone(&b_plus_d);
assert!(is_constant_recurse(&constants, &expr));
Ok(())
}
#[test]
fn test_get_updated_right_ordering_equivalence_properties() -> Result<()> {
let join_type = JoinType::Inner;
// Right child schema: four Int32 columns [x, y, z, w].
let child_fields: Fields = ["x", "y", "z", "w"]
.into_iter()
.map(|name| Field::new(name, DataType::Int32, true))
.collect();
let child_schema = Schema::new(child_fields);
let col_x = &col("x", &child_schema)?;
let col_y = &col("y", &child_schema)?;
let col_z = &col("z", &child_schema)?;
let col_w = &col("w", &child_schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
// The right child is ordered by [x ASC, y ASC] and by [z ASC, w ASC].
let orderings = vec![
vec![(col_x, option_asc), (col_y, option_asc)],
vec![(col_z, option_asc), (col_w, option_asc)],
];
let orderings = convert_to_orderings(&orderings);
let mut right_oeq_class = OrderingEquivalenceClass::new(orderings);
let left_columns_len = 4;
// Join output schema: left columns [a, b, c, d] followed by the right
// child's columns [x, y, z, w].
let fields: Fields = ["a", "b", "c", "d", "x", "y", "z", "w"]
.into_iter()
.map(|name| Field::new(name, DataType::Int32, true))
.collect();
let schema = Schema::new(fields);
let col_a = &col("a", &schema)?;
let col_d = &col("d", &schema)?;
let col_x = &col("x", &schema)?;
let col_y = &col("y", &schema)?;
let col_z = &col("z", &schema)?;
let col_w = &col("w", &schema)?;
let mut join_eq_properties = EquivalenceProperties::new(Arc::new(schema));
// Equalities introduced by the join condition: a = x and d = w.
join_eq_properties.add_equal_conditions(col_a, col_x)?;
join_eq_properties.add_equal_conditions(col_d, col_w)?;
// Shift the right-side column indices by `left_columns_len` so the right
// orderings refer to the join output schema.
updated_right_ordering_equivalence_class(
&mut right_oeq_class,
&join_type,
left_columns_len,
);
join_eq_properties.add_ordering_equivalence_class(right_oeq_class);
let result = join_eq_properties.oeq_class().clone();
// The right child's orderings should carry over unchanged, now expressed
// against the output-schema columns.
let orderings = vec![
vec![(col_x, option_asc), (col_y, option_asc)],
vec![(col_z, option_asc), (col_w, option_asc)],
];
let orderings = convert_to_orderings(&orderings);
let expected = OrderingEquivalenceClass::new(orderings);
assert_eq!(result, expected);
Ok(())
}
#[test]
fn test_normalize_ordering_equivalence_classes() -> Result<()> {
// Three equally-typed columns; `a` and `c` are declared equal, then the
// orderings [b] and [c] are registered. The resulting ordering
// equivalence class must match one built from [b] and [c] directly.
let sort_options = SortOptions::default();
let schema = Schema::new(vec![
    Field::new("a", DataType::Int32, true),
    Field::new("b", DataType::Int32, true),
    Field::new("c", DataType::Int32, true),
]);
let col_a_expr = col("a", &schema)?;
let col_b_expr = col("b", &schema)?;
let col_c_expr = col("c", &schema)?;

// Builds a single-expression ordering with the default sort options.
let single_ordering = |expr: &Arc<dyn PhysicalExpr>| {
    vec![PhysicalSortExpr {
        expr: Arc::clone(expr),
        options: sort_options,
    }]
};

let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr)?;
eq_properties.add_new_orderings(vec![
    single_ordering(&col_b_expr),
    single_ordering(&col_c_expr),
]);

let mut expected_eqs = EquivalenceProperties::new(Arc::new(schema));
expected_eqs.add_new_orderings([
    single_ordering(&col_b_expr),
    single_ordering(&col_c_expr),
]);

let actual_oeq = eq_properties.oeq_class().clone();
assert!(actual_oeq.eq(expected_eqs.oeq_class()));
Ok(())
}
#[test]
fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
let sort_options = SortOptions::default();
let sort_options_not = SortOptions::default().not();
// Scenario 1: existing ordering [b DESC-ish, a ASC] exactly covers the
// required columns [b, a]; the full permutation should be found.
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]);
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let required_columns = [Arc::clone(col_b), Arc::clone(col_a)];
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_new_orderings([vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
]]);
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0, 1]);
assert_eq!(
result,
vec![
PhysicalSortExpr {
expr: Arc::clone(col_b),
options: sort_options_not
},
PhysicalSortExpr {
expr: Arc::clone(col_a),
options: sort_options
}
]
);
// Scenario 2: an extra unrelated ordering ([c]) must not interfere with
// finding [b, a] from the second ordering.
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let required_columns = [Arc::clone(col_b), Arc::clone(col_a)];
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_new_orderings([
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("c", 2)),
options: sort_options,
}],
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
],
]);
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0, 1]);
assert_eq!(
result,
vec![
PhysicalSortExpr {
expr: Arc::clone(col_b),
options: sort_options_not
},
PhysicalSortExpr {
expr: Arc::clone(col_a),
options: sort_options
}
]
);
// Scenario 3: the ordering is [b, c, a] but `c` is not among the required
// columns, so only the prefix [b] can be matched.
let required_columns = [
Arc::new(Column::new("b", 1)) as _,
Arc::new(Column::new("a", 0)) as _,
];
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_new_orderings([vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("c", 2)),
options: sort_options,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
]]);
let (_, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0]);
Ok(())
}
#[test]
fn test_update_properties() -> Result<()> {
// Checks `get_expr_properties`: sort properties of derived expressions
// given the equivalence b = a and the orderings [b ASC] and [d ASC].
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
]);
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_d = &col("d", &schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
eq_properties.add_equal_conditions(col_b, col_a)?;
eq_properties.add_new_orderings(vec![
vec![PhysicalSortExpr {
expr: Arc::clone(col_b),
options: option_asc,
}],
vec![PhysicalSortExpr {
expr: Arc::clone(col_d),
options: option_asc,
}],
]);
// (expression, expected sort properties):
// - d + b: both operands ordered ASC, so the sum is ordered ASC.
// - b: ordered directly; a: ordered via the b = a equivalence.
// - a + c: `c` has no known ordering, so the sum is unordered.
let test_cases = vec![
(
Arc::new(BinaryExpr::new(
Arc::clone(col_d),
Operator::Plus,
Arc::clone(col_b),
)) as Arc<dyn PhysicalExpr>,
SortProperties::Ordered(option_asc),
),
(Arc::clone(col_b), SortProperties::Ordered(option_asc)),
(Arc::clone(col_a), SortProperties::Ordered(option_asc)),
(
Arc::new(BinaryExpr::new(
Arc::clone(col_a),
Operator::Plus,
Arc::clone(col_c),
)),
SortProperties::Unordered,
),
];
for (expr, expected) in test_cases {
// Leading sort expressions are included in the failure message to ease
// debugging when an expectation does not hold.
let leading_orderings = eq_properties
.oeq_class()
.iter()
.flat_map(|ordering| ordering.first().cloned())
.collect::<Vec<_>>();
let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr));
let err_msg = format!(
"expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}",
expr, expected, expr_props.sort_properties
);
assert_eq!(expr_props.sort_properties, expected, "{}", err_msg);
}
Ok(())
}
#[test]
fn test_find_longest_permutation_random() -> Result<()> {
// Property-based check: over many random schemas/properties, for every
// subset of candidate expressions, `find_longest_permutation` must
// (1) return indices consistent with the returned ordering, and
// (2) return an ordering that the generated table actually satisfies.
const N_RANDOM_SCHEMA: usize = 100;
const N_ELEMENTS: usize = 125;
const N_DISTINCT: usize = 5;
for seed in 0..N_RANDOM_SCHEMA {
let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
let table_data_with_properties =
generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?;
// Candidate expressions include plain columns plus two derived ones:
// a scalar UDF of `a` and the sum `a + b`.
let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
let floor_a = crate::udf::create_physical_expr(
&test_fun,
&[col("a", &test_schema)?],
&test_schema,
&[],
&DFSchema::empty(),
)?;
let a_plus_b = Arc::new(BinaryExpr::new(
col("a", &test_schema)?,
Operator::Plus,
col("b", &test_schema)?,
)) as Arc<dyn PhysicalExpr>;
let exprs = [
col("a", &test_schema)?,
col("b", &test_schema)?,
col("c", &test_schema)?,
col("d", &test_schema)?,
col("e", &test_schema)?,
col("f", &test_schema)?,
floor_a,
a_plus_b,
];
// Exercise every combination size, including the empty set.
for n_req in 0..=exprs.len() {
for exprs in exprs.iter().combinations(n_req) {
let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
let (ordering, indices) =
eq_properties.find_longest_permutation(&exprs);
// Rebuild the ordering from the returned indices; it must be
// identical to the ordering returned directly.
let ordering2 = indices
.iter()
.zip(ordering.iter())
.map(|(&idx, sort_expr)| PhysicalSortExpr {
expr: Arc::clone(&exprs[idx]),
options: sort_expr.options,
})
.collect::<Vec<_>>();
assert_eq!(
ordering, ordering2,
"indices and lexicographical ordering do not match"
);
let err_msg = format!(
"Error in test case ordering:{:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}",
ordering, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants
);
assert_eq!(ordering.len(), indices.len(), "{}", err_msg);
// Sorting by the claimed ordering must leave the table unchanged,
// i.e. the data already satisfies it.
assert!(
is_table_same_after_sort(
ordering.clone(),
table_data_with_properties.clone(),
)?,
"{}",
err_msg
);
}
}
}
Ok(())
}
#[test]
fn test_find_longest_permutation() -> Result<()> {
// Uses the schema/properties from `create_test_params` (equivalences and
// orderings defined there — see its definition), augmented with the
// ordering [d ASC, h DESC] below.
let (test_schema, mut eq_properties) = create_test_params()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
let col_h = &col("h", &test_schema)?;
let a_plus_d = Arc::new(BinaryExpr::new(
Arc::clone(col_a),
Operator::Plus,
Arc::clone(col_d),
)) as Arc<dyn PhysicalExpr>;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
eq_properties.add_new_orderings([vec![
PhysicalSortExpr {
expr: Arc::clone(col_d),
options: option_asc,
},
PhysicalSortExpr {
expr: Arc::clone(col_h),
options: option_desc,
},
]]);
// Each case: (required expressions, expected longest valid permutation
// with its sort options). An empty expectation (e.g. for [b] alone) means
// no prefix of any known ordering can satisfy the request.
let test_cases = vec![
(vec![col_a], vec![(col_a, option_asc)]),
(vec![col_c], vec![(col_c, option_asc)]),
(
vec![col_d, col_e, col_b],
vec![
(col_d, option_asc),
(col_e, option_desc),
(col_b, option_asc),
],
),
(vec![col_b], vec![]),
(vec![col_d], vec![(col_d, option_asc)]),
// Derived expressions participate too: a + d is ordered ASC.
(vec![&a_plus_d], vec![(&a_plus_d, option_asc)]),
(
vec![col_b, col_d],
vec![(col_d, option_asc), (col_b, option_asc)],
),
(
vec![col_c, col_e],
vec![(col_c, option_asc), (col_e, option_desc)],
),
// The remaining cases permute the same expression set; the result must
// be a valid ordering regardless of the order of the request.
(
vec![col_d, col_h, col_e, col_f, col_b],
vec![
(col_d, option_asc),
(col_e, option_desc),
(col_h, option_desc),
(col_f, option_asc),
(col_b, option_asc),
],
),
(
vec![col_e, col_d, col_h, col_f, col_b],
vec![
(col_e, option_desc),
(col_d, option_asc),
(col_h, option_desc),
(col_f, option_asc),
(col_b, option_asc),
],
),
(
vec![col_e, col_d, col_b, col_h, col_f],
vec![
(col_e, option_desc),
(col_d, option_asc),
(col_b, option_asc),
(col_h, option_desc),
(col_f, option_asc),
],
),
];
for (exprs, expected) in test_cases {
let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
let expected = convert_to_sort_exprs(&expected);
let (actual, _) = eq_properties.find_longest_permutation(&exprs);
assert_eq!(actual, expected);
}
Ok(())
}
#[test]
fn test_find_longest_permutation2() -> Result<()> {
// A constant expression is orderable with any options, so after marking
// `h` constant, requesting [h] yields [h] with the default sort options.
let (test_schema, mut eq_properties) = create_test_params()?;
let col_h = &col("h", &test_schema)?;
eq_properties = eq_properties.add_constants(vec![ConstExpr::from(col_h)]);

let exprs = vec![Arc::clone(col_h)];
let expected = convert_to_sort_exprs(&[(col_h, SortOptions::default())]);
let (actual, _) = eq_properties.find_longest_permutation(&exprs);
assert_eq!(actual, expected);
Ok(())
}
#[test]
fn test_get_finer() -> Result<()> {
let schema = create_test_schema()?;
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let eq_properties = EquivalenceProperties::new(schema);
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
// Each case: (lhs requirement, rhs requirement, expected finer
// requirement or None when the two are incompatible). A `None` option in
// a requirement means "any direction".
let tests_cases = vec![
// [a ASC] vs [a any, b ASC]: compatible — the finer requirement pins
// `a` to ASC and keeps `b` ASC.
(
vec![(col_a, Some(option_asc))],
vec![(col_a, None), (col_b, Some(option_asc))],
Some(vec![(col_a, Some(option_asc)), (col_b, Some(option_asc))]),
),
// One requirement is a prefix of the other: the longer one is finer.
(
vec![
(col_a, Some(option_asc)),
(col_b, Some(option_asc)),
(col_c, Some(option_asc)),
],
vec![(col_a, Some(option_asc)), (col_b, Some(option_asc))],
Some(vec![
(col_a, Some(option_asc)),
(col_b, Some(option_asc)),
(col_c, Some(option_asc)),
]),
),
// Conflicting directions on `b`: no finer requirement exists.
(
vec![(col_a, Some(option_asc)), (col_b, Some(option_asc))],
vec![(col_a, Some(option_asc)), (col_b, Some(option_desc))],
None,
),
];
for (lhs, rhs, expected) in tests_cases {
let lhs = convert_to_sort_reqs(&lhs);
let rhs = convert_to_sort_reqs(&rhs);
let expected = expected.map(|expected| convert_to_sort_reqs(&expected));
let finer = eq_properties.get_finer_requirement(&lhs, &rhs);
assert_eq!(finer, expected)
}
Ok(())
}
#[test]
fn test_normalize_sort_reqs() -> Result<()> {
// Uses the equivalences from `create_test_params`; as the expectations
// below show, `c` normalizes to its equivalence-class representative `a`,
// while non-equivalent columns are left untouched.
let (test_schema, eq_properties) = create_test_params()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
// Each case: (input requirement, expected normalized requirement). A
// `None` option means "any direction" and must be preserved.
let requirements = vec![
(
vec![(col_a, Some(option_asc))],
vec![(col_a, Some(option_asc))],
),
(
vec![(col_a, Some(option_desc))],
vec![(col_a, Some(option_desc))],
),
(vec![(col_a, None)], vec![(col_a, None)]),
// `c` is rewritten to the representative `a`, keeping the options.
(
vec![(col_c, Some(option_asc))],
vec![(col_a, Some(option_asc))],
),
(vec![(col_c, None)], vec![(col_a, None)]),
// Multi-column requirements without equivalent columns pass through.
(
vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
),
(
vec![(col_d, None), (col_b, None)],
vec![(col_d, None), (col_b, None)],
),
(
vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
),
(
vec![(col_e, Some(option_desc)), (col_f, None)],
vec![(col_e, Some(option_desc)), (col_f, None)],
),
(
vec![(col_e, None), (col_f, None)],
vec![(col_e, None), (col_f, None)],
),
];
for (reqs, expected_normalized) in requirements.into_iter() {
let req = convert_to_sort_reqs(&reqs);
let expected_normalized = convert_to_sort_reqs(&expected_normalized);
assert_eq!(
eq_properties.normalize_sort_requirements(&req),
expected_normalized
);
}
Ok(())
}
#[test]
fn test_schema_normalize_sort_requirement_with_equivalence() -> Result<()> {
let option1 = SortOptions {
descending: false,
nulls_first: false,
};
// `create_test_params` establishes the equivalences; per the expected
// values below, `c` normalizes to `a` while `a` and `d` are unchanged.
let (test_schema, eq_properties) = create_test_params()?;
let col_a = &col("a", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
// Each case: (input requirement, expected normalized requirement).
let test_cases = vec![
(vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
(vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
(vec![(col_c, None)], vec![(col_a, None)]),
(vec![(col_d, Some(option1))], vec![(col_d, Some(option1))]),
];
for (reqs, expected) in test_cases.into_iter() {
let reqs = convert_to_sort_reqs(&reqs);
let expected = convert_to_sort_reqs(&expected);
let normalized = eq_properties.normalize_sort_requirements(&reqs);
assert!(
expected.eq(&normalized),
"error in test: reqs: {reqs:?}, expected: {expected:?}, normalized: {normalized:?}"
);
}
Ok(())
}
#[test]
fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Date32, true),
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
]));
// Base input is lexicographically ordered by [a, b, c], all ascending
// with nulls first.
let base_properties = EquivalenceProperties::new(Arc::clone(&schema))
.with_reorder(
["a", "b", "c"]
.into_iter()
.map(|c| {
col(c, schema.as_ref()).map(|expr| PhysicalSortExpr {
expr,
options: SortOptions {
descending: false,
nulls_first: true,
},
})
})
.collect::<Result<Vec<_>>>()?,
);
struct TestCase {
name: &'static str,
// Expressions marked constant before/after the equal conditions.
constants: Vec<Arc<dyn PhysicalExpr>>,
// Pairs fed to `add_equal_conditions`.
equal_conditions: Vec<[Arc<dyn PhysicalExpr>; 2]>,
// Column names of the requested sort (default options).
sort_columns: &'static [&'static str],
should_satisfy_ordering: bool,
}
let col_a = col("a", schema.as_ref())?;
let col_b = col("b", schema.as_ref())?;
let col_c = col("c", schema.as_ref())?;
// cast(c AS Date32): equating this with `a` links orderings of c and a.
let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None));
let cases = vec![
// With b constant and cast(c) = a, ordering by [c] is implied by the
// base ordering [a, b, c].
TestCase {
name: "(a, b, c) -> (c)",
constants: vec![Arc::clone(&col_b)],
equal_conditions: vec![[
Arc::clone(&cast_c) as Arc<dyn PhysicalExpr>,
Arc::clone(&col_a),
]],
sort_columns: &["c"],
should_satisfy_ordering: true,
},
// Same as above but the equal-condition operands are swapped; the
// result must not depend on argument order.
TestCase {
name: "(a, b, c) -> (c)",
constants: vec![col_b],
equal_conditions: vec![[
Arc::clone(&col_a),
Arc::clone(&cast_c) as Arc<dyn PhysicalExpr>,
]],
sort_columns: &["c"],
should_satisfy_ordering: true,
},
// Without b constant, [a, b, c] does not imply an ordering by [c].
TestCase {
name: "not ordered because (b) is not constant",
constants: vec![],
equal_conditions: vec![[
Arc::clone(&cast_c) as Arc<dyn PhysicalExpr>,
Arc::clone(&col_a),
]],
sort_columns: &["c"],
should_satisfy_ordering: false,
},
];
for case in cases {
// Build the properties both ways (equalities then constants, and
// constants then equalities) — the outcome must be identical.
for properties in [
{
let mut properties = base_properties.clone();
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
properties.add_constants(
case.constants.iter().cloned().map(ConstExpr::from),
)
},
{
let mut properties = base_properties.clone().add_constants(
case.constants.iter().cloned().map(ConstExpr::from),
);
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
properties
},
] {
let sort = case
.sort_columns
.iter()
.map(|&name| {
col(name, &schema).map(|col| PhysicalSortExpr {
expr: col,
options: SortOptions::default(),
})
})
.collect::<Result<Vec<_>>>()?;
assert_eq!(
properties.ordering_satisfy(&sort),
case.should_satisfy_ordering,
"failed test '{}'",
case.name
);
}
}
Ok(())
}
fn append_fields(schema: &SchemaRef, text: &str) -> SchemaRef {
Arc::new(Schema::new(
schema
.fields()
.iter()
.map(|field| {
Field::new(
format!("{}{}", field.name(), text),
field.data_type().clone(),
field.is_nullable(),
)
})
.collect::<Vec<_>>(),
))
}
#[tokio::test]
async fn test_union_equivalence_properties_multi_children() -> Result<()> {
// Children share column positions but differ in names (suffixes "1"/"2");
// union properties are computed positionally against the first schema.
let schema = create_test_schema()?;
let schema2 = append_fields(&schema, "1");
let schema3 = append_fields(&schema, "2");
// Each case: (children as (orderings by column name, schema), expected
// union orderings by first-schema column names). The union keeps only the
// longest ordering prefix common to ALL children.
let test_cases = vec![
// Last child is ordered only by the first two columns: common prefix
// is [a, b].
(
vec![
(
vec![vec!["a", "b", "c"]],
Arc::clone(&schema),
),
(
vec![vec!["a1", "b1", "c1"]],
Arc::clone(&schema2),
),
(
vec![vec!["a2", "b2"]],
Arc::clone(&schema3),
),
],
vec![vec!["a", "b"]],
),
// All children fully ordered by the first three columns.
(
vec![
(
vec![vec!["a", "b", "c"]],
Arc::clone(&schema),
),
(
vec![vec!["a1", "b1", "c1"]],
Arc::clone(&schema2),
),
(
vec![vec!["a2", "b2", "c2"]],
Arc::clone(&schema3),
),
],
vec![vec!["a", "b", "c"]],
),
// First child limits the common prefix to [a, b].
(
vec![
(
vec![vec!["a", "b"]],
Arc::clone(&schema),
),
(
vec![vec!["a1", "b1", "c1"]],
Arc::clone(&schema2),
),
(
vec![vec!["a2", "b2", "c2"]],
Arc::clone(&schema3),
),
],
vec![vec!["a", "b"]],
),
// Third child is ordered by different positions ([b2, c2]): no common
// ordering survives.
(
vec![
(
vec![vec!["a", "b"]],
Arc::clone(&schema),
),
(
vec![vec!["a1", "b1"]],
Arc::clone(&schema2),
),
(
vec![vec!["b2", "c2"]],
Arc::clone(&schema3),
),
],
vec![],
),
// Multiple orderings per child: both shared orderings survive.
(
vec![
(
vec![vec!["a", "b"], vec!["c"]],
Arc::clone(&schema),
),
(
vec![vec!["a1", "b1"], vec!["c1"]],
Arc::clone(&schema2),
),
],
vec![vec!["a", "b"], vec!["c"]],
),
];
for (children, expected) in test_cases {
// Materialize each child's EquivalenceProperties from its name-based
// ordering description.
let children_eqs = children
.iter()
.map(|(orderings, schema)| {
let orderings = orderings
.iter()
.map(|ordering| {
ordering
.iter()
.map(|name| PhysicalSortExpr {
expr: col(name, schema).unwrap(),
options: SortOptions::default(),
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
EquivalenceProperties::new_with_orderings(
Arc::clone(schema),
&orderings,
)
})
.collect::<Vec<_>>();
let actual = calculate_union(children_eqs, Arc::clone(&schema))?;
let expected_ordering = expected
.into_iter()
.map(|ordering| {
ordering
.into_iter()
.map(|name| PhysicalSortExpr {
expr: col(name, &schema).unwrap(),
options: SortOptions::default(),
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let expected = EquivalenceProperties::new_with_orderings(
Arc::clone(&schema),
&expected_ordering,
);
assert_eq_properties_same(
&actual,
&expected,
format!("expected: {:?}, actual: {:?}", expected, actual),
);
}
Ok(())
}
#[tokio::test]
async fn test_union_equivalence_properties_binary() -> Result<()> {
let schema = create_test_schema()?;
let schema2 = append_fields(&schema, "1");
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_a1 = &col("a1", &schema2)?;
let col_b1 = &col("b1", &schema2)?;
let options = SortOptions::default();
let options_desc = !SortOptions::default();
// Each case: (first child, second child, expected union), where a child
// is (orderings, constant expressions, schema) and the expectation is
// (orderings, constant expressions) against the first schema.
let test_cases = [
// Constants on one side can stand in for that side's missing ordering:
// [a] + consts {b, c} unioned with [b] + consts {a, c} yields both
// orderings [a] and [b], with only the shared constant {c} remaining.
(
(
vec![
(vec![(col_a, options)]),
],
vec![col_b, col_c],
Arc::clone(&schema),
),
(
vec![
(vec![(col_b, options)]),
],
vec![col_a, col_c],
Arc::clone(&schema),
),
(
vec![
vec![(col_a, options)],
vec![(col_b, options)],
],
vec![col_c],
),
),
// [a] unioned with [a, b]: only the common prefix [a] survives.
(
(
vec![
vec![(col_a, options)],
],
vec![],
Arc::clone(&schema),
),
(
vec![
vec![(col_a, options), (col_b, options)],
],
vec![],
Arc::clone(&schema),
),
(
vec![
vec![(col_a, options)],
],
vec![],
),
),
// Same column, opposite sort directions: no common ordering.
(
(
vec![
vec![(col_a, options)],
],
vec![],
Arc::clone(&schema),
),
(
vec![
vec![(col_a, options_desc)],
],
vec![],
Arc::clone(&schema),
),
(
vec![],
vec![],
),
),
// Differently-named schemas are matched positionally: [a] vs
// [a1, b1] gives the common prefix [a].
(
(
vec![
vec![(col_a, options)],
],
vec![],
Arc::clone(&schema),
),
(
vec![
vec![(col_a1, options), (col_b1, options)],
],
vec![],
Arc::clone(&schema2),
),
(
vec![
vec![(col_a, options)],
],
vec![],
),
),
];
for (
test_idx,
(
(first_child_orderings, first_child_constants, first_schema),
(second_child_orderings, second_child_constants, second_schema),
(union_orderings, union_constants),
),
) in test_cases.iter().enumerate()
{
// Build the first child's properties (constants first, then orderings).
let first_orderings = first_child_orderings
.iter()
.map(|ordering| convert_to_sort_exprs(ordering))
.collect::<Vec<_>>();
let first_constants = first_child_constants
.iter()
.map(|expr| ConstExpr::new(Arc::clone(expr)))
.collect::<Vec<_>>();
let mut lhs = EquivalenceProperties::new(Arc::clone(first_schema));
lhs = lhs.add_constants(first_constants);
lhs.add_new_orderings(first_orderings);
// Build the second child's properties the same way.
let second_orderings = second_child_orderings
.iter()
.map(|ordering| convert_to_sort_exprs(ordering))
.collect::<Vec<_>>();
let second_constants = second_child_constants
.iter()
.map(|expr| ConstExpr::new(Arc::clone(expr)))
.collect::<Vec<_>>();
let mut rhs = EquivalenceProperties::new(Arc::clone(second_schema));
rhs = rhs.add_constants(second_constants);
rhs.add_new_orderings(second_orderings);
// Assemble the expected union properties.
let union_expected_orderings = union_orderings
.iter()
.map(|ordering| convert_to_sort_exprs(ordering))
.collect::<Vec<_>>();
let union_constants = union_constants
.iter()
.map(|expr| ConstExpr::new(Arc::clone(expr)))
.collect::<Vec<_>>();
let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema));
union_expected_eq = union_expected_eq.add_constants(union_constants);
union_expected_eq.add_new_orderings(union_expected_orderings);
let actual_union_eq = calculate_union_binary(lhs, rhs)?;
let err_msg = format!(
"Error in test id: {:?}, test case: {:?}",
test_idx, test_cases[test_idx]
);
assert_eq_properties_same(&actual_union_eq, &union_expected_eq, err_msg);
}
Ok(())
}
/// Asserts that two `EquivalenceProperties` describe the same constants and
/// the same orderings, treating both collections as sets (order-insensitive).
/// Panics with `err_msg` on any mismatch.
fn assert_eq_properties_same(
    lhs: &EquivalenceProperties,
    rhs: &EquivalenceProperties,
    err_msg: String,
) {
    // Constants: same cardinality and every rhs constant present in lhs.
    let lhs_constants = lhs.constants();
    let rhs_constants = rhs.constants();
    assert_eq!(lhs_constants.len(), rhs_constants.len(), "{}", err_msg);
    for rhs_constant in rhs_constants {
        assert!(
            const_exprs_contains(lhs_constants, rhs_constant.expr()),
            "{}",
            err_msg
        );
    }
    // Orderings: same cardinality and every rhs ordering present in lhs.
    // Use the public accessor on both sides (the original reached into
    // `rhs.oeq_class.orderings` directly, inconsistently with lhs).
    let lhs_orderings = lhs.oeq_class();
    let rhs_orderings = rhs.oeq_class();
    assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg);
    for rhs_ordering in rhs_orderings.iter() {
        assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
    }
}
}