use crate::expressions::Column;
use arrow_schema::SchemaRef;
use datafusion_common::{JoinSide, JoinType};
use indexmap::IndexSet;
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::equivalence::{
collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
};
use crate::expressions::Literal;
use crate::sort_properties::{ExprOrdering, SortProperties};
use crate::{
physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement,
LexRequirementRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use super::ordering::collapse_lex_ordering;
/// Bundles the equivalence group, ordering equivalence class, constant
/// expressions and schema that the methods of this type operate on.
#[derive(Debug, Clone)]
pub struct EquivalenceProperties {
/// Equivalence group used to normalize expressions and sort requirements.
pub eq_group: EquivalenceGroup,
/// Ordering equivalence class holding the known lexicographical orderings.
pub oeq_class: OrderingEquivalenceClass,
/// Expressions treated as constant; they are filtered out of sort
/// requirements during normalization.
pub constants: Vec<Arc<dyn PhysicalExpr>>,
/// Schema returned by `schema()` and handed to `PhysicalSortExpr::satisfy`.
schema: SchemaRef,
}
impl EquivalenceProperties {
/// Builds properties for `schema` with an empty equivalence group, an empty
/// ordering equivalence class and no constants.
pub fn new(schema: SchemaRef) -> Self {
    let eq_group = EquivalenceGroup::empty();
    let oeq_class = OrderingEquivalenceClass::empty();
    Self {
        eq_group,
        oeq_class,
        constants: Vec::new(),
        schema,
    }
}
/// Builds properties for `schema` whose ordering equivalence class is
/// created from a copy of `orderings`; the equivalence group and the
/// constants start out empty.
pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self {
    let oeq_class = OrderingEquivalenceClass::new(orderings.to_vec());
    Self {
        eq_group: EquivalenceGroup::empty(),
        oeq_class,
        constants: Vec::new(),
        schema,
    }
}
/// Returns a reference to the schema stored in these properties.
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
/// Returns a reference to the stored ordering equivalence class (as-is,
/// without normalization; see `normalized_oeq_class` for the normalized form).
pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
&self.oeq_class
}
/// Returns a reference to the stored equivalence group.
pub fn eq_group(&self) -> &EquivalenceGroup {
&self.eq_group
}
/// Returns the expressions currently recorded as constant.
pub fn constants(&self) -> &[Arc<dyn PhysicalExpr>] {
    self.constants.as_slice()
}
pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
OrderingEquivalenceClass::new(
self.oeq_class
.iter()
.map(|ordering| self.normalize_sort_exprs(ordering))
.collect(),
)
}
pub fn extend(mut self, other: Self) -> Self {
self.eq_group.extend(other.eq_group);
self.oeq_class.extend(other.oeq_class);
self.add_constants(other.constants)
}
/// Clears the stored ordering equivalence class; the equivalence group and
/// the constants are left untouched.
pub fn clear_orderings(&mut self) {
self.oeq_class.clear();
}
/// Extends the stored ordering equivalence class with `other`.
pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) {
self.oeq_class.extend(other);
}
/// Forwards `orderings` to the stored ordering equivalence class via its
/// `add_new_orderings` method.
pub fn add_new_orderings(
&mut self,
orderings: impl IntoIterator<Item = LexOrdering>,
) {
self.oeq_class.add_new_orderings(orderings);
}
/// Extends the stored equivalence group with `other_eq_group`.
pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
self.eq_group.extend(other_eq_group);
}
/// Registers the pair `left` / `right` with the stored equivalence group
/// via its `add_equal_conditions` method.
pub fn add_equal_conditions(
&mut self,
left: &Arc<dyn PhysicalExpr>,
right: &Arc<dyn PhysicalExpr>,
) {
self.eq_group.add_equal_conditions(left, right);
}
/// Normalizes `constants` with the equivalence group and appends every
/// normalized expression that is not already recorded as a constant.
/// Consumes and returns `self` so calls can be chained.
pub fn add_constants(
    mut self,
    constants: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
) -> Self {
    let normalized = self.eq_group.normalize_exprs(constants);
    for candidate in normalized {
        let already_known = physical_exprs_contains(&self.constants, &candidate);
        if !already_known {
            self.constants.push(candidate);
        }
    }
    self
}
pub fn with_reorder(mut self, sort_exprs: Vec<PhysicalSortExpr>) -> Self {
self.oeq_class = OrderingEquivalenceClass::new(vec![sort_exprs]);
self
}
/// Normalizes an ordering by converting it to sort requirements, running
/// `normalize_sort_requirements` on them and converting the result back.
fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering {
    let requirements = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
    PhysicalSortRequirement::to_sort_exprs(
        self.normalize_sort_requirements(&requirements),
    )
}
fn normalize_sort_requirements(
&self,
sort_reqs: LexRequirementRef,
) -> LexRequirement {
let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs);
let constants_normalized = self.eq_group.normalize_exprs(self.constants.clone());
collapse_lex_req(
normalized_sort_reqs
.iter()
.filter(|&order| {
!physical_exprs_contains(&constants_normalized, &order.expr)
})
.cloned()
.collect(),
)
}
/// Returns whether the ordering `given` is satisfied, by converting it to
/// sort requirements and delegating to `ordering_satisfy_requirement`.
pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool {
    let requirements = PhysicalSortRequirement::from_sort_exprs(given.iter());
    self.ordering_satisfy_requirement(requirements.as_slice())
}
/// Returns whether every requirement in `reqs` is satisfied, checking them
/// left to right on a working copy of `self`.
///
/// After a requirement is found to be satisfied, its expression is added
/// to the working copy's constants before the next requirement is checked.
pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) -> bool {
    let mut working_props = self.clone();
    let normalized = working_props.normalize_sort_requirements(reqs);
    for requirement in normalized {
        if working_props.ordering_satisfy_single(&requirement) {
            working_props =
                working_props.add_constants(std::iter::once(requirement.expr));
        } else {
            return false;
        }
    }
    true
}
/// Checks a single requirement against the ordering state computed for its
/// expression by `get_expr_ordering`:
/// - `Unordered` never satisfies,
/// - `Singleton` always satisfies,
/// - `Ordered` satisfies when the resulting sort expression does.
fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
    let ExprOrdering { expr, state, .. } = self.get_expr_ordering(req.expr.clone());
    match state {
        SortProperties::Unordered => false,
        SortProperties::Singleton => true,
        SortProperties::Ordered(options) => {
            PhysicalSortExpr { expr, options }.satisfy(req, self.schema())
        }
    }
}
/// Returns `true` when, after normalization, `reference` is no longer than
/// `given` and each requirement of `given` is `compatible` with the
/// requirement of `reference` at the same position.
pub fn requirements_compatible(
    &self,
    given: LexRequirementRef,
    reference: LexRequirementRef,
) -> bool {
    let given = self.normalize_sort_requirements(given);
    let reference = self.normalize_sort_requirements(reference);
    if reference.len() > given.len() {
        return false;
    }
    reference
        .into_iter()
        .zip(given)
        .all(|(reference_req, given_req)| given_req.compatible(&reference_req))
}
/// Ordering flavour of `get_finer_requirement`: both orderings are
/// converted to requirements, compared, and the result (if any) is
/// converted back to sort expressions.
pub fn get_finer_ordering(
    &self,
    lhs: LexOrderingRef,
    rhs: LexOrderingRef,
) -> Option<LexOrdering> {
    let lhs_reqs = PhysicalSortRequirement::from_sort_exprs(lhs);
    let rhs_reqs = PhysicalSortRequirement::from_sort_exprs(rhs);
    self.get_finer_requirement(&lhs_reqs, &rhs_reqs)
        .map(PhysicalSortRequirement::to_sort_exprs)
}
/// Compares the two normalized requirements position by position (up to
/// the length of the shorter one).
///
/// Returns `None` if any pair has different expressions or both sides
/// specify different sort options. When only one side of a pair specifies
/// options, they are copied to the other side. On success the longer
/// requirement is returned (`req1`'s normalized form on a tie).
pub fn get_finer_requirement(
    &self,
    req1: LexRequirementRef,
    req2: LexRequirementRef,
) -> Option<LexRequirement> {
    let mut first = self.normalize_sort_requirements(req1);
    let mut second = self.normalize_sort_requirements(req2);
    for (left, right) in first.iter_mut().zip(second.iter_mut()) {
        if !left.expr.eq(&right.expr) {
            return None;
        }
        match (left.options, right.options) {
            // Both sides are explicit and disagree: incompatible.
            (Some(left_opt), Some(right_opt)) if left_opt != right_opt => return None,
            // Only one side is explicit: propagate its options.
            (Some(options), None) => right.options = Some(options),
            (None, Some(options)) => left.options = Some(options),
            // Equal explicit options, or both unspecified: nothing to do.
            (Some(_), Some(_)) | (None, None) => {}
        }
    }
    Some(if first.len() >= second.len() {
        first
    } else {
        second
    })
}
/// Returns the longest common prefix of the two normalized orderings, or
/// `None` when that prefix is empty.
pub fn get_meet_ordering(
    &self,
    lhs: LexOrderingRef,
    rhs: LexOrderingRef,
) -> Option<LexOrdering> {
    let normalized_lhs = self.normalize_sort_exprs(lhs);
    let normalized_rhs = self.normalize_sort_exprs(rhs);
    let common_prefix: LexOrdering = normalized_lhs
        .into_iter()
        .zip(normalized_rhs)
        .take_while(|(left, right)| left == right)
        .map(|(left, _)| left)
        .collect();
    if common_prefix.is_empty() {
        None
    } else {
        Some(common_prefix)
    }
}
/// Projects `expr` through `projection_mapping` by delegating to the
/// equivalence group's `project_expr`; the `Option` it returns is passed
/// through unchanged.
pub fn project_expr(
&self,
expr: &Arc<dyn PhysicalExpr>,
projection_mapping: &ProjectionMapping,
) -> Option<Arc<dyn PhysicalExpr>> {
self.eq_group.project_expr(projection_mapping, expr)
}
fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
let mut dependency_map = HashMap::new();
for ordering in self.normalized_oeq_class().iter() {
for (idx, sort_expr) in ordering.iter().enumerate() {
let target_sort_expr =
self.project_expr(&sort_expr.expr, mapping).map(|expr| {
PhysicalSortExpr {
expr,
options: sort_expr.options,
}
});
let is_projected = target_sort_expr.is_some();
if is_projected
|| mapping
.iter()
.any(|(source, _)| expr_refers(source, &sort_expr.expr))
{
let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
dependency_map
.entry(sort_expr.clone())
.or_insert_with(|| DependencyNode {
target_sort_expr: target_sort_expr.clone(),
dependencies: HashSet::new(),
})
.insert_dependency(dependency);
}
if !is_projected {
break;
}
}
}
dependency_map
}
/// Returns a copy of `mapping` in which every source expression has been
/// normalized with the equivalence group; targets are cloned unchanged.
fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
    let map = mapping
        .iter()
        .map(|(source, target)| {
            (self.eq_group.normalize_expr(source.clone()), target.clone())
        })
        .collect();
    ProjectionMapping { map }
}
fn projected_orderings(&self, mapping: &ProjectionMapping) -> Vec<LexOrdering> {
let mapping = self.normalized_mapping(mapping);
let dependency_map = self.construct_dependency_map(&mapping);
let orderings = mapping.iter().flat_map(|(source, target)| {
referred_dependencies(&dependency_map, source)
.into_iter()
.filter_map(|relevant_deps| {
if let SortProperties::Ordered(options) =
get_expr_ordering(source, &relevant_deps)
{
Some((options, relevant_deps))
} else {
None
}
})
.flat_map(|(options, relevant_deps)| {
let sort_expr = PhysicalSortExpr {
expr: target.clone(),
options,
};
let mut dependency_orderings =
generate_dependency_orderings(&relevant_deps, &dependency_map);
for ordering in dependency_orderings.iter_mut() {
ordering.push(sort_expr.clone());
}
dependency_orderings
})
});
let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
if prefixes.is_empty() {
prefixes = vec![vec![]];
}
for ordering in prefixes.iter_mut() {
if let Some(target) = &node.target_sort_expr {
ordering.push(target.clone())
}
}
prefixes
});
orderings
.chain(projected_orderings)
.map(collapse_lex_ordering)
.collect()
}
/// Computes the constants that hold after applying `mapping`: first the
/// existing constants that can be projected, then the target of every
/// projection whose source is constant (skipping targets already present).
fn projected_constants(
    &self,
    mapping: &ProjectionMapping,
) -> Vec<Arc<dyn PhysicalExpr>> {
    let mut projected: Vec<_> = self
        .constants
        .iter()
        .filter_map(|constant| self.eq_group.project_expr(mapping, constant))
        .collect();
    for (source, target) in mapping.iter() {
        if !self.is_expr_constant(source) {
            continue;
        }
        if !physical_exprs_contains(&projected, target) {
            projected.push(target.clone());
        }
    }
    projected
}
/// Projects these properties through `projection_mapping`, producing new
/// properties over `output_schema` built from the projected constants,
/// the projected equivalence group and the projected orderings.
pub fn project(
    &self,
    projection_mapping: &ProjectionMapping,
    output_schema: SchemaRef,
) -> Self {
    let constants = self.projected_constants(projection_mapping);
    let eq_group = self.eq_group.project(projection_mapping);
    let oeq_class =
        OrderingEquivalenceClass::new(self.projected_orderings(projection_mapping));
    Self {
        eq_group,
        oeq_class,
        constants,
        schema: output_schema,
    }
}
/// Greedily builds a lexicographical ordering out of `exprs`.
///
/// Returns the ordering together with, for each of its sort expressions,
/// the index in `exprs` it was computed from.
///
/// The search runs in rounds (at most `exprs.len()` of them). In each
/// round every not-yet-selected expression whose state is `Ordered` is
/// appended to the result, and is then treated as a constant on a working
/// copy of `self` for the following rounds. The search stops as soon as a
/// round selects nothing.
pub fn find_longest_permutation(
    &self,
    exprs: &[Arc<dyn PhysicalExpr>],
) -> (LexOrdering, Vec<usize>) {
    let mut eq_properties = self.clone();
    let mut result = vec![];
    let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
    for _ in 0..exprs.len() {
        // Ordered expressions of this round, paired with their input index.
        let ordered_exprs = search_indices
            .iter()
            .filter_map(|&idx| {
                let ExprOrdering { expr, state, .. } =
                    eq_properties.get_expr_ordering(exprs[idx].clone());
                if let SortProperties::Ordered(options) = state {
                    Some((PhysicalSortExpr { expr, options }, idx))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        // Fixed point reached: nothing new became ordered.
        if ordered_exprs.is_empty() {
            break;
        }
        for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
            eq_properties =
                eq_properties.add_constants(std::iter::once(expr.clone()));
            // `shift_remove` keeps the remaining indices in input order, so
            // later rounds emit expressions in the order the caller gave
            // them. A swap-style removal would move the last index into the
            // vacated slot and perturb that order.
            search_indices.shift_remove(idx);
        }
        result.extend(ordered_exprs);
    }
    result.into_iter().unzip()
}
/// Returns whether `expr`, after normalization, is one of the normalized
/// constants or is built exclusively out of them (see
/// `is_constant_recurse`).
fn is_expr_constant(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
    let constants = self.eq_group.normalize_exprs(self.constants.clone());
    let candidate = self.eq_group.normalize_expr(expr.clone());
    is_constant_recurse(&constants, &candidate)
}
/// Computes the ordering state of `expr` by wrapping it in an
/// `ExprOrdering` and applying `update_ordering` bottom-up over the tree.
///
/// # Panics
///
/// Never in practice: the transformation closure always returns `Ok`.
pub fn get_expr_ordering(&self, expr: Arc<dyn PhysicalExpr>) -> ExprOrdering {
    // `expr` is already owned, so it is moved rather than cloned.
    ExprOrdering::new(expr)
        .transform_up(&|node| Ok(update_ordering(node, self)))
        .expect("`update_ordering` is infallible, so `transform_up` cannot fail")
}
}
/// Determines the sort state of `node` with respect to `eq_properties`.
///
/// The first matching rule wins:
/// 1. the normalized expression is constant -> `Singleton`;
/// 2. the normalized ordering equivalence class has options for it -> `Ordered`;
/// 3. the expression has children -> derived from the children's states;
/// 4. the expression is a `Literal` -> derived with no child states;
/// 5. otherwise the node is returned untouched as `Transformed::No`.
fn update_ordering(
    mut node: ExprOrdering,
    eq_properties: &EquivalenceProperties,
) -> Transformed<ExprOrdering> {
    let normalized_expr = eq_properties.eq_group.normalize_expr(node.expr.clone());
    let new_state = if eq_properties.is_expr_constant(&normalized_expr) {
        SortProperties::Singleton
    } else if let Some(options) = eq_properties
        .normalized_oeq_class()
        .get_options(&normalized_expr)
    {
        SortProperties::Ordered(options)
    } else if !node.expr.children().is_empty() {
        node.expr.get_ordering(&node.children_state())
    } else if node.expr.as_any().is::<Literal>() {
        node.expr.get_ordering(&[])
    } else {
        return Transformed::No(node);
    };
    node.state = new_state;
    Transformed::Yes(node)
}
/// Returns `true` if `expr` is contained in `constants`, or if it has at
/// least one child and all of its children are (recursively) constant.
/// An expression without children that is not in `constants` is not
/// constant.
fn is_constant_recurse(
    constants: &[Arc<dyn PhysicalExpr>],
    expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    physical_exprs_contains(constants, expr) || {
        let children = expr.children();
        !children.is_empty()
            && children
                .iter()
                .all(|child| is_constant_recurse(constants, child))
    }
}
/// Returns `true` if `referring_expr` equals `referred_expr` or contains
/// it anywhere in its tree of children.
fn expr_refers(
    referring_expr: &Arc<dyn PhysicalExpr>,
    referred_expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    if referring_expr.eq(referred_expr) {
        return true;
    }
    referring_expr
        .children()
        .iter()
        .any(|child| expr_refers(child, referred_expr))
}
/// Collects the dependency sets relevant to `source`.
///
/// The keys of `dependency_map` that `source` refers to are grouped by
/// expression; the result is the cartesian product of those groups, i.e.
/// every way of picking one sort expression per referred expression.
fn referred_dependencies(
    dependency_map: &DependencyMap,
    source: &Arc<dyn PhysicalExpr>,
) -> Vec<Dependencies> {
    let mut grouped_by_expr: HashMap<ExprWrapper, Dependencies> = HashMap::new();
    for sort_expr in dependency_map.keys() {
        if expr_refers(source, &sort_expr.expr) {
            grouped_by_expr
                .entry(ExprWrapper(sort_expr.expr.clone()))
                .or_default()
                .insert(sort_expr.clone());
        }
    }
    grouped_by_expr
        .values()
        .multi_cartesian_product()
        .map(|combination| combination.into_iter().cloned().collect())
        .collect()
}
/// Returns every ordering that can precede `relevant_sort_expr`: the
/// orderings built by `construct_orderings` for each of its dependencies.
///
/// Panics if `relevant_sort_expr` is not a key of `dependency_map`.
fn construct_prefix_orderings(
    relevant_sort_expr: &PhysicalSortExpr,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let node = &dependency_map[relevant_sort_expr];
    let mut prefixes = Vec::new();
    for dependency in &node.dependencies {
        prefixes.extend(construct_orderings(dependency, dependency_map));
    }
    prefixes
}
/// Generates every ordering that satisfies all of `dependencies`.
///
/// The prefix orderings of each dependency are computed (dependencies
/// without prefixes are ignored). If none remain, a single empty ordering
/// is returned. Otherwise, for every choice of one prefix per dependency,
/// every permutation of the chosen prefixes is concatenated into one
/// ordering.
fn generate_dependency_orderings(
    dependencies: &Dependencies,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let relevant_prefixes: Vec<Vec<LexOrdering>> = dependencies
        .iter()
        .map(|dependency| construct_prefix_orderings(dependency, dependency_map))
        .filter(|prefixes| !prefixes.is_empty())
        .collect();
    if relevant_prefixes.is_empty() {
        return vec![vec![]];
    }

    let mut orderings: Vec<LexOrdering> = Vec::new();
    for chosen_prefixes in relevant_prefixes.into_iter().multi_cartesian_product() {
        let n_prefixes = chosen_prefixes.len();
        for permutation in chosen_prefixes.iter().permutations(n_prefixes) {
            orderings.push(permutation.into_iter().flatten().cloned().collect());
        }
    }
    orderings
}
/// Computes the sort state of `expr` given the sort expressions in
/// `dependencies`: if one of them has `expr` as its expression, its
/// options are used directly; otherwise the state is derived from the
/// (recursively computed) states of the children.
fn get_expr_ordering(
    expr: &Arc<dyn PhysicalExpr>,
    dependencies: &Dependencies,
) -> SortProperties {
    match dependencies.iter().find(|order| expr.eq(&order.expr)) {
        Some(column_order) => SortProperties::Ordered(column_order.options),
        None => {
            let child_states: Vec<_> = expr
                .children()
                .iter()
                .map(|child| get_expr_ordering(child, dependencies))
                .collect();
            expr.get_ordering(&child_states)
        }
    }
}
/// Value type of `DependencyMap`: what a sort expression projects to and
/// which sort expressions it depends on.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DependencyNode {
/// Projected form of the sort expression; `None` when it could not be
/// projected.
target_sort_expr: Option<PhysicalSortExpr>,
/// Sort expressions recorded as dependencies of this one.
dependencies: Dependencies,
}
impl DependencyNode {
    /// Records a clone of `dependency` in this node's dependency set;
    /// passing `None` leaves the set unchanged.
    fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
        self.dependencies.extend(dependency.cloned());
    }
}
/// Maps a sort expression to its projected form and its dependencies.
type DependencyMap = HashMap<PhysicalSortExpr, DependencyNode>;
/// A set of sort expressions.
type Dependencies = HashSet<PhysicalSortExpr>;
/// Builds every projected ordering that ends with the projected form of
/// `referred_sort_expr`, by recursively prepending the orderings of its
/// dependencies. A node without dependencies yields a single one-element
/// ordering.
///
/// Panics if `referred_sort_expr` is not a key of `dependency_map`, or if
/// its node has no projected sort expression.
fn construct_orderings(
    referred_sort_expr: &PhysicalSortExpr,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let node = &dependency_map[referred_sort_expr];
    let target_sort_expr = node.target_sort_expr.clone().unwrap();
    if node.dependencies.is_empty() {
        return vec![vec![target_sort_expr]];
    }
    let mut orderings = Vec::new();
    for dependency in &node.dependencies {
        for mut ordering in construct_orderings(dependency, dependency_map) {
            ordering.push(target_sort_expr.clone());
            orderings.push(ordering);
        }
    }
    orderings
}
/// Computes the equivalence properties of a join output.
///
/// The equivalence group is the join of both input groups. The orderings
/// depend on `maintains_input_order`:
/// - `[true, false]`: the left orderings are kept; for an inner join
///   probed from the left, they are additionally suffixed with the
///   (offset) right orderings.
/// - `[false, true]`: the (possibly offset) right orderings are kept; for
///   an inner join probed from the right, they are additionally suffixed
///   with the left orderings.
/// - `[false, false]`: no orderings are added.
///
/// # Panics
///
/// Panics if `maintains_input_order` is `[true, true]` or does not have
/// exactly two elements.
pub fn join_equivalence_properties(
    left: EquivalenceProperties,
    right: EquivalenceProperties,
    join_type: &JoinType,
    join_schema: SchemaRef,
    maintains_input_order: &[bool],
    probe_side: Option<JoinSide>,
    on: &[(Column, Column)],
) -> EquivalenceProperties {
    let left_size = left.schema.fields.len();
    let mut result = EquivalenceProperties::new(join_schema);
    let joined_eq_group =
        left.eq_group()
            .join(right.eq_group(), join_type, left_size, on);
    result.add_equivalence_group(joined_eq_group);

    let left_oeq_class = left.oeq_class;
    let mut right_oeq_class = right.oeq_class;
    let is_inner = matches!(join_type, JoinType::Inner);
    match maintains_input_order {
        [true, false] => {
            let out_oeq_class =
                if is_inner && matches!(probe_side, Some(JoinSide::Left)) {
                    updated_right_ordering_equivalence_class(
                        &mut right_oeq_class,
                        join_type,
                        left_size,
                    );
                    left_oeq_class.join_suffix(&right_oeq_class)
                } else {
                    left_oeq_class
                };
            result.add_ordering_equivalence_class(out_oeq_class);
        }
        [false, true] => {
            updated_right_ordering_equivalence_class(
                &mut right_oeq_class,
                join_type,
                left_size,
            );
            let out_oeq_class =
                if is_inner && matches!(probe_side, Some(JoinSide::Right)) {
                    right_oeq_class.join_suffix(&left_oeq_class)
                } else {
                    right_oeq_class
                };
            result.add_ordering_equivalence_class(out_oeq_class);
        }
        [false, false] => {}
        [true, true] => unreachable!("Cannot maintain ordering of both sides"),
        _ => unreachable!("Join operators can not have more than two children"),
    }
    result
}
/// Offsets `right_oeq_class` by `left_size` for `Inner`, `Left`, `Full`
/// and `Right` joins; for every other join type it is left unchanged.
fn updated_right_ordering_equivalence_class(
    right_oeq_class: &mut OrderingEquivalenceClass,
    join_type: &JoinType,
    left_size: usize,
) {
    match join_type {
        JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
            right_oeq_class.add_offset(left_size);
        }
        _ => {}
    }
}
/// Newtype around a physical expression so it can be used as a `HashMap`
/// key (see `referred_dependencies`). Equality and hashing are forwarded to
/// the wrapped expression.
#[derive(Debug, Clone)]
struct ExprWrapper(Arc<dyn PhysicalExpr>);
impl PartialEq<Self> for ExprWrapper {
// Two wrappers are equal when the wrapped expressions compare equal.
fn eq(&self, other: &Self) -> bool {
self.0.eq(&other.0)
}
}
impl Eq for ExprWrapper {}
impl Hash for ExprWrapper {
// Hashes the wrapped expression.
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}
#[cfg(test)]
mod tests {
use std::ops::Not;
use std::sync::Arc;
use super::*;
use crate::equivalence::add_offset_to_expr;
use crate::equivalence::tests::{
convert_to_orderings, convert_to_sort_exprs, convert_to_sort_reqs,
create_random_schema, create_test_params, create_test_schema,
generate_table_for_eq_properties, is_table_same_after_sort, output_schema,
};
use crate::execution_props::ExecutionProps;
use crate::expressions::{col, BinaryExpr, Column};
use crate::functions::create_physical_expr;
use crate::PhysicalSortExpr;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::{Fields, SortOptions, TimeUnit};
use datafusion_common::Result;
use datafusion_expr::{BuiltinScalarFunction, Operator};
use itertools::Itertools;
/// Projecting the same input column `a` under four aliases must yield a
/// single equivalence class containing all four output columns.
#[test]
fn project_equivalence_properties_test() -> Result<()> {
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
        Field::new("c", DataType::Int64, true),
    ]));

    let input_properties = EquivalenceProperties::new(input_schema.clone());
    let col_a = col("a", &input_schema)?;

    // a as a1, a as a2, a as a3, a as a4. The mapping is built once and used
    // both for deriving the output schema and for the projection itself.
    let proj_exprs = vec![
        (col_a.clone(), "a1".to_string()),
        (col_a.clone(), "a2".to_string()),
        (col_a.clone(), "a3".to_string()),
        (col_a.clone(), "a4".to_string()),
    ];
    let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
    let out_schema = output_schema(&projection_mapping, &input_schema)?;

    let col_a1 = &col("a1", &out_schema)?;
    let col_a2 = &col("a2", &out_schema)?;
    let col_a3 = &col("a3", &out_schema)?;
    let col_a4 = &col("a4", &out_schema)?;
    let out_properties = input_properties.project(&projection_mapping, out_schema);

    // All four aliases must end up in one equivalence class.
    assert_eq!(out_properties.eq_group().len(), 1);
    let eq_class = &out_properties.eq_group().classes[0];
    assert_eq!(eq_class.len(), 4);
    assert!(eq_class.contains(col_a1));
    assert!(eq_class.contains(col_a2));
    assert!(eq_class.contains(col_a3));
    assert!(eq_class.contains(col_a4));
    Ok(())
}
/// Checks the orderings produced by `join_equivalence_properties` for an
/// inner join that maintains the left input order and probes from the left.
#[test]
fn test_join_equivalence_properties() -> Result<()> {
let schema = create_test_schema()?;
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
// Right-side columns are the left-side ones shifted by the left schema width.
let offset = schema.fields.len();
let col_a2 = &add_offset_to_expr(col_a.clone(), offset);
let col_b2 = &add_offset_to_expr(col_b.clone(), offset);
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
// Each case: (left orderings, right orderings, expected join orderings).
// The expected orderings pair every left ordering with every offset
// right ordering as its suffix.
let test_cases = vec![
(
vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
vec![
vec![(col_a, option_asc), (col_a2, option_asc)],
vec![(col_a, option_asc), (col_b2, option_asc)],
vec![(col_b, option_asc), (col_a2, option_asc)],
vec![(col_b, option_asc), (col_b2, option_asc)],
],
),
(
vec![
vec![(col_a, option_asc)],
vec![(col_b, option_asc)],
vec![(col_c, option_asc)],
],
vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
vec![
vec![(col_a, option_asc), (col_a2, option_asc)],
vec![(col_a, option_asc), (col_b2, option_asc)],
vec![(col_b, option_asc), (col_a2, option_asc)],
vec![(col_b, option_asc), (col_b2, option_asc)],
vec![(col_c, option_asc), (col_a2, option_asc)],
vec![(col_c, option_asc), (col_b2, option_asc)],
],
),
];
for (left_orderings, right_orderings, expected) in test_cases {
let mut left_eq_properties = EquivalenceProperties::new(schema.clone());
let mut right_eq_properties = EquivalenceProperties::new(schema.clone());
let left_orderings = convert_to_orderings(&left_orderings);
let right_orderings = convert_to_orderings(&right_orderings);
let expected = convert_to_orderings(&expected);
left_eq_properties.add_new_orderings(left_orderings);
right_eq_properties.add_new_orderings(right_orderings);
let join_eq = join_equivalence_properties(
left_eq_properties,
right_eq_properties,
&JoinType::Inner,
Arc::new(Schema::empty()),
&[true, false],
Some(JoinSide::Left),
&[],
);
let orderings = &join_eq.oeq_class.orderings;
let err_msg = format!("expected: {:?}, actual:{:?}", expected, orderings);
// Same number of orderings, and every actual ordering is expected
// (order-insensitive comparison).
assert_eq!(
join_eq.oeq_class.orderings.len(),
expected.len(),
"{}",
err_msg
);
for ordering in orderings {
assert!(
expected.contains(ordering),
"{}, ordering: {:?}",
err_msg,
ordering
);
}
}
Ok(())
}
/// `b + d` is constant only when both `b` and `d` are in the constant list.
#[test]
fn test_expr_consists_of_constants() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
]));
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
let col_d = col("d", &schema)?;
let b_plus_d = Arc::new(BinaryExpr::new(
col_b.clone(),
Operator::Plus,
col_d.clone(),
)) as Arc<dyn PhysicalExpr>;
// `d` is missing from the constants, so `b + d` is not constant.
let constants = vec![col_a.clone(), col_b.clone()];
let expr = b_plus_d.clone();
assert!(!is_constant_recurse(&constants, &expr));
// With `d` added, every child of `b + d` is constant.
let constants = vec![col_a.clone(), col_b.clone(), col_d.clone()];
let expr = b_plus_d.clone();
assert!(is_constant_recurse(&constants, &expr));
Ok(())
}
/// After offsetting by the left column count, right-side orderings built
/// against the child schema must equal the same orderings expressed against
/// the join schema.
#[test]
fn test_get_updated_right_ordering_equivalence_properties() -> Result<()> {
let join_type = JoinType::Inner;
// Right child schema: x, y, z, w.
let child_fields: Fields = ["x", "y", "z", "w"]
.into_iter()
.map(|name| Field::new(name, DataType::Int32, true))
.collect();
let child_schema = Schema::new(child_fields);
let col_x = &col("x", &child_schema)?;
let col_y = &col("y", &child_schema)?;
let col_z = &col("z", &child_schema)?;
let col_w = &col("w", &child_schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let orderings = vec![
vec![(col_x, option_asc), (col_y, option_asc)],
vec![(col_z, option_asc), (col_w, option_asc)],
];
let orderings = convert_to_orderings(&orderings);
let mut right_oeq_class = OrderingEquivalenceClass::new(orderings);
let left_columns_len = 4;
// Join schema: four left columns followed by the four right columns.
let fields: Fields = ["a", "b", "c", "d", "x", "y", "z", "w"]
.into_iter()
.map(|name| Field::new(name, DataType::Int32, true))
.collect();
let schema = Schema::new(fields);
let col_a = &col("a", &schema)?;
let col_d = &col("d", &schema)?;
// From here on the x/y/z/w bindings are resolved against the join schema.
let col_x = &col("x", &schema)?;
let col_y = &col("y", &schema)?;
let col_z = &col("z", &schema)?;
let col_w = &col("w", &schema)?;
let mut join_eq_properties = EquivalenceProperties::new(Arc::new(schema));
join_eq_properties.add_equal_conditions(col_a, col_x);
join_eq_properties.add_equal_conditions(col_d, col_w);
updated_right_ordering_equivalence_class(
&mut right_oeq_class,
&join_type,
left_columns_len,
);
join_eq_properties.add_ordering_equivalence_class(right_oeq_class);
let result = join_eq_properties.oeq_class().clone();
let orderings = vec![
vec![(col_x, option_asc), (col_y, option_asc)],
vec![(col_z, option_asc), (col_w, option_asc)],
];
let orderings = convert_to_orderings(&orderings);
let expected = OrderingEquivalenceClass::new(orderings);
assert_eq!(result, expected);
Ok(())
}
/// Adds orderings `[b]` and `[c]` to properties where `a = c`, and compares
/// the stored ordering equivalence class with one built from the same
/// orderings on fresh properties.
#[test]
fn test_normalize_ordering_equivalence_classes() -> Result<()> {
let sort_options = SortOptions::default();
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let col_a_expr = col("a", &schema)?;
let col_b_expr = col("b", &schema)?;
let col_c_expr = col("c", &schema)?;
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr);
let others = vec![
vec![PhysicalSortExpr {
expr: col_b_expr.clone(),
options: sort_options,
}],
vec![PhysicalSortExpr {
expr: col_c_expr.clone(),
options: sort_options,
}],
];
eq_properties.add_new_orderings(others);
// Expected side: same orderings, but no equality condition registered.
let mut expected_eqs = EquivalenceProperties::new(Arc::new(schema));
expected_eqs.add_new_orderings([
vec![PhysicalSortExpr {
expr: col_b_expr.clone(),
options: sort_options,
}],
vec![PhysicalSortExpr {
expr: col_c_expr.clone(),
options: sort_options,
}],
]);
let oeq_class = eq_properties.oeq_class().clone();
let expected = expected_eqs.oeq_class();
assert!(oeq_class.eq(expected));
Ok(())
}
/// Exercises `find_longest_permutation` on three hand-built scenarios with
/// required columns `[b, a]`.
#[test]
fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
let sort_options = SortOptions::default();
let sort_options_not = SortOptions::default().not();
// Scenario 1: single ordering [b (negated options), a]; both required
// columns are matched, in order.
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]);
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let required_columns = [col_b.clone(), col_a.clone()];
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_new_orderings([vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
]]);
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0, 1]);
assert_eq!(
result,
vec![
PhysicalSortExpr {
expr: col_b.clone(),
options: sort_options_not
},
PhysicalSortExpr {
expr: col_a.clone(),
options: sort_options
}
]
);
// Scenario 2: an additional, unrelated ordering [c] does not change the
// outcome.
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let required_columns = [col_b.clone(), col_a.clone()];
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_new_orderings([
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("c", 2)),
options: sort_options,
}],
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
],
]);
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0, 1]);
assert_eq!(
result,
vec![
PhysicalSortExpr {
expr: col_b.clone(),
options: sort_options_not
},
PhysicalSortExpr {
expr: col_a.clone(),
options: sort_options
}
]
);
// Scenario 3: ordering [b, c, a]; `c` sits between `b` and `a` and is not
// required, so only `b` (index 0) is matched.
let required_columns = [
Arc::new(Column::new("b", 1)) as _,
Arc::new(Column::new("a", 0)) as _,
];
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_new_orderings([vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("c", 2)),
options: sort_options,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
]]);
let (_, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0]);
Ok(())
}
/// Checks the state computed by `get_expr_ordering` for several expressions
/// when `a = b` and the orderings `[b ASC]` and `[d ASC]` are known.
#[test]
fn test_update_ordering() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
]);
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_d = &col("d", &schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
eq_properties.add_equal_conditions(col_b, col_a);
eq_properties.add_new_orderings(vec![
vec![PhysicalSortExpr {
expr: col_b.clone(),
options: option_asc,
}],
vec![PhysicalSortExpr {
expr: col_d.clone(),
options: option_asc,
}],
]);
// Each case: (expression, expected sort state).
let test_cases = vec![
// d + b: both operands are ordered.
(
Arc::new(BinaryExpr::new(
col_d.clone(),
Operator::Plus,
col_b.clone(),
)) as Arc<dyn PhysicalExpr>,
SortProperties::Ordered(option_asc),
),
(col_b.clone(), SortProperties::Ordered(option_asc)),
// a is equal to b, hence ordered as well.
(col_a.clone(), SortProperties::Ordered(option_asc)),
// a + c: c has no known ordering.
(
Arc::new(BinaryExpr::new(
col_a.clone(),
Operator::Plus,
col_c.clone(),
)),
SortProperties::Unordered,
),
];
for (expr, expected) in test_cases {
// Leading sort expressions, collected only for the failure message.
let leading_orderings = eq_properties
.oeq_class()
.iter()
.flat_map(|ordering| ordering.first().cloned())
.collect::<Vec<_>>();
let expr_ordering = eq_properties.get_expr_ordering(expr.clone());
let err_msg = format!(
"expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}",
expr, expected, expr_ordering.state
);
assert_eq!(expr_ordering.state, expected, "{}", err_msg);
}
Ok(())
}
/// Randomized check of `find_longest_permutation`: for every combination of
/// a fixed expression list, the returned ordering must agree with the
/// returned indices and must leave the generated table unchanged when the
/// table is sorted by it.
#[test]
fn test_find_longest_permutation_random() -> Result<()> {
const N_RANDOM_SCHEMA: usize = 100;
const N_ELEMENTS: usize = 125;
const N_DISTINCT: usize = 5;
for seed in 0..N_RANDOM_SCHEMA {
let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
let table_data_with_properties =
generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?;
let floor_a = create_physical_expr(
&BuiltinScalarFunction::Floor,
&[col("a", &test_schema)?],
&test_schema,
&ExecutionProps::default(),
)?;
let a_plus_b = Arc::new(BinaryExpr::new(
col("a", &test_schema)?,
Operator::Plus,
col("b", &test_schema)?,
)) as Arc<dyn PhysicalExpr>;
let exprs = vec![
col("a", &test_schema)?,
col("b", &test_schema)?,
col("c", &test_schema)?,
col("d", &test_schema)?,
col("e", &test_schema)?,
col("f", &test_schema)?,
floor_a,
a_plus_b,
];
// Try every subset size and every combination of that size.
for n_req in 0..=exprs.len() {
for exprs in exprs.iter().combinations(n_req) {
let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
let (ordering, indices) =
eq_properties.find_longest_permutation(&exprs);
// Rebuild the ordering from the indices; it must match the
// ordering that was returned.
let ordering2 = indices
.iter()
.zip(ordering.iter())
.map(|(&idx, sort_expr)| PhysicalSortExpr {
expr: exprs[idx].clone(),
options: sort_expr.options,
})
.collect::<Vec<_>>();
assert_eq!(
ordering, ordering2,
"indices and lexicographical ordering do not match"
);
let err_msg = format!(
"Error in test case ordering:{:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}",
ordering, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants
);
assert_eq!(ordering.len(), indices.len(), "{}", err_msg);
// Sorting the table by the found ordering must be a no-op.
assert!(
is_table_same_after_sort(
ordering.clone(),
table_data_with_properties.clone(),
)?,
"{}",
err_msg
);
}
}
}
Ok(())
}
/// Table-driven check of `find_longest_permutation` on the shared test
/// properties extended with the ordering `[d ASC, h DESC]`.
#[test]
fn test_find_longest_permutation() -> Result<()> {
let (test_schema, mut eq_properties) = create_test_params()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_h = &col("h", &test_schema)?;
let a_plus_d = Arc::new(BinaryExpr::new(
col_a.clone(),
Operator::Plus,
col_d.clone(),
)) as Arc<dyn PhysicalExpr>;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
eq_properties.add_new_orderings([vec![
PhysicalSortExpr {
expr: col_d.clone(),
options: option_asc,
},
PhysicalSortExpr {
expr: col_h.clone(),
options: option_desc,
},
]]);
// Each case: (input expressions, expected longest ordering).
let test_cases = vec![
(vec![col_a], vec![(col_a, option_asc)]),
(vec![col_c], vec![(col_c, option_asc)]),
(
vec![col_d, col_e, col_b],
vec![
(col_d, option_asc),
(col_e, option_desc),
(col_b, option_asc),
],
),
(vec![col_b], vec![]),
(vec![col_d], vec![(col_d, option_asc)]),
(vec![&a_plus_d], vec![(&a_plus_d, option_asc)]),
(
vec![col_b, col_d],
vec![(col_d, option_asc), (col_b, option_asc)],
),
(
vec![col_c, col_e],
vec![(col_c, option_asc), (col_e, option_desc)],
),
];
for (exprs, expected) in test_cases {
let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
let expected = convert_to_sort_exprs(&expected);
let (actual, _) = eq_properties.find_longest_permutation(&exprs);
assert_eq!(actual, expected);
}
Ok(())
}
/// Checks `get_meet_ordering` on properties built from the bare test schema
/// (no orderings or equivalences are added in this test).
///
/// Each case is `(lhs, rhs, expected)`, where `expected` is `None` when no
/// meet is expected.
#[test]
fn test_get_meet_ordering() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let eq_properties = EquivalenceProperties::new(schema);

    let asc_nulls_last = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let desc_nulls_first = SortOptions {
        descending: true,
        nulls_first: true,
    };

    let cases = vec![
        // [a ASC] vs [a ASC, b ASC] -> [a ASC]
        (
            vec![(col_a, asc_nulls_last)],
            vec![(col_a, asc_nulls_last), (col_b, asc_nulls_last)],
            Some(vec![(col_a, asc_nulls_last)]),
        ),
        // [a ASC] vs [a DESC] -> no meet
        (
            vec![(col_a, asc_nulls_last)],
            vec![(col_a, desc_nulls_first)],
            None,
        ),
        // [a ASC, b ASC] vs [a ASC, b DESC] -> [a ASC]
        (
            vec![(col_a, asc_nulls_last), (col_b, asc_nulls_last)],
            vec![(col_a, asc_nulls_last), (col_b, desc_nulls_first)],
            Some(vec![(col_a, asc_nulls_last)]),
        ),
    ];

    for (left, right, want) in cases {
        let left = convert_to_sort_exprs(&left);
        let right = convert_to_sort_exprs(&right);
        let want = want.map(|ordering| convert_to_sort_exprs(&ordering));
        let meet = eq_properties.get_meet_ordering(&left, &right);
        assert_eq!(meet, want);
    }
    Ok(())
}
/// Checks `get_finer_requirement` on properties built from the bare test
/// schema (no orderings or equivalences are added in this test).
///
/// Each case is `(lhs, rhs, expected)`, where `expected` is `None` when no
/// finer requirement is expected.
#[test]
fn test_get_finer() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let col_c = &col("c", &schema)?;
    let eq_properties = EquivalenceProperties::new(schema);

    // Requirement options, pre-wrapped because a requirement's options are optional.
    let asc = Some(SortOptions {
        descending: false,
        nulls_first: false,
    });
    let desc = Some(SortOptions {
        descending: true,
        nulls_first: true,
    });

    let cases = vec![
        // [a ASC] vs [a (any), b ASC] -> [a ASC, b ASC]
        (
            vec![(col_a, asc)],
            vec![(col_a, None), (col_b, asc)],
            Some(vec![(col_a, asc), (col_b, asc)]),
        ),
        // [a ASC, b ASC, c ASC] vs [a ASC, b ASC] -> the longer requirement
        (
            vec![(col_a, asc), (col_b, asc), (col_c, asc)],
            vec![(col_a, asc), (col_b, asc)],
            Some(vec![(col_a, asc), (col_b, asc), (col_c, asc)]),
        ),
        // [a ASC, b ASC] vs [a ASC, b DESC] -> no finer requirement
        (
            vec![(col_a, asc), (col_b, asc)],
            vec![(col_a, asc), (col_b, desc)],
            None,
        ),
    ];

    for (left, right, want) in cases {
        let left = convert_to_sort_reqs(&left);
        let right = convert_to_sort_reqs(&right);
        let want = want.map(|reqs| convert_to_sort_reqs(&reqs));
        let finer = eq_properties.get_finer_requirement(&left, &right);
        assert_eq!(finer, want);
    }
    Ok(())
}
/// Checks `normalize_sort_requirements` against the properties returned by
/// `create_test_params`.
///
/// Each case is `(requirement, expected normalized requirement)`. The cases
/// expect `c` to be rewritten to `a` and every other column to be left as is.
/// NOTE(review): presumably `a` and `c` share an equivalence class in the
/// fixture; confirm against `create_test_params`.
#[test]
fn test_normalize_sort_reqs() -> Result<()> {
    let (test_schema, eq_properties) = create_test_params()?;
    let col_a = &col("a", &test_schema)?;
    let col_b = &col("b", &test_schema)?;
    let col_c = &col("c", &test_schema)?;
    let col_d = &col("d", &test_schema)?;
    let col_e = &col("e", &test_schema)?;
    let col_f = &col("f", &test_schema)?;

    // Requirement options, pre-wrapped because a requirement's options are optional.
    let asc = Some(SortOptions {
        descending: false,
        nulls_first: false,
    });
    let desc = Some(SortOptions {
        descending: true,
        nulls_first: true,
    });

    let requirements = vec![
        // `a` is expected to stay `a`, whatever the options are.
        (vec![(col_a, asc)], vec![(col_a, asc)]),
        (vec![(col_a, desc)], vec![(col_a, desc)]),
        (vec![(col_a, None)], vec![(col_a, None)]),
        // `c` is expected to be rewritten to `a`, keeping its options.
        (vec![(col_c, asc)], vec![(col_a, asc)]),
        (vec![(col_c, None)], vec![(col_a, None)]),
        // Multi-column requirements expected to come back unchanged.
        (
            vec![(col_d, asc), (col_b, asc)],
            vec![(col_d, asc), (col_b, asc)],
        ),
        (
            vec![(col_d, None), (col_b, None)],
            vec![(col_d, None), (col_b, None)],
        ),
        (
            vec![(col_e, desc), (col_f, asc)],
            vec![(col_e, desc), (col_f, asc)],
        ),
        (
            vec![(col_e, desc), (col_f, None)],
            vec![(col_e, desc), (col_f, None)],
        ),
        (
            vec![(col_e, None), (col_f, None)],
            vec![(col_e, None), (col_f, None)],
        ),
    ];

    for (given, want) in requirements {
        let given = convert_to_sort_reqs(&given);
        let want = convert_to_sort_reqs(&want);
        let got = eq_properties.normalize_sort_requirements(&given);
        assert_eq!(got, want);
    }
    Ok(())
}
/// Checks single-column `normalize_sort_requirements` calls against the
/// properties returned by `create_test_params`.
///
/// The cases expect `c` to normalize to `a`, while `a` and `d` stay as is.
/// NOTE(review): presumably `a` and `c` share an equivalence class in the
/// fixture; confirm against `create_test_params`.
#[test]
fn test_schema_normalize_sort_requirement_with_equivalence() -> Result<()> {
    // Ascending, nulls last, pre-wrapped because requirement options are optional.
    let asc = Some(SortOptions {
        descending: false,
        nulls_first: false,
    });
    let (test_schema, eq_properties) = create_test_params()?;
    let col_a = &col("a", &test_schema)?;
    let col_c = &col("c", &test_schema)?;
    let col_d = &col("d", &test_schema)?;

    // Each case is (requirement, expected normalized requirement).
    let cases = vec![
        (vec![(col_a, asc)], vec![(col_a, asc)]),
        (vec![(col_c, asc)], vec![(col_a, asc)]),
        (vec![(col_c, None)], vec![(col_a, None)]),
        (vec![(col_d, asc)], vec![(col_d, asc)]),
    ];

    // The locals below keep the names captured by the assertion message.
    for (reqs, expected) in cases {
        let reqs = convert_to_sort_reqs(&reqs);
        let expected = convert_to_sort_reqs(&expected);
        let normalized = eq_properties.normalize_sort_requirements(&reqs);
        assert!(
            expected == normalized,
            "error in test: reqs: {reqs:?}, expected: {expected:?}, normalized: {normalized:?}"
        );
    }
    Ok(())
}
}