use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::expressions::{Column, Literal};
use crate::physical_expr::deduplicate_physical_exprs;
use crate::sort_properties::{ExprOrdering, SortProperties};
use crate::{
physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexOrderingRef,
LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement,
};
use arrow::datatypes::SchemaRef;
use arrow_schema::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{JoinSide, JoinType, Result};
use indexmap::IndexSet;
use itertools::Itertools;
/// A set of physical expressions that are treated as equal to one another.
///
/// The first stored expression is used as the representative of the class
/// (see `canonical_expr`).
#[derive(Debug, Clone)]
pub struct EquivalenceClass {
/// Member expressions; `new` and `push` avoid storing the same expression twice.
exprs: Vec<Arc<dyn PhysicalExpr>>,
}
impl PartialEq for EquivalenceClass {
    /// Two classes are equal when `physical_exprs_bag_equal` reports that
    /// their member lists match.
    // NOTE(review): judging by the helper's name the comparison ignores
    // ordering of the members — confirm against `physical_exprs_bag_equal`.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.exprs, &other.exprs);
        physical_exprs_bag_equal(lhs, rhs)
    }
}
impl EquivalenceClass {
    /// Creates a class that holds no expressions.
    pub fn new_empty() -> Self {
        Self { exprs: Vec::new() }
    }

    /// Creates a class from `exprs` after passing them through
    /// `deduplicate_physical_exprs`.
    pub fn new(mut exprs: Vec<Arc<dyn PhysicalExpr>>) -> Self {
        deduplicate_physical_exprs(&mut exprs);
        Self { exprs }
    }

    /// Consumes the class and hands back its member expressions.
    pub fn into_vec(self) -> Vec<Arc<dyn PhysicalExpr>> {
        let Self { exprs } = self;
        exprs
    }

    /// Returns the representative of the class, i.e. its first member, or
    /// `None` when the class is empty.
    fn canonical_expr(&self) -> Option<Arc<dyn PhysicalExpr>> {
        self.exprs.first().map(Arc::clone)
    }

    /// Adds `expr` to the class unless an equal expression is already present.
    pub fn push(&mut self, expr: Arc<dyn PhysicalExpr>) {
        if self.contains(&expr) {
            return;
        }
        self.exprs.push(expr);
    }

    /// Moves every member of `other` into this class, skipping members that
    /// are already present.
    pub fn extend(&mut self, other: Self) {
        other.exprs.into_iter().for_each(|expr| self.push(expr));
    }

    /// Returns `true` if `expr` is a member of this class.
    pub fn contains(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
        physical_exprs_contains(&self.exprs, expr)
    }

    /// Returns `true` if this class and `other` share at least one member.
    pub fn contains_any(&self, other: &Self) -> bool {
        for candidate in &self.exprs {
            if other.contains(candidate) {
                return true;
            }
        }
        false
    }

    /// Number of member expressions.
    pub fn len(&self) -> usize {
        self.exprs.len()
    }

    /// Returns `true` when the class has no members.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Iterates over the member expressions in stored order.
    pub fn iter(&self) -> impl Iterator<Item = &Arc<dyn PhysicalExpr>> {
        self.exprs.iter()
    }

    /// Builds a new class in which every column index appearing in a member
    /// expression has been increased by `offset` (see `add_offset_to_expr`).
    pub fn with_offset(&self, offset: usize) -> Self {
        let shifted = self
            .exprs
            .iter()
            .map(|member| add_offset_to_expr(Arc::clone(member), offset))
            .collect();
        Self::new(shifted)
    }
}
/// Associates each source expression of a projection with the output column
/// it is projected to.
#[derive(Debug, Clone)]
pub struct ProjectionMapping {
/// `(source, target)` pairs, in the order the projection expressions were given.
map: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,
}
impl ProjectionMapping {
    /// Builds the mapping for the projection described by `expr`.
    ///
    /// For the i-th `(expression, name)` pair the target is `Column(name, i)`.
    /// The source is `expression` with every `Column` inside it rebuilt so
    /// that its name is the name of the `input_schema` field at the column's
    /// index.
    ///
    /// # Errors
    /// Propagates any error produced while rewriting an expression.
    pub fn try_new(
        expr: &[(Arc<dyn PhysicalExpr>, String)],
        input_schema: &SchemaRef,
    ) -> Result<Self> {
        let mut map = Vec::with_capacity(expr.len());
        for (expr_idx, (expression, name)) in expr.iter().enumerate() {
            let target_expr: Arc<dyn PhysicalExpr> =
                Arc::new(Column::new(name, expr_idx));
            let source_expr = expression.clone().transform_down(&|e| {
                match e.as_any().downcast_ref::<Column>() {
                    Some(col) => {
                        // Re-create the column using the schema's field name
                        // for the same index.
                        let idx = col.index();
                        let field_name = input_schema.field(idx).name();
                        Ok(Transformed::Yes(Arc::new(Column::new(field_name, idx))))
                    }
                    None => Ok(Transformed::No(e)),
                }
            })?;
            map.push((source_expr, target_expr));
        }
        Ok(Self { map })
    }

    /// Iterates over the `(source, target)` pairs of the mapping.
    pub fn iter(
        &self,
    ) -> impl Iterator<Item = &(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> + '_ {
        self.map.iter()
    }

    /// Returns the target of the first pair whose source equals `expr`, or
    /// `None` if no source matches.
    pub fn target_expr(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
    ) -> Option<Arc<dyn PhysicalExpr>> {
        for (source, target) in &self.map {
            if source.eq(expr) {
                return Some(target.clone());
            }
        }
        None
    }
}
/// A collection of `EquivalenceClass`es.
///
/// `new` and `extend` drop classes with fewer than two members and merge
/// classes that share a member (see `remove_redundant_entries`).
#[derive(Debug, Clone)]
pub struct EquivalenceGroup {
/// The equivalence classes that make up this group.
classes: Vec<EquivalenceClass>,
}
impl EquivalenceGroup {
    /// Creates a group that contains no equivalence classes.
    fn empty() -> Self {
        Self { classes: vec![] }
    }

    /// Creates a group from `classes`, then drops trivial classes and merges
    /// overlapping ones.
    fn new(classes: Vec<EquivalenceClass>) -> Self {
        let mut result = Self { classes };
        result.remove_redundant_entries();
        result
    }

    /// Number of equivalence classes in the group.
    fn len(&self) -> usize {
        self.classes.len()
    }

    /// Returns `true` when the group holds no classes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Iterates over the equivalence classes of the group.
    pub fn iter(&self) -> impl Iterator<Item = &EquivalenceClass> {
        self.classes.iter()
    }

    /// Records that `left` and `right` are equal.
    ///
    /// * Both already in (different) classes: the two classes are merged.
    /// * Only one in a class: the other expression joins that class.
    /// * Neither in a class: a new class holding both is created.
    fn add_equal_conditions(
        &mut self,
        left: &Arc<dyn PhysicalExpr>,
        right: &Arc<dyn PhysicalExpr>,
    ) {
        let mut first_class = None;
        let mut second_class = None;
        for (idx, cls) in self.classes.iter().enumerate() {
            if cls.contains(left) {
                first_class = Some(idx);
            }
            if cls.contains(right) {
                second_class = Some(idx);
            }
        }
        match (first_class, second_class) {
            (Some(first_idx), Some(second_idx)) => {
                if first_idx != second_idx {
                    // Always remove the class with the larger index so that
                    // the smaller index stays valid after `swap_remove`.
                    let keep_idx = first_idx.min(second_idx);
                    let remove_idx = first_idx.max(second_idx);
                    let other_class = self.classes.swap_remove(remove_idx);
                    self.classes[keep_idx].extend(other_class);
                }
            }
            (Some(group_idx), None) => {
                self.classes[group_idx].push(right.clone());
            }
            (None, Some(group_idx)) => {
                self.classes[group_idx].push(left.clone());
            }
            (None, None) => {
                self.classes
                    .push(EquivalenceClass::new(vec![left.clone(), right.clone()]));
            }
        }
    }

    /// Drops classes with fewer than two members (they carry no information)
    /// and then merges classes that share a member.
    fn remove_redundant_entries(&mut self) {
        self.classes.retain(|cls| cls.len() > 1);
        self.bridge_classes()
    }

    /// Merges classes that have at least one expression in common.
    ///
    /// Whenever the class at `idx` grows, it is re-scanned against the
    /// remaining classes, because the new members may overlap with classes
    /// that were previously disjoint from it.
    fn bridge_classes(&mut self) {
        let mut idx = 0;
        while idx < self.classes.len() {
            let mut next_idx = idx + 1;
            let start_size = self.classes[idx].len();
            while next_idx < self.classes.len() {
                if self.classes[idx].contains_any(&self.classes[next_idx]) {
                    // `swap_remove` moves the last class into `next_idx`, so
                    // the index is intentionally not advanced here.
                    let extension = self.classes.swap_remove(next_idx);
                    self.classes[idx].extend(extension);
                } else {
                    next_idx += 1;
                }
            }
            if self.classes[idx].len() > start_size {
                continue;
            }
            idx += 1;
        }
    }

    /// Adds the classes of `other` to this group and re-establishes the
    /// "no trivial / no overlapping classes" state.
    fn extend(&mut self, other: Self) {
        self.classes.extend(other.classes);
        self.remove_redundant_entries();
    }

    /// Rewrites `expr` so that every (sub-)expression belonging to an
    /// equivalence class is replaced by that class's first member.
    ///
    /// If the rewrite reports an error, the original expression is returned.
    pub fn normalize_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        expr.clone()
            .transform(&|expr| {
                for cls in self.iter() {
                    if cls.contains(&expr) {
                        // A class that contains `expr` is non-empty, so a
                        // canonical expression exists.
                        return Ok(Transformed::Yes(cls.canonical_expr().unwrap()));
                    }
                }
                Ok(Transformed::No(expr))
            })
            .unwrap_or(expr)
    }

    /// Normalizes the expression of `sort_expr`; its options are kept.
    pub fn normalize_sort_expr(
        &self,
        mut sort_expr: PhysicalSortExpr,
    ) -> PhysicalSortExpr {
        sort_expr.expr = self.normalize_expr(sort_expr.expr);
        sort_expr
    }

    /// Normalizes the expression of `sort_requirement`; its options are kept.
    pub fn normalize_sort_requirement(
        &self,
        mut sort_requirement: PhysicalSortRequirement,
    ) -> PhysicalSortRequirement {
        sort_requirement.expr = self.normalize_expr(sort_requirement.expr);
        sort_requirement
    }

    /// Normalizes every expression in `exprs`, preserving their order.
    pub fn normalize_exprs(
        &self,
        exprs: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
    ) -> Vec<Arc<dyn PhysicalExpr>> {
        exprs
            .into_iter()
            .map(|expr| self.normalize_expr(expr))
            .collect()
    }

    /// Normalizes a lexicographical ordering by converting it to sort
    /// requirements, normalizing those, and converting back.
    pub fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering {
        let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
        let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
        PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
    }

    /// Normalizes every requirement in `sort_reqs` and then removes
    /// requirements whose expression already appeared earlier in the list.
    pub fn normalize_sort_requirements(
        &self,
        sort_reqs: LexRequirementRef,
    ) -> LexRequirement {
        collapse_lex_req(
            sort_reqs
                .iter()
                .map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
                .collect(),
        )
    }

    /// Projects `expr` through `mapping`.
    ///
    /// Resolution order:
    /// 1. an exact match against a mapping source;
    /// 2. a mapping source whose equivalence class contains `expr`;
    /// 3. for non-leaf expressions, projecting every child and rebuilding
    ///    the expression from the projected children.
    ///
    /// Returns `None` if `expr` (or any of its children) cannot be projected.
    fn project_expr(
        &self,
        mapping: &ProjectionMapping,
        expr: &Arc<dyn PhysicalExpr>,
    ) -> Option<Arc<dyn PhysicalExpr>> {
        if let Some(target) = mapping.target_expr(expr) {
            return Some(target);
        }
        for (source, target) in mapping.iter() {
            if self
                .get_equivalence_class(source)
                .map_or(false, |group| group.contains(expr))
            {
                return Some(target.clone());
            }
        }
        let children = expr.children();
        if children.is_empty() {
            // A leaf that is neither mapped nor equivalent to a mapped
            // source cannot be projected.
            return None;
        }
        children
            .into_iter()
            .map(|child| self.project_expr(mapping, &child))
            .collect::<Option<Vec<_>>>()
            .map(|children| expr.clone().with_new_children(children).unwrap())
    }

    /// Projects the whole group through `mapping`.
    ///
    /// The result contains (a) every existing class with its members
    /// projected, kept only when at least two members survive, and (b) one
    /// class per mapping source that is projected to two or more distinct
    /// targets, since those targets are equal to each other.
    pub fn project(&self, mapping: &ProjectionMapping) -> Self {
        let projected_classes = self.iter().filter_map(|cls| {
            let new_class = cls
                .iter()
                .filter_map(|expr| self.project_expr(mapping, expr))
                .collect::<Vec<_>>();
            (new_class.len() > 1).then_some(EquivalenceClass::new(new_class))
        });
        // Group the targets by their source expression. Every source gets its
        // own entry; an entry is created the first time a source is seen.
        let mut new_classes: Vec<(&Arc<dyn PhysicalExpr>, Vec<Arc<dyn PhysicalExpr>>)> =
            vec![];
        for (source, target) in mapping.iter() {
            let existing = new_classes.iter().position(|(key, _)| key.eq(source));
            match existing {
                Some(pos) => {
                    let values = &mut new_classes[pos].1;
                    if !physical_exprs_contains(values, target) {
                        values.push(target.clone());
                    }
                }
                None => new_classes.push((source, vec![target.clone()])),
            }
        }
        // Only sources with at least two targets describe an equivalence.
        let new_classes = new_classes
            .into_iter()
            .filter_map(|(_, values)| (values.len() > 1).then_some(values))
            .map(EquivalenceClass::new);
        let classes = projected_classes.chain(new_classes).collect();
        Self::new(classes)
    }

    /// Returns the first class that contains `expr`, if any.
    fn get_equivalence_class(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
    ) -> Option<&EquivalenceClass> {
        self.iter().find(|cls| cls.contains(expr))
    }

    /// Computes the equivalence group of a join output.
    ///
    /// * `Inner`, `Left`, `Full`, `Right`: the left classes are combined with
    ///   the right classes whose column indices are shifted by `left_size`.
    ///   For `Inner` joins each `on` pair additionally becomes an equality
    ///   between the left column and the shifted right column.
    /// * `LeftSemi`, `LeftAnti`: a copy of this (left) group.
    /// * `RightSemi`, `RightAnti`: a copy of `right_equivalences`.
    pub fn join(
        &self,
        right_equivalences: &Self,
        join_type: &JoinType,
        left_size: usize,
        on: &[(Column, Column)],
    ) -> Self {
        match join_type {
            JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
                let mut result = Self::new(
                    self.iter()
                        .cloned()
                        .chain(
                            right_equivalences
                                .iter()
                                .map(|cls| cls.with_offset(left_size)),
                        )
                        .collect(),
                );
                if join_type == &JoinType::Inner {
                    for (lhs, rhs) in on.iter() {
                        let index = rhs.index() + left_size;
                        let new_lhs = Arc::new(lhs.clone()) as _;
                        let new_rhs = Arc::new(Column::new(rhs.name(), index)) as _;
                        result.add_equal_conditions(&new_lhs, &new_rhs);
                    }
                }
                result
            }
            JoinType::LeftSemi | JoinType::LeftAnti => self.clone(),
            JoinType::RightSemi | JoinType::RightAnti => right_equivalences.clone(),
        }
    }
}
/// Removes sort requirements whose expression already occurred earlier in
/// `input`, keeping the first occurrence and the original relative order.
pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
    input.into_iter().fold(
        Vec::new(),
        |mut kept: Vec<PhysicalSortRequirement>, candidate| {
            let already_seen = kept.iter().any(|req| req.expr.eq(&candidate.expr));
            if !already_seen {
                kept.push(candidate);
            }
            kept
        },
    )
}
/// Removes sort expressions whose expression already occurred earlier in
/// `input`, keeping the first occurrence and the original relative order.
pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering {
    input.into_iter().fold(
        Vec::new(),
        |mut kept: Vec<PhysicalSortExpr>, candidate| {
            let already_seen = kept.iter().any(|prev| prev.expr.eq(&candidate.expr));
            if !already_seen {
                kept.push(candidate);
            }
            kept
        },
    )
}
/// A collection of lexicographical orderings that are tracked together.
///
/// `new`, `push`, `extend` and `add_new_orderings` prune the collection via
/// `remove_redundant_entries`.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct OrderingEquivalenceClass {
/// The orderings in this class.
orderings: Vec<LexOrdering>,
}
impl OrderingEquivalenceClass {
    /// Creates a class that holds no orderings.
    fn empty() -> Self {
        Self { orderings: vec![] }
    }

    /// Removes every ordering from the class.
    pub fn clear(&mut self) {
        self.orderings.clear();
    }

    /// Creates a class from `orderings` and prunes redundant entries.
    pub fn new(orderings: Vec<LexOrdering>) -> Self {
        let mut result = Self { orderings };
        result.remove_redundant_entries();
        result
    }

    /// Returns `true` if `ordering` is stored in the class verbatim.
    pub fn contains(&self, ordering: &LexOrdering) -> bool {
        self.orderings.contains(ordering)
    }

    /// Adds `ordering` and prunes redundant entries.
    #[allow(dead_code)]
    fn push(&mut self, ordering: LexOrdering) {
        self.orderings.push(ordering);
        self.remove_redundant_entries();
    }

    /// Returns `true` when the class holds no orderings.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Iterates over the orderings of the class.
    pub fn iter(&self) -> impl Iterator<Item = &LexOrdering> {
        self.orderings.iter()
    }

    /// Number of orderings in the class.
    pub fn len(&self) -> usize {
        self.orderings.len()
    }

    /// Adds all orderings of `other` and prunes redundant entries.
    pub fn extend(&mut self, other: Self) {
        self.orderings.extend(other.orderings);
        self.remove_redundant_entries();
    }

    /// Adds the given orderings and prunes redundant entries.
    pub fn add_new_orderings(
        &mut self,
        orderings: impl IntoIterator<Item = LexOrdering>,
    ) {
        self.orderings.extend(orderings);
        self.remove_redundant_entries();
    }

    /// Prunes the stored orderings until a fixed point is reached.
    ///
    /// For every pair of orderings, `resolve_overlap` is applied in both
    /// directions; it truncates the tail of one ordering when that tail
    /// equals the head of the other. Orderings that become (or already are)
    /// empty are removed. The whole pass repeats while any truncation
    /// happened.
    fn remove_redundant_entries(&mut self) {
        let mut work = true;
        while work {
            work = false;
            let mut idx = 0;
            while idx < self.orderings.len() {
                let mut ordering_idx = idx + 1;
                let mut removal = self.orderings[idx].is_empty();
                while ordering_idx < self.orderings.len() {
                    work |= resolve_overlap(&mut self.orderings, idx, ordering_idx);
                    if self.orderings[idx].is_empty() {
                        removal = true;
                        break;
                    }
                    work |= resolve_overlap(&mut self.orderings, ordering_idx, idx);
                    if self.orderings[ordering_idx].is_empty() {
                        // `swap_remove` moves the last ordering into
                        // `ordering_idx`, so the index is not advanced.
                        self.orderings.swap_remove(ordering_idx);
                    } else {
                        ordering_idx += 1;
                    }
                }
                if removal {
                    self.orderings.swap_remove(idx);
                } else {
                    idx += 1;
                }
            }
        }
    }

    /// Concatenates all stored orderings (in stored order), drops sort
    /// expressions whose expression appeared earlier, and returns the result;
    /// `None` if nothing remains.
    pub fn output_ordering(&self) -> Option<LexOrdering> {
        let output_ordering = self.orderings.iter().flatten().cloned().collect();
        let output_ordering = collapse_lex_ordering(output_ordering);
        (!output_ordering.is_empty()).then_some(output_ordering)
    }

    /// Forms the cross product of this class with `other`: every ordering of
    /// this class is suffixed with every ordering of `other`.
    ///
    /// When `other` is empty, the orderings of this class are returned
    /// unchanged.
    pub fn join_suffix(mut self, other: &Self) -> Self {
        let n_ordering = self.orderings.len();
        // Replicate our orderings once per ordering in `other` (at least once).
        let n_cross = std::cmp::max(n_ordering, other.len() * n_ordering);
        self.orderings = self
            .orderings
            .iter()
            .cloned()
            .cycle()
            .take(n_cross)
            .collect();
        for (outer_idx, ordering) in other.iter().enumerate() {
            for idx in 0..n_ordering {
                // Position of the (outer_idx, idx) pair in the replicated list.
                let idx = outer_idx * n_ordering + idx;
                self.orderings[idx].extend(ordering.iter().cloned());
            }
        }
        self
    }

    /// Increases every column index in every stored sort expression by
    /// `offset` (see `add_offset_to_expr`).
    pub fn add_offset(&mut self, offset: usize) {
        for ordering in self.orderings.iter_mut() {
            for sort_expr in ordering {
                sort_expr.expr = add_offset_to_expr(sort_expr.expr.clone(), offset);
            }
        }
    }

    /// Returns the sort options of the first ordering whose leading
    /// expression equals `expr`, or `None` if there is no such ordering.
    ///
    /// Empty orderings are skipped rather than indexed, so this never panics.
    fn get_options(&self, expr: &Arc<dyn PhysicalExpr>) -> Option<SortOptions> {
        for ordering in self.iter() {
            if let Some(leading_ordering) = ordering.first() {
                if leading_ordering.expr.eq(expr) {
                    return Some(leading_ordering.options);
                }
            }
        }
        None
    }
}
pub fn add_offset_to_expr(
expr: Arc<dyn PhysicalExpr>,
offset: usize,
) -> Arc<dyn PhysicalExpr> {
expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
Some(col) => Ok(Transformed::Yes(Arc::new(Column::new(
col.name(),
offset + col.index(),
)))),
None => Ok(Transformed::No(e)),
})
.unwrap()
}
/// Trims `orderings[idx]` if its tail repeats the head of
/// `orderings[pre_idx]`.
///
/// The shortest non-empty overlap is searched first; when one is found the
/// overlapping tail is cut off `orderings[idx]` and `true` is returned.
/// Returns `false` when no tail of `orderings[idx]` equals a head of
/// `orderings[pre_idx]`.
fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> bool {
    let length = orderings[idx].len();
    let max_overlap = length.min(orderings[pre_idx].len());
    let shortest_overlap = (1..=max_overlap).find(|&overlap| {
        orderings[idx][length - overlap..] == orderings[pre_idx][..overlap]
    });
    match shortest_overlap {
        Some(overlap) => {
            orderings[idx].truncate(length - overlap);
            true
        }
        None => false,
    }
}
/// Bundles the equivalence information known about a schema: expression
/// equivalences, orderings, and expressions known to be constant.
#[derive(Debug, Clone)]
pub struct EquivalenceProperties {
/// Classes of expressions that are equal to each other.
eq_group: EquivalenceGroup,
/// Orderings tracked for the data.
oeq_class: OrderingEquivalenceClass,
/// Expressions treated as constant; `add_constants` stores them normalized
/// and without duplicates.
constants: Vec<Arc<dyn PhysicalExpr>>,
/// Schema these properties refer to.
schema: SchemaRef,
}
impl EquivalenceProperties {
    /// Creates properties for `schema` with no equivalences, orderings or
    /// constants.
    pub fn new(schema: SchemaRef) -> Self {
        Self {
            eq_group: EquivalenceGroup::empty(),
            oeq_class: OrderingEquivalenceClass::empty(),
            constants: vec![],
            schema,
        }
    }

    /// Creates properties for `schema` that start out with the given
    /// `orderings` (pruned of redundant entries).
    pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self {
        Self {
            eq_group: EquivalenceGroup::empty(),
            oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()),
            constants: vec![],
            schema,
        }
    }

    /// Schema these properties refer to.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// The stored (not normalized) orderings.
    pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
        &self.oeq_class
    }

    /// The expression equivalence group.
    pub fn eq_group(&self) -> &EquivalenceGroup {
        &self.eq_group
    }

    /// The expressions known to be constant.
    pub fn constants(&self) -> &[Arc<dyn PhysicalExpr>] {
        &self.constants
    }

    /// Returns the orderings after normalizing each of them with respect to
    /// the equivalence group and constants, pruned of redundant entries.
    pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
        OrderingEquivalenceClass::new(
            self.oeq_class
                .iter()
                .map(|ordering| self.normalize_sort_exprs(ordering))
                .collect(),
        )
    }

    /// Merges the equivalence group, orderings and constants of `other` into
    /// `self` and returns the result. The schema of `self` is kept.
    pub fn extend(mut self, other: Self) -> Self {
        self.eq_group.extend(other.eq_group);
        self.oeq_class.extend(other.oeq_class);
        self.add_constants(other.constants)
    }

    /// Removes all orderings.
    pub fn clear_orderings(&mut self) {
        self.oeq_class.clear();
    }

    /// Adds every ordering of `other`.
    pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) {
        self.oeq_class.extend(other);
    }

    /// Adds the given orderings.
    pub fn add_new_orderings(
        &mut self,
        orderings: impl IntoIterator<Item = LexOrdering>,
    ) {
        self.oeq_class.add_new_orderings(orderings);
    }

    /// Adds every class of `other_eq_group` to the equivalence group.
    pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
        self.eq_group.extend(other_eq_group);
    }

    /// Records that `left` and `right` are equal.
    pub fn add_equal_conditions(
        &mut self,
        left: &Arc<dyn PhysicalExpr>,
        right: &Arc<dyn PhysicalExpr>,
    ) {
        self.eq_group.add_equal_conditions(left, right);
    }

    /// Marks the given expressions as constant. Each expression is
    /// normalized first and only stored if not already present.
    pub fn add_constants(
        mut self,
        constants: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
    ) -> Self {
        for expr in self.eq_group.normalize_exprs(constants) {
            if !physical_exprs_contains(&self.constants, &expr) {
                self.constants.push(expr);
            }
        }
        self
    }

    /// Replaces all orderings with the single ordering `sort_exprs`.
    pub fn with_reorder(mut self, sort_exprs: Vec<PhysicalSortExpr>) -> Self {
        self.oeq_class = OrderingEquivalenceClass::new(vec![sort_exprs]);
        self
    }

    /// Normalizes an ordering by round-tripping through
    /// `normalize_sort_requirements`.
    fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering {
        let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
        let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
        PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
    }

    /// Normalizes `sort_reqs` with the equivalence group, removes
    /// requirements on constant expressions, and drops requirements whose
    /// expression already appeared earlier.
    fn normalize_sort_requirements(
        &self,
        sort_reqs: LexRequirementRef,
    ) -> LexRequirement {
        let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs);
        let constants_normalized =
            self.eq_group.normalize_exprs(self.constants.iter().cloned());
        // The normalized requirements are owned, so they are filtered by
        // value instead of being cloned one by one.
        collapse_lex_req(
            normalized_sort_reqs
                .into_iter()
                .filter(|order| {
                    !physical_exprs_contains(&constants_normalized, &order.expr)
                })
                .collect(),
        )
    }

    /// Returns `true` if the data satisfies the ordering `given`.
    pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool {
        let sort_requirements = PhysicalSortRequirement::from_sort_exprs(given.iter());
        self.ordering_satisfy_requirement(&sort_requirements)
    }

    /// Returns `true` if the data satisfies the requirements `reqs`.
    ///
    /// The normalized requirements are checked left to right. After a
    /// requirement is found to be satisfied, its expression is treated as a
    /// constant for the checks of the following requirements.
    pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) -> bool {
        let mut eq_properties = self.clone();
        let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);
        for normalized_req in normalized_reqs {
            if !eq_properties.ordering_satisfy_single(&normalized_req) {
                return false;
            }
            eq_properties =
                eq_properties.add_constants(std::iter::once(normalized_req.expr));
        }
        true
    }

    /// Checks a single requirement against the ordering state computed for
    /// its expression: ordered expressions must satisfy the requirement,
    /// singletons always do, unordered expressions never do.
    fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
        let expr_ordering = self.get_expr_ordering(req.expr.clone());
        let ExprOrdering { expr, state, .. } = expr_ordering;
        match state {
            SortProperties::Ordered(options) => {
                let sort_expr = PhysicalSortExpr { expr, options };
                sort_expr.satisfy(req, self.schema())
            }
            SortProperties::Singleton => true,
            SortProperties::Unordered => false,
        }
    }

    /// Returns `true` if, after normalization, `reference` is no longer than
    /// `given` and every requirement of `given` is compatible with the
    /// requirement of `reference` at the same position.
    pub fn requirements_compatible(
        &self,
        given: LexRequirementRef,
        reference: LexRequirementRef,
    ) -> bool {
        let normalized_given = self.normalize_sort_requirements(given);
        let normalized_reference = self.normalize_sort_requirements(reference);
        (normalized_reference.len() <= normalized_given.len())
            && normalized_reference
                .into_iter()
                .zip(normalized_given)
                .all(|(reference, given)| given.compatible(&reference))
    }

    /// Ordering counterpart of `get_finer_requirement`: returns the finer of
    /// `lhs` and `rhs`, or `None` if they are incomparable.
    pub fn get_finer_ordering(
        &self,
        lhs: LexOrderingRef,
        rhs: LexOrderingRef,
    ) -> Option<LexOrdering> {
        let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
        let rhs = PhysicalSortRequirement::from_sort_exprs(rhs);
        let finer = self.get_finer_requirement(&lhs, &rhs);
        finer.map(PhysicalSortRequirement::to_sort_exprs)
    }

    /// Returns the finer of the two (normalized) requirements, or `None` if
    /// they disagree on their common prefix.
    ///
    /// Two positions agree when their expressions are equal and their
    /// options are equal or missing on at least one side; missing options
    /// are filled in from the other side. When all common positions agree,
    /// the longer requirement is returned (`req1`'s on a tie).
    pub fn get_finer_requirement(
        &self,
        req1: LexRequirementRef,
        req2: LexRequirementRef,
    ) -> Option<LexRequirement> {
        let mut lhs = self.normalize_sort_requirements(req1);
        let mut rhs = self.normalize_sort_requirements(req2);
        lhs.iter_mut()
            .zip(rhs.iter_mut())
            .all(|(lhs, rhs)| {
                lhs.expr.eq(&rhs.expr)
                    && match (lhs.options, rhs.options) {
                        (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt,
                        (Some(options), None) => {
                            rhs.options = Some(options);
                            true
                        }
                        (None, Some(options)) => {
                            lhs.options = Some(options);
                            true
                        }
                        (None, None) => true,
                    }
            })
            .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
    }

    /// Returns the longest common prefix of the normalized `lhs` and `rhs`
    /// orderings, or `None` if they share no prefix.
    pub fn get_meet_ordering(
        &self,
        lhs: LexOrderingRef,
        rhs: LexOrderingRef,
    ) -> Option<LexOrdering> {
        let lhs = self.normalize_sort_exprs(lhs);
        let rhs = self.normalize_sort_exprs(rhs);
        let mut meet = vec![];
        for (lhs, rhs) in lhs.into_iter().zip(rhs.into_iter()) {
            if lhs.eq(&rhs) {
                meet.push(lhs);
            } else {
                break;
            }
        }
        (!meet.is_empty()).then_some(meet)
    }

    /// Projects `expr` through `projection_mapping`, taking the equivalence
    /// group into account. Returns `None` if it cannot be projected.
    pub fn project_expr(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        projection_mapping: &ProjectionMapping,
    ) -> Option<Arc<dyn PhysicalExpr>> {
        self.eq_group.project_expr(projection_mapping, expr)
    }

    /// Builds the dependency map used to project orderings.
    ///
    /// Each normalized ordering is walked from its leading sort expression.
    /// A sort expression gets a node when it can be projected or when some
    /// mapping source refers to it; the node records the projected sort
    /// expression (if any) and, as a dependency, the sort expression that
    /// precedes it in the ordering. The walk over an ordering stops at the
    /// first sort expression that cannot be projected.
    fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
        let mut dependency_map = HashMap::new();
        for ordering in self.normalized_oeq_class().iter() {
            for (idx, sort_expr) in ordering.iter().enumerate() {
                let target_sort_expr =
                    self.project_expr(&sort_expr.expr, mapping).map(|expr| {
                        PhysicalSortExpr {
                            expr,
                            options: sort_expr.options,
                        }
                    });
                let is_projected = target_sort_expr.is_some();
                if is_projected
                    || mapping
                        .iter()
                        .any(|(source, _)| expr_refers(source, &sort_expr.expr))
                {
                    // The previous sort expression (if any) is what this one
                    // depends on.
                    let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
                    dependency_map
                        .entry(sort_expr.clone())
                        .or_insert_with(|| DependencyNode {
                            target_sort_expr: target_sort_expr.clone(),
                            dependencies: HashSet::new(),
                        })
                        .insert_dependency(dependency);
                }
                if !is_projected {
                    break;
                }
            }
        }
        dependency_map
    }

    /// Returns a copy of `mapping` whose source expressions are normalized
    /// with the equivalence group; targets are unchanged.
    fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
        ProjectionMapping {
            map: mapping
                .iter()
                .map(|(source, target)| {
                    let normalized_source = self.eq_group.normalize_expr(source.clone());
                    (normalized_source, target.clone())
                })
                .collect(),
        }
    }

    /// Computes the orderings that hold after applying `mapping`.
    ///
    /// Two families of orderings are produced and then de-duplicated per
    /// ordering with `collapse_lex_ordering`:
    /// * for every mapping source whose ordering can be derived from the
    ///   sort expressions it refers to: each dependency prefix followed by
    ///   the target with the derived options;
    /// * for every node of the dependency map: each of its prefix orderings
    ///   followed by the node's projected sort expression (when it has one).
    fn projected_orderings(&self, mapping: &ProjectionMapping) -> Vec<LexOrdering> {
        let mapping = self.normalized_mapping(mapping);
        let dependency_map = self.construct_dependency_map(&mapping);
        let orderings = mapping.iter().flat_map(|(source, target)| {
            referred_dependencies(&dependency_map, source)
                .into_iter()
                .filter_map(|relevant_deps| {
                    if let SortProperties::Ordered(options) =
                        get_expr_ordering(source, &relevant_deps)
                    {
                        Some((options, relevant_deps))
                    } else {
                        None
                    }
                })
                .flat_map(|(options, relevant_deps)| {
                    let sort_expr = PhysicalSortExpr {
                        expr: target.clone(),
                        options,
                    };
                    let mut dependency_orderings =
                        generate_dependency_orderings(&relevant_deps, &dependency_map);
                    for ordering in dependency_orderings.iter_mut() {
                        ordering.push(sort_expr.clone());
                    }
                    dependency_orderings
                })
        });
        let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
            let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
            if prefixes.is_empty() {
                // No dependencies: start from a single empty prefix.
                prefixes = vec![vec![]];
            }
            for ordering in prefixes.iter_mut() {
                if let Some(target) = &node.target_sort_expr {
                    ordering.push(target.clone())
                }
            }
            prefixes
        });
        orderings
            .chain(projected_orderings)
            .map(collapse_lex_ordering)
            .collect()
    }

    /// Computes the constants that hold after applying `mapping`: the
    /// projections of the current constants plus the target of every mapping
    /// source that is itself constant.
    fn projected_constants(
        &self,
        mapping: &ProjectionMapping,
    ) -> Vec<Arc<dyn PhysicalExpr>> {
        let mut projected_constants = self
            .constants
            .iter()
            .flat_map(|expr| self.eq_group.project_expr(mapping, expr))
            .collect::<Vec<_>>();
        for (source, target) in mapping.iter() {
            if self.is_expr_constant(source)
                && !physical_exprs_contains(&projected_constants, target)
            {
                projected_constants.push(target.clone());
            }
        }
        projected_constants
    }

    /// Projects these properties through `projection_mapping`, producing
    /// properties that refer to `output_schema`.
    pub fn project(
        &self,
        projection_mapping: &ProjectionMapping,
        output_schema: SchemaRef,
    ) -> Self {
        let projected_constants = self.projected_constants(projection_mapping);
        let projected_eq_group = self.eq_group.project(projection_mapping);
        let projected_orderings = self.projected_orderings(projection_mapping);
        Self {
            eq_group: projected_eq_group,
            oeq_class: OrderingEquivalenceClass::new(projected_orderings),
            constants: projected_constants,
            schema: output_schema,
        }
    }

    /// Greedily builds an ordering out of `exprs` that the data satisfies.
    ///
    /// In each round, every remaining expression whose state is `Ordered` is
    /// appended to the result (with its options), marked as constant for the
    /// following rounds, and removed from the search. The search ends when a
    /// round finds no ordered expression.
    ///
    /// Returns the ordering together with the indices (into `exprs`) of the
    /// expressions it is made of.
    pub fn find_longest_permutation(
        &self,
        exprs: &[Arc<dyn PhysicalExpr>],
    ) -> (LexOrdering, Vec<usize>) {
        let mut eq_properties = self.clone();
        let mut result = vec![];
        let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
        for _idx in 0..exprs.len() {
            let ordered_exprs = search_indices
                .iter()
                .flat_map(|&idx| {
                    let ExprOrdering { expr, state, .. } =
                        eq_properties.get_expr_ordering(exprs[idx].clone());
                    if let SortProperties::Ordered(options) = state {
                        Some((PhysicalSortExpr { expr, options }, idx))
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();
            if ordered_exprs.is_empty() {
                break;
            }
            for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
                eq_properties =
                    eq_properties.add_constants(std::iter::once(expr.clone()));
                search_indices.remove(idx);
            }
            result.extend(ordered_exprs);
        }
        result.into_iter().unzip()
    }

    /// Returns `true` if the normalized `expr` is one of the normalized
    /// constants, or is built entirely out of such constants.
    fn is_expr_constant(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
        let normalized_constants =
            self.eq_group.normalize_exprs(self.constants.iter().cloned());
        let normalized_expr = self.eq_group.normalize_expr(expr.clone());
        is_constant_recurse(&normalized_constants, &normalized_expr)
    }

    /// Computes the ordering state of `expr` by visiting its expression tree
    /// bottom-up and applying `update_ordering` to every node.
    ///
    /// # Panics
    /// Panics if the tree traversal returns an error.
    pub fn get_expr_ordering(&self, expr: Arc<dyn PhysicalExpr>) -> ExprOrdering {
        ExprOrdering::new(expr)
            .transform_up(&|expr| Ok(update_ordering(expr, self)))
            .unwrap()
    }
}
/// Returns `true` if `expr` is listed in `constants`, or if it has children
/// and every child is (recursively) constant. A leaf that is not listed is
/// not constant.
fn is_constant_recurse(
    constants: &[Arc<dyn PhysicalExpr>],
    expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    if physical_exprs_contains(constants, expr) {
        return true;
    }
    let children = expr.children();
    if children.is_empty() {
        return false;
    }
    children
        .iter()
        .all(|child| is_constant_recurse(constants, child))
}
/// Returns `true` if `referring_expr` equals `referred_expr` or contains it
/// anywhere in its expression tree.
fn expr_refers(
    referring_expr: &Arc<dyn PhysicalExpr>,
    referred_expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    if referring_expr.eq(referred_expr) {
        return true;
    }
    let children = referring_expr.children();
    for child in children.iter() {
        if expr_refers(child, referred_expr) {
            return true;
        }
    }
    false
}
/// Newtype around a physical expression that provides `Eq` and `Hash`, so the
/// expression can be used as a `HashMap` key (see `referred_dependencies`).
#[derive(Debug, Clone)]
struct ExprWrapper(Arc<dyn PhysicalExpr>);
// Equality delegates to the wrapped expression's `eq`.
impl PartialEq<Self> for ExprWrapper {
fn eq(&self, other: &Self) -> bool {
self.0.eq(&other.0)
}
}
impl Eq for ExprWrapper {}
// Hashing delegates to the wrapped expression's `hash`.
impl Hash for ExprWrapper {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}
/// Collects the sort expressions of `dependency_map` that `source` refers to
/// and returns every way of picking one sort expression per distinct
/// referred expression.
///
/// The sort expressions are first grouped by their expression (several
/// sort expressions may share an expression but differ in options); the
/// result is the cartesian product of those groups, each combination being
/// returned as a set.
fn referred_dependencies(
    dependency_map: &DependencyMap,
    source: &Arc<dyn PhysicalExpr>,
) -> Vec<Dependencies> {
    let mut grouped: HashMap<ExprWrapper, Dependencies> = HashMap::new();
    for sort_expr in dependency_map.keys() {
        if !expr_refers(source, &sort_expr.expr) {
            continue;
        }
        grouped
            .entry(ExprWrapper(sort_expr.expr.clone()))
            .or_default()
            .insert(sort_expr.clone());
    }
    grouped
        .values()
        .multi_cartesian_product()
        .map(|combination| combination.into_iter().cloned().collect())
        .collect()
}
/// Builds every projected ordering that ends with the target of
/// `referred_sort_expr`, by recursively prepending the orderings of its
/// dependencies.
///
/// # Panics
/// Panics if `referred_sort_expr` is not a key of `dependency_map`, or if its
/// node has no projected target.
fn construct_orderings(
    referred_sort_expr: &PhysicalSortExpr,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let node = &dependency_map[referred_sort_expr];
    let target_sort_expr = node.target_sort_expr.clone().unwrap();
    if node.dependencies.is_empty() {
        // No prerequisites: the target alone forms the ordering.
        return vec![vec![target_sort_expr]];
    }
    let mut result = Vec::new();
    for dep in &node.dependencies {
        for mut ordering in construct_orderings(dep, dependency_map) {
            ordering.push(target_sort_expr.clone());
            result.push(ordering);
        }
    }
    result
}
/// Returns the orderings that must precede `relevant_sort_expr`, i.e. the
/// orderings constructed for each of its dependencies. The result is empty
/// when it has no dependencies.
///
/// # Panics
/// Panics if `relevant_sort_expr` is not a key of `dependency_map`.
fn construct_prefix_orderings(
    relevant_sort_expr: &PhysicalSortExpr,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let mut prefixes = Vec::new();
    for dep in &dependency_map[relevant_sort_expr].dependencies {
        prefixes.extend(construct_orderings(dep, dependency_map));
    }
    prefixes
}
/// Generates every prefix ordering that satisfies all of `dependencies`.
///
/// For each dependency its (non-empty) list of prefix orderings is computed.
/// One prefix is then picked per dependency in every possible way, and for
/// each pick all permutations of the chosen prefixes are concatenated.
/// When no dependency has a prefix, a single empty ordering is returned.
fn generate_dependency_orderings(
    dependencies: &Dependencies,
    dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
    let relevant_prefixes: Vec<Vec<LexOrdering>> = dependencies
        .iter()
        .map(|dep| construct_prefix_orderings(dep, dependency_map))
        .filter(|prefixes| !prefixes.is_empty())
        .collect();
    if relevant_prefixes.is_empty() {
        return vec![vec![]];
    }
    let mut orderings: Vec<LexOrdering> = Vec::new();
    for prefix_orderings in relevant_prefixes.into_iter().multi_cartesian_product() {
        let n_prefixes = prefix_orderings.len();
        for permutation in prefix_orderings.iter().permutations(n_prefixes) {
            orderings.push(permutation.into_iter().flatten().cloned().collect());
        }
    }
    orderings
}
/// Derives the ordering state of `expr` from the sort expressions in
/// `dependencies`.
///
/// If `expr` itself appears in `dependencies`, it is `Ordered` with the
/// options of that entry. Otherwise the states of its children are computed
/// recursively and combined by the expression's own `get_ordering`.
fn get_expr_ordering(
    expr: &Arc<dyn PhysicalExpr>,
    dependencies: &Dependencies,
) -> SortProperties {
    let direct_match = dependencies.iter().find(|&order| expr.eq(&order.expr));
    match direct_match {
        Some(column_order) => SortProperties::Ordered(column_order.options),
        None => {
            let children = expr.children();
            let mut child_states = Vec::with_capacity(children.len());
            for child in children.iter() {
                child_states.push(get_expr_ordering(child, dependencies));
            }
            expr.get_ordering(&child_states)
        }
    }
}
/// Value stored in a `DependencyMap` for one sort expression.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DependencyNode {
/// The sort expression after projection; `None` if it could not be projected.
target_sort_expr: Option<PhysicalSortExpr>,
/// Sort expressions that directly precede this one in some ordering.
dependencies: Dependencies,
}
impl DependencyNode {
    /// Records `dependency` (when present) as a dependency of this node;
    /// `None` leaves the node unchanged.
    fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
        self.dependencies.extend(dependency.cloned());
    }
}
/// Maps a sort expression to its projected form and its direct dependencies.
type DependencyMap = HashMap<PhysicalSortExpr, DependencyNode>;
/// A set of sort expressions.
type Dependencies = HashSet<PhysicalSortExpr>;
/// Computes the equivalence properties of a join output.
///
/// The equivalence group is obtained from `EquivalenceGroup::join`. The
/// orderings depend on `maintains_input_order`:
/// * `[true, false]`: the left orderings are kept; for an inner join probed
///   from the left they are additionally suffixed with the (offset) right
///   orderings.
/// * `[false, true]`: the (offset, where applicable) right orderings are
///   kept; for an inner join probed from the right they are additionally
///   suffixed with the left orderings.
/// * `[false, false]`: no ordering is kept.
///
/// # Panics
/// Panics if `maintains_input_order` is `[true, true]` or does not have
/// exactly two entries.
pub fn join_equivalence_properties(
    left: EquivalenceProperties,
    right: EquivalenceProperties,
    join_type: &JoinType,
    join_schema: SchemaRef,
    maintains_input_order: &[bool],
    probe_side: Option<JoinSide>,
    on: &[(Column, Column)],
) -> EquivalenceProperties {
    let left_size = left.schema.fields.len();
    let joined_group = left
        .eq_group()
        .join(right.eq_group(), join_type, left_size, on);
    let mut result = EquivalenceProperties::new(join_schema);
    result.add_equivalence_group(joined_group);

    let left_oeq_class = left.oeq_class;
    let mut right_oeq_class = right.oeq_class;
    let is_inner = matches!(join_type, JoinType::Inner);
    match maintains_input_order {
        [true, false] => {
            let preserved = if is_inner && matches!(probe_side, Some(JoinSide::Left)) {
                updated_right_ordering_equivalence_class(
                    &mut right_oeq_class,
                    join_type,
                    left_size,
                );
                left_oeq_class.join_suffix(&right_oeq_class)
            } else {
                left_oeq_class
            };
            result.add_ordering_equivalence_class(preserved);
        }
        [false, true] => {
            updated_right_ordering_equivalence_class(
                &mut right_oeq_class,
                join_type,
                left_size,
            );
            let preserved = if is_inner && matches!(probe_side, Some(JoinSide::Right)) {
                right_oeq_class.join_suffix(&left_oeq_class)
            } else {
                right_oeq_class
            };
            result.add_ordering_equivalence_class(preserved);
        }
        [false, false] => {}
        [true, true] => unreachable!("Cannot maintain ordering of both sides"),
        _ => unreachable!("Join operators can not have more than two children"),
    }
    result
}
/// Shifts the column indices of the right side's orderings by `left_size`
/// for `Inner`, `Left`, `Full` and `Right` joins; other join types leave
/// `right_oeq_class` untouched.
fn updated_right_ordering_equivalence_class(
    right_oeq_class: &mut OrderingEquivalenceClass,
    join_type: &JoinType,
    left_size: usize,
) {
    match join_type {
        JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
            right_oeq_class.add_offset(left_size)
        }
        _ => {}
    }
}
/// Computes the ordering state of a single `ExprOrdering` node.
///
/// The first matching rule wins:
/// 1. the normalized expression is constant → `Singleton`;
/// 2. it is the leading expression of a normalized ordering → `Ordered`
///    with that ordering's options;
/// 3. the expression has children → whatever its `get_ordering` derives
///    from the children's states;
/// 4. the expression is a `Literal` → its `get_ordering` with no children;
/// 5. otherwise the node is returned unchanged.
fn update_ordering(
    mut node: ExprOrdering,
    eq_properties: &EquivalenceProperties,
) -> Transformed<ExprOrdering> {
    let normalized_expr = eq_properties.eq_group.normalize_expr(node.expr.clone());
    let new_state = if eq_properties.is_expr_constant(&normalized_expr) {
        SortProperties::Singleton
    } else if let Some(options) = eq_properties
        .normalized_oeq_class()
        .get_options(&normalized_expr)
    {
        SortProperties::Ordered(options)
    } else if !node.expr.children().is_empty() {
        node.expr.get_ordering(&node.children_state())
    } else if node.expr.as_any().is::<Literal>() {
        node.expr.get_ordering(&[])
    } else {
        return Transformed::No(node);
    };
    node.state = new_state;
    Transformed::Yes(node)
}
#[cfg(test)]
mod tests {
use std::ops::Not;
use std::sync::Arc;
use super::*;
use crate::execution_props::ExecutionProps;
use crate::expressions::{col, lit, BinaryExpr, Column, Literal};
use crate::functions::create_physical_expr;
use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::{ArrayRef, Float64Array, RecordBatch, UInt32Array};
use arrow_schema::{Fields, SortOptions, TimeUnit};
use datafusion_common::{plan_datafusion_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{BuiltinScalarFunction, Operator};
use itertools::{izip, Itertools};
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
// Builds the output schema of a projection: one field per mapping entry,
// named after the target column and typed/nullable like the source
// expression evaluated against `input_schema`. The input metadata is kept.
// Fails if a target is not a `Column`.
fn output_schema(
    mapping: &ProjectionMapping,
    input_schema: &Arc<Schema>,
) -> Result<SchemaRef> {
    let mut fields: Vec<Field> = Vec::new();
    for (source, target) in mapping.iter() {
        let target_column = target
            .as_any()
            .downcast_ref::<Column>()
            .ok_or_else(|| plan_datafusion_err!("Expects to have column"))?;
        fields.push(Field::new(
            target_column.name(),
            source.data_type(input_schema)?,
            source.nullable(input_schema)?,
        ));
    }
    let metadata = input_schema.metadata().clone();
    Ok(Arc::new(Schema::new_with_metadata(fields, metadata)))
}
fn create_test_schema() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Int32, true);
let b = Field::new("b", DataType::Int32, true);
let c = Field::new("c", DataType::Int32, true);
let d = Field::new("d", DataType::Int32, true);
let e = Field::new("e", DataType::Int32, true);
let f = Field::new("f", DataType::Int32, true);
let g = Field::new("g", DataType::Int32, true);
let h = Field::new("h", DataType::Int32, true);
let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f, g, h]));
Ok(schema)
}
// Test fixture on the `a..h` Int32 schema where:
//   * `a` and `c` are equal,
//   * the orderings are [a ASC], [d ASC, b ASC] and [e DESC, f ASC, g ASC].
fn create_test_params() -> Result<(SchemaRef, EquivalenceProperties)> {
    let schema = create_test_schema()?;
    let a = col("a", &schema)?;
    let b = col("b", &schema)?;
    let c = col("c", &schema)?;
    let d = col("d", &schema)?;
    let e = col("e", &schema)?;
    let f = col("f", &schema)?;
    let g = col("g", &schema)?;

    let mut eq_properties = EquivalenceProperties::new(schema.clone());
    eq_properties.add_equal_conditions(&a, &c);

    let asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    let orderings = convert_to_orderings(&[
        vec![(&a, asc)],
        vec![(&d, asc), (&b, asc)],
        vec![(&e, desc), (&f, asc), (&g, asc)],
    ]);
    eq_properties.add_new_orderings(orderings);
    Ok((schema, eq_properties))
}
/// Builds a schema of six nullable `Float64` columns named `a` through `f`.
fn create_test_schema_2() -> Result<SchemaRef> {
    let fields = ["a", "b", "c", "d", "e", "f"]
        .into_iter()
        .map(|name| Field::new(name, DataType::Float64, true))
        .collect::<Vec<_>>();
    Ok(Arc::new(Schema::new(fields)))
}
fn create_random_schema(seed: u64) -> Result<(SchemaRef, EquivalenceProperties)> {
let test_schema = create_test_schema_2()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
let col_exprs = [col_a, col_b, col_c, col_d, col_e, col_f];
let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
eq_properties.add_equal_conditions(col_a, col_f);
eq_properties = eq_properties.add_constants([col_e.clone()]);
let mut rng = StdRng::seed_from_u64(seed);
let mut remaining_exprs = col_exprs[0..4].to_vec();
let options_asc = SortOptions {
descending: false,
nulls_first: false,
};
while !remaining_exprs.is_empty() {
let n_sort_expr = rng.gen_range(0..remaining_exprs.len() + 1);
remaining_exprs.shuffle(&mut rng);
let ordering = remaining_exprs
.drain(0..n_sort_expr)
.map(|expr| PhysicalSortExpr {
expr: expr.clone(),
options: options_asc,
})
.collect();
eq_properties.add_new_orderings([ordering]);
}
Ok((test_schema, eq_properties))
}
fn convert_to_sort_reqs(
in_data: &[(&Arc<dyn PhysicalExpr>, Option<SortOptions>)],
) -> Vec<PhysicalSortRequirement> {
in_data
.iter()
.map(|(expr, options)| {
PhysicalSortRequirement::new((*expr).clone(), *options)
})
.collect()
}
fn convert_to_sort_exprs(
in_data: &[(&Arc<dyn PhysicalExpr>, SortOptions)],
) -> Vec<PhysicalSortExpr> {
in_data
.iter()
.map(|(expr, options)| PhysicalSortExpr {
expr: (*expr).clone(),
options: *options,
})
.collect()
}
/// Applies `convert_to_sort_exprs` to every ordering in `orderings`,
/// preserving their order.
fn convert_to_orderings(
    orderings: &[Vec<(&Arc<dyn PhysicalExpr>, SortOptions)>],
) -> Vec<Vec<PhysicalSortExpr>> {
    let mut converted = Vec::with_capacity(orderings.len());
    for ordering in orderings {
        converted.push(convert_to_sort_exprs(ordering));
    }
    converted
}
fn convert_to_sort_exprs_owned(
in_data: &[(Arc<dyn PhysicalExpr>, SortOptions)],
) -> Vec<PhysicalSortExpr> {
in_data
.iter()
.map(|(expr, options)| PhysicalSortExpr {
expr: (*expr).clone(),
options: *options,
})
.collect()
}
/// Applies `convert_to_sort_exprs_owned` to every ordering in `orderings`,
/// preserving their order.
fn convert_to_orderings_owned(
    orderings: &[Vec<(Arc<dyn PhysicalExpr>, SortOptions)>],
) -> Vec<Vec<PhysicalSortExpr>> {
    let mut converted = Vec::with_capacity(orderings.len());
    for ordering in orderings {
        converted.push(convert_to_sort_exprs_owned(ordering));
    }
    converted
}
/// Projects both a record batch and its equivalence properties through
/// `proj_exprs`.
///
/// Each `(expression, alias)` pair is evaluated against `input_data` to
/// produce one output column. Returns the projected batch together with
/// `input_eq_properties` projected through the same mapping.
fn apply_projection(
    proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)>,
    input_data: &RecordBatch,
    input_eq_properties: &EquivalenceProperties,
) -> Result<(RecordBatch, EquivalenceProperties)> {
    let input_schema = input_data.schema();
    let mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
    let projected_schema = output_schema(&mapping, &input_schema)?;

    let row_count = input_data.num_rows();
    let mut columns = Vec::new();
    for (source, _target) in mapping.iter() {
        columns.push(source.evaluate(input_data)?.into_array(row_count)?);
    }

    // With no projected columns, build an empty batch directly instead of
    // going through `try_new`.
    let batch = if columns.is_empty() {
        RecordBatch::new_empty(projected_schema.clone())
    } else {
        RecordBatch::try_new(projected_schema.clone(), columns)?
    };

    let properties = input_eq_properties.project(&mapping, projected_schema);
    Ok((batch, properties))
}
/// Checks that `add_equal_conditions` merges expressions into equivalence
/// classes: repeated or reversed conditions are no-ops, a condition touching
/// an existing class extends it, and a condition linking two classes merges
/// them.
#[test]
fn add_equal_conditions_test() -> Result<()> {
    let fields = ["a", "b", "c", "x", "y"]
        .into_iter()
        .map(|name| Field::new(name, DataType::Int64, true))
        .collect::<Vec<_>>();
    let mut eq_properties = EquivalenceProperties::new(Arc::new(Schema::new(fields)));

    let column = |name: &str, index: usize| -> Arc<dyn PhysicalExpr> {
        Arc::new(Column::new(name, index))
    };
    let a = column("a", 0);
    let b = column("b", 1);
    let c = column("c", 2);
    let x = column("x", 3);
    let y = column("y", 4);

    // a = b creates the first class.
    eq_properties.add_equal_conditions(&a, &b);
    assert_eq!(eq_properties.eq_group().len(), 1);

    // b = a is the same condition reversed; nothing changes.
    eq_properties.add_equal_conditions(&b, &a);
    assert_eq!(eq_properties.eq_group().len(), 1);
    let class = &eq_properties.eq_group().classes[0];
    assert_eq!(class.len(), 2);
    for member in [&a, &b] {
        assert!(class.contains(member));
    }

    // b = c extends the existing class.
    eq_properties.add_equal_conditions(&b, &c);
    assert_eq!(eq_properties.eq_group().len(), 1);
    let class = &eq_properties.eq_group().classes[0];
    assert_eq!(class.len(), 3);
    for member in [&a, &b, &c] {
        assert!(class.contains(member));
    }

    // x = y is unrelated so far and forms a second class.
    eq_properties.add_equal_conditions(&x, &y);
    assert_eq!(eq_properties.eq_group().len(), 2);

    // x = a bridges the two classes into one.
    eq_properties.add_equal_conditions(&x, &a);
    assert_eq!(eq_properties.eq_group().len(), 1);
    let class = &eq_properties.eq_group().classes[0];
    assert_eq!(class.len(), 5);
    for member in [&a, &b, &c, &x, &y] {
        assert!(class.contains(member));
    }

    Ok(())
}
/// Projecting the same input column under four different aliases must yield
/// a single equivalence class containing all four output columns.
#[test]
fn project_equivalence_properties_test() -> Result<()> {
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
        Field::new("c", DataType::Int64, true),
    ]));
    let input_properties = EquivalenceProperties::new(input_schema.clone());

    // a AS a1, a AS a2, a AS a3, a AS a4
    let col_a = col("a", &input_schema)?;
    let proj_exprs = vec![
        (col_a.clone(), "a1".to_string()),
        (col_a.clone(), "a2".to_string()),
        (col_a.clone(), "a3".to_string()),
        (col_a.clone(), "a4".to_string()),
    ];
    // The mapping is built once and used both to derive the output schema
    // and to project the properties.
    let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
    let out_schema = output_schema(&projection_mapping, &input_schema)?;

    let col_a1 = &col("a1", &out_schema)?;
    let col_a2 = &col("a2", &out_schema)?;
    let col_a3 = &col("a3", &out_schema)?;
    let col_a4 = &col("a4", &out_schema)?;

    let out_properties = input_properties.project(&projection_mapping, out_schema);

    // All four aliases refer to the same source column, so they are equal.
    assert_eq!(out_properties.eq_group().len(), 1);
    let eq_class = &out_properties.eq_group().classes[0];
    assert_eq!(eq_class.len(), 4);
    assert!(eq_class.contains(col_a1));
    assert!(eq_class.contains(col_a2));
    assert!(eq_class.contains(col_a3));
    assert!(eq_class.contains(col_a4));
    Ok(())
}
/// A finer existing ordering (`[a, b]`) satisfies a cruder requirement
/// (`[a]`), but not the other way around.
#[test]
fn test_ordering_satisfy() -> Result<()> {
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
    ]));
    let sort_on = |name: &str, index: usize| PhysicalSortExpr {
        expr: Arc::new(Column::new(name, index)),
        options: SortOptions::default(),
    };
    let crude = vec![sort_on("a", 0)];
    let finer = vec![sort_on("a", 0), sort_on("b", 1)];

    // Data ordered by [a, b] is also ordered by [a].
    let mut finer_properties = EquivalenceProperties::new(input_schema.clone());
    finer_properties.oeq_class.push(finer.clone());
    assert!(finer_properties.ordering_satisfy(&crude));

    // Data ordered only by [a] says nothing about [a, b].
    let mut crude_properties = EquivalenceProperties::new(input_schema.clone());
    crude_properties.oeq_class.push(crude.clone());
    assert!(!crude_properties.ordering_satisfy(&finer));
    Ok(())
}
/// Checks `ordering_satisfy` against the properties from
/// `create_test_params` (`a = c`; orderings `[a ASC]`, `[d ASC, b ASC]`,
/// `[e DESC, f ASC, g ASC]`).
///
/// Every expectation is verified twice: once against a generated table that
/// obeys the properties (sorting by the requirement must leave it unchanged
/// exactly when the requirement is satisfied), and once against the
/// `ordering_satisfy` API itself.
#[test]
fn test_ordering_satisfy_with_equivalence() -> Result<()> {
    let (test_schema, eq_properties) = create_test_params()?;
    let col_a = &col("a", &test_schema)?;
    let col_b = &col("b", &test_schema)?;
    let col_c = &col("c", &test_schema)?;
    let col_d = &col("d", &test_schema)?;
    let col_e = &col("e", &test_schema)?;
    let col_f = &col("f", &test_schema)?;
    let col_g = &col("g", &test_schema)?;
    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let option_desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    let table_data_with_properties =
        generate_table_for_eq_properties(&eq_properties, 625, 5)?;

    // Each entry is (required ordering, whether it is expected to be satisfied).
    let requirements = vec![
        // Direct match / mismatch on the `[a ASC]` ordering.
        (vec![(col_a, option_asc)], true),
        (vec![(col_a, option_desc)], false),
        // `c` is equal to `a`, so it behaves the same way.
        (vec![(col_c, option_asc)], true),
        (vec![(col_c, option_desc)], false),
        // Prefixes and full matches of the other registered orderings.
        (vec![(col_d, option_asc)], true),
        (vec![(col_d, option_asc), (col_b, option_asc)], true),
        (vec![(col_d, option_desc), (col_b, option_asc)], false),
        (
            vec![
                (col_e, option_desc),
                (col_f, option_asc),
                (col_g, option_asc),
            ],
            true,
        ),
        (vec![(col_e, option_desc), (col_f, option_asc)], true),
        (vec![(col_e, option_asc), (col_f, option_asc)], false),
        (vec![(col_e, option_desc), (col_b, option_asc)], false),
        (vec![(col_e, option_asc), (col_b, option_asc)], false),
        // Requirements that combine several registered orderings.
        (
            vec![
                (col_d, option_asc),
                (col_b, option_asc),
                (col_d, option_asc),
                (col_b, option_asc),
            ],
            true,
        ),
        (
            vec![
                (col_d, option_asc),
                (col_b, option_asc),
                (col_e, option_desc),
                (col_f, option_asc),
            ],
            true,
        ),
        (
            vec![
                (col_d, option_asc),
                (col_b, option_asc),
                (col_e, option_desc),
                (col_b, option_asc),
            ],
            true,
        ),
        (
            vec![
                (col_d, option_asc),
                (col_b, option_asc),
                (col_d, option_desc),
                (col_b, option_asc),
            ],
            true,
        ),
        (
            vec![
                (col_d, option_asc),
                (col_b, option_asc),
                (col_e, option_asc),
                (col_f, option_asc),
            ],
            false,
        ),
        (
            vec![
                (col_d, option_asc),
                (col_b, option_asc),
                (col_e, option_asc),
                (col_b, option_asc),
            ],
            false,
        ),
        (vec![(col_d, option_asc), (col_e, option_desc)], true),
        (
            vec![
                (col_d, option_asc),
                (col_c, option_asc),
                (col_b, option_asc),
            ],
            true,
        ),
        (
            vec![
                (col_d, option_asc),
                (col_e, option_desc),
                (col_f, option_asc),
                (col_b, option_asc),
            ],
            true,
        ),
        (
            vec![
                (col_d, option_asc),
                (col_e, option_desc),
                (col_c, option_asc),
                (col_b, option_asc),
            ],
            true,
        ),
        (
            vec![
                (col_d, option_asc),
                (col_e, option_desc),
                (col_b, option_asc),
                (col_f, option_asc),
            ],
            true,
        ),
    ];

    for (cols, expected) in requirements {
        let err_msg = format!("Error in test case:{cols:?}");
        let required = cols
            .into_iter()
            .map(|(expr, options)| PhysicalSortExpr {
                expr: expr.clone(),
                options,
            })
            .collect::<Vec<_>>();

        // Check the expectation against the generated data; the message
        // identifies the failing case, as for the API check below.
        assert_eq!(
            is_table_same_after_sort(
                required.clone(),
                table_data_with_properties.clone()
            )?,
            expected,
            "{err_msg}"
        );
        // Check the expectation against the API under test.
        assert_eq!(
            eq_properties.ordering_satisfy(&required),
            expected,
            "{err_msg}"
        );
    }
    Ok(())
}
/// Table-driven check of `ordering_satisfy` for requirements that contain
/// derived expressions (`floor(a)`, `floor(f)`, `exp(a)`, `a + b`).
///
/// Each case lists: the orderings to register, the equivalence classes, the
/// constant expressions, the required ordering, and whether the requirement
/// is expected to be satisfied. Every case uses `a = f` and constant `e`.
#[test]
fn test_ordering_satisfy_with_equivalence2() -> Result<()> {
    let test_schema = create_test_schema()?;
    let col_a = &col("a", &test_schema)?;
    let col_b = &col("b", &test_schema)?;
    let col_c = &col("c", &test_schema)?;
    let col_d = &col("d", &test_schema)?;
    let col_e = &col("e", &test_schema)?;
    let col_f = &col("f", &test_schema)?;
    let floor_a = &create_physical_expr(
        &BuiltinScalarFunction::Floor,
        &[col("a", &test_schema)?],
        &test_schema,
        &ExecutionProps::default(),
    )?;
    let floor_f = &create_physical_expr(
        &BuiltinScalarFunction::Floor,
        &[col("f", &test_schema)?],
        &test_schema,
        &ExecutionProps::default(),
    )?;
    let exp_a = &create_physical_expr(
        &BuiltinScalarFunction::Exp,
        &[col("a", &test_schema)?],
        &test_schema,
        &ExecutionProps::default(),
    )?;
    let a_plus_b = Arc::new(BinaryExpr::new(
        col_a.clone(),
        Operator::Plus,
        col_b.clone(),
    )) as Arc<dyn PhysicalExpr>;
    let options = SortOptions {
        descending: false,
        nulls_first: false,
    };

    let test_cases = vec![
        // ------------ TEST CASE 1 ------------
        (
            // orderings: [a ASC, d ASC, b ASC], [c ASC]
            vec![
                vec![(col_a, options), (col_d, options), (col_b, options)],
                vec![(col_c, options)],
            ],
            // equivalence classes
            vec![vec![col_a, col_f]],
            // constants
            vec![col_e],
            // requirement: [a ASC, b ASC]
            vec![(col_a, options), (col_b, options)],
            false,
        ),
        // ------------ TEST CASE 2 ------------
        (
            // orderings: [a ASC, c ASC, b ASC], [d ASC]
            vec![
                vec![(col_a, options), (col_c, options), (col_b, options)],
                vec![(col_d, options)],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [floor(a) ASC]
            vec![(floor_a, options)],
            true,
        ),
        // ------------ TEST CASE 2.1 ------------
        (
            // orderings: [a ASC, c ASC, b ASC], [d ASC]
            vec![
                vec![(col_a, options), (col_c, options), (col_b, options)],
                vec![(col_d, options)],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [floor(f) ASC] (f is equal to a)
            vec![(floor_f, options)],
            true,
        ),
        // ------------ TEST CASE 3 ------------
        (
            // orderings: [a ASC, c ASC, b ASC], [d ASC]
            vec![
                vec![(col_a, options), (col_c, options), (col_b, options)],
                vec![(col_d, options)],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [a ASC, c ASC, a + b ASC]
            vec![(col_a, options), (col_c, options), (&a_plus_b, options)],
            true,
        ),
        // ------------ TEST CASE 4 ------------
        (
            // orderings: [a ASC, b ASC, c ASC, d ASC]
            vec![
                vec![
                    (col_a, options),
                    (col_b, options),
                    (col_c, options),
                    (col_d, options),
                ],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [floor(a) ASC, a + b ASC]
            vec![(floor_a, options), (&a_plus_b, options)],
            false,
        ),
        // ------------ TEST CASE 5 ------------
        (
            // orderings: [a ASC, b ASC, c ASC, d ASC]
            vec![
                vec![
                    (col_a, options),
                    (col_b, options),
                    (col_c, options),
                    (col_d, options),
                ],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [exp(a) ASC, a + b ASC]
            vec![(exp_a, options), (&a_plus_b, options)],
            false,
        ),
        // ------------ TEST CASE 6 ------------
        (
            // orderings: [a ASC, d ASC, b ASC], [c ASC]
            vec![
                vec![(col_a, options), (col_d, options), (col_b, options)],
                vec![(col_c, options)],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [a ASC, d ASC, floor(a) ASC]
            vec![(col_a, options), (col_d, options), (floor_a, options)],
            true,
        ),
        // ------------ TEST CASE 7 ------------
        (
            // orderings: [a ASC, c ASC, b ASC], [d ASC]
            vec![
                vec![(col_a, options), (col_c, options), (col_b, options)],
                vec![(col_d, options)],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [a ASC, floor(a) ASC, a + b ASC]
            vec![(col_a, options), (floor_a, options), (&a_plus_b, options)],
            false,
        ),
        // ------------ TEST CASE 8 ------------
        (
            // orderings: [a ASC, b ASC, c ASC], [d ASC]
            vec![
                vec![(col_a, options), (col_b, options), (col_c, options)],
                vec![(col_d, options)],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [a ASC, c ASC, floor(a) ASC, a + b ASC]
            vec![
                (col_a, options),
                (col_c, options),
                (floor_a, options),
                (&a_plus_b, options),
            ],
            false,
        ),
        // ------------ TEST CASE 9 ------------
        (
            // orderings: [a ASC, b ASC, c ASC, d ASC]
            vec![
                vec![
                    (col_a, options),
                    (col_b, options),
                    (col_c, options),
                    (col_d, options),
                ],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [a ASC, b ASC, c ASC, floor(a) ASC]
            vec![
                (col_a, options),
                (col_b, options),
                (col_c, options),
                (floor_a, options),
            ],
            true,
        ),
        // ------------ TEST CASE 10 ------------
        (
            // orderings: [d ASC, b ASC], [c ASC, a ASC]
            vec![
                vec![(col_d, options), (col_b, options)],
                vec![(col_c, options), (col_a, options)],
            ],
            vec![vec![col_a, col_f]],
            vec![col_e],
            // requirement: [c ASC, d ASC, a + b ASC]
            vec![(col_c, options), (col_d, options), (&a_plus_b, options)],
            true,
        ),
    ];

    for (orderings, eq_group, constants, reqs, expected) in test_cases {
        let err_msg =
            format!("error in test orderings: {orderings:?}, eq_group: {eq_group:?}, constants: {constants:?}, reqs: {reqs:?}, expected: {expected:?}");
        // Build fresh properties for every case so cases cannot influence
        // each other.
        let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
        let orderings = convert_to_orderings(&orderings);
        eq_properties.add_new_orderings(orderings);
        let eq_group = eq_group
            .into_iter()
            .map(|eq_class| {
                let eq_classes = eq_class.into_iter().cloned().collect::<Vec<_>>();
                EquivalenceClass::new(eq_classes)
            })
            .collect::<Vec<_>>();
        let eq_group = EquivalenceGroup::new(eq_group);
        eq_properties.add_equivalence_group(eq_group);

        let constants = constants.into_iter().cloned();
        eq_properties = eq_properties.add_constants(constants);

        let reqs = convert_to_sort_exprs(&reqs);
        assert_eq!(
            eq_properties.ordering_satisfy(&reqs),
            expected,
            "{}",
            err_msg
        );
    }
    Ok(())
}
/// Randomised cross-check of `ordering_satisfy`.
///
/// For several seeded random property sets, a table obeying the properties
/// is generated. For every combination of the plain columns, the API result
/// must match whether sorting the table by that combination leaves it
/// unchanged.
#[test]
fn test_ordering_satisfy_with_equivalence_random() -> Result<()> {
    const N_RANDOM_SCHEMA: usize = 5;
    const N_ELEMENTS: usize = 125;
    const N_DISTINCT: usize = 5;
    const SORT_OPTIONS: SortOptions = SortOptions {
        descending: false,
        nulls_first: false,
    };

    for seed in 0..N_RANDOM_SCHEMA {
        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
        let table =
            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?;
        let col_exprs = ["a", "b", "c", "d", "e", "f"]
            .into_iter()
            .map(|name| col(name, &test_schema))
            .collect::<Result<Vec<_>>>()?;

        // Every combination of every size, from the empty requirement up to
        // all columns at once.
        let candidates = (0..=col_exprs.len())
            .flat_map(|n_req| col_exprs.iter().combinations(n_req));
        for exprs in candidates {
            let requirement: Vec<PhysicalSortExpr> = exprs
                .into_iter()
                .map(|expr| PhysicalSortExpr {
                    expr: expr.clone(),
                    options: SORT_OPTIONS,
                })
                .collect();
            // Ground truth derived from the generated data.
            let expected = is_table_same_after_sort(requirement.clone(), table.clone())?;
            let err_msg = format!(
                "Error in test case requirement:{:?}, expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}",
                requirement, expected, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants
            );
            assert_eq!(
                eq_properties.ordering_satisfy(&requirement),
                expected,
                "{}",
                err_msg
            );
        }
    }
    Ok(())
}
/// Randomised cross-check of `ordering_satisfy` that also includes the
/// derived expressions `floor(a)` and `a + b`.
///
/// For each seeded random property set, a table obeying the properties is
/// generated. For every combination of the candidate expressions, the API
/// result must match whether sorting the table by that combination leaves
/// it unchanged.
#[test]
fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {
    const N_RANDOM_SCHEMA: usize = 100;
    const N_ELEMENTS: usize = 125;
    const N_DISTINCT: usize = 5;
    const SORT_OPTIONS: SortOptions = SortOptions {
        descending: false,
        nulls_first: false,
    };

    for seed in 0..N_RANDOM_SCHEMA {
        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
        let table_data_with_properties =
            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?;

        let floor_a = create_physical_expr(
            &BuiltinScalarFunction::Floor,
            &[col("a", &test_schema)?],
            &test_schema,
            &ExecutionProps::default(),
        )?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            col("a", &test_schema)?,
            Operator::Plus,
            col("b", &test_schema)?,
        )) as Arc<dyn PhysicalExpr>;
        let exprs = vec![
            col("a", &test_schema)?,
            col("b", &test_schema)?,
            col("c", &test_schema)?,
            col("d", &test_schema)?,
            col("e", &test_schema)?,
            col("f", &test_schema)?,
            floor_a,
            a_plus_b,
        ];

        for n_req in 0..=exprs.len() {
            for exprs in exprs.iter().combinations(n_req) {
                let requirement = exprs
                    .into_iter()
                    .map(|expr| PhysicalSortExpr {
                        expr: expr.clone(),
                        options: SORT_OPTIONS,
                    })
                    .collect::<Vec<_>>();
                // Ground truth derived from the generated data.
                let expected = is_table_same_after_sort(
                    requirement.clone(),
                    table_data_with_properties.clone(),
                )?;
                let err_msg = format!(
                    "Error in test case requirement:{:?}, expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}",
                    requirement, expected, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants
                );
                // The API result must agree exactly with the experimental
                // result.
                assert_eq!(
                    eq_properties.ordering_satisfy(&requirement),
                    expected,
                    "{}",
                    err_msg
                );
            }
        }
    }
    Ok(())
}
/// Checks `ordering_satisfy` for requirements whose length differs from the
/// registered orderings (`[a ASC]`, `[e ASC]`, `[d ASC, f ASC]`, with
/// `a = c`).
#[test]
fn test_ordering_satisfy_different_lengths() -> Result<()> {
    let schema = create_test_schema()?;
    let a = col("a", &schema)?;
    let b = col("b", &schema)?;
    let c = col("c", &schema)?;
    let d = col("d", &schema)?;
    let e = col("e", &schema)?;
    let f = col("f", &schema)?;
    let asc = SortOptions {
        descending: false,
        nulls_first: false,
    };

    let mut eq_properties = EquivalenceProperties::new(schema);
    eq_properties.add_equal_conditions(&a, &c);
    eq_properties.add_new_orderings(convert_to_orderings(&[
        vec![(&a, asc)],
        vec![(&e, asc)],
        vec![(&d, asc), (&f, asc)],
    ]));

    // Each entry is (required ordering, whether it is expected to be satisfied).
    let test_cases = vec![
        (vec![(&c, asc), (&a, asc), (&e, asc)], true),
        (vec![(&c, asc), (&b, asc)], false),
        (vec![(&c, asc), (&d, asc)], true),
        (vec![(&d, asc), (&f, asc), (&b, asc)], false),
        (vec![(&d, asc), (&f, asc)], true),
    ];

    for (reqs, expected) in test_cases {
        let err_msg =
            format!("error in test reqs: {:?}, expected: {:?}", reqs, expected,);
        let required = convert_to_sort_exprs(&reqs);
        assert_eq!(
            eq_properties.ordering_satisfy(&required),
            expected,
            "{}",
            err_msg
        );
    }

    Ok(())
}
/// `bridge_classes` must merge equivalence classes that share at least one
/// member, transitively, and leave unrelated classes alone.
#[test]
fn test_bridge_groups() -> Result<()> {
    // Each entry is (input classes, expected classes after bridging).
    let test_cases = vec![
        (
            vec![vec![1, 2, 3], vec![2, 4, 5], vec![11, 12, 9], vec![7, 6, 5]],
            vec![vec![1, 2, 3, 4, 5, 6, 7], vec![9, 11, 12]],
        ),
        (
            vec![vec![1, 2, 3], vec![3, 4, 5], vec![9, 8, 7], vec![7, 6, 5]],
            vec![vec![1, 2, 3, 4, 5, 6, 7, 8, 9]],
        ),
    ];

    for (entries, expected) in test_cases {
        let entries: Vec<EquivalenceClass> = entries
            .into_iter()
            .map(|members| EquivalenceClass::new(members.into_iter().map(lit).collect()))
            .collect();
        let expected: Vec<EquivalenceClass> = expected
            .into_iter()
            .map(|members| EquivalenceClass::new(members.into_iter().map(lit).collect()))
            .collect();

        let mut group = EquivalenceGroup::new(entries.clone());
        group.bridge_classes();
        let eq_groups = group.classes;

        let err_msg = format!(
            "error in test entries: {:?}, expected: {:?}, actual:{:?}",
            entries, expected, eq_groups
        );
        assert_eq!(eq_groups.len(), expected.len(), "{}", err_msg);
        // Lengths are equal at this point, so zipping compares every class.
        for (actual_class, expected_class) in eq_groups.iter().zip(expected.iter()) {
            assert_eq!(actual_class, expected_class, "{}", err_msg);
        }
    }
    Ok(())
}
/// `remove_redundant_entries` must drop equivalence classes that carry no
/// information (a single distinct member) and keep the rest.
#[test]
fn test_remove_redundant_entries_eq_group() -> Result<()> {
    let mut group = EquivalenceGroup::new(vec![
        EquivalenceClass::new(vec![lit(1), lit(1), lit(2)]),
        // After deduplication this class only contains `3`; it is expected
        // to be removed.
        EquivalenceClass::new(vec![lit(3), lit(3)]),
        EquivalenceClass::new(vec![lit(4), lit(5), lit(6)]),
    ]);
    let expected = vec![
        EquivalenceClass::new(vec![lit(1), lit(2)]),
        EquivalenceClass::new(vec![lit(4), lit(5), lit(6)]),
    ];

    group.remove_redundant_entries();
    let eq_groups = group.classes;

    assert_eq!(eq_groups.len(), expected.len());
    assert_eq!(eq_groups.len(), 2);
    for (actual_class, expected_class) in eq_groups.iter().zip(expected.iter()) {
        assert_eq!(actual_class, expected_class);
    }
    Ok(())
}
// Table-driven check that `OrderingEquivalenceClass::new` reduces a set of
// orderings to the expected set (see each case below). The comparison at the
// end is order-insensitive: lengths must match and every actual ordering must
// appear in the expected list.
#[test]
fn test_remove_redundant_entries_oeq_class() -> Result<()> {
let schema = create_test_schema()?;
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_d = &col("d", &schema)?;
let col_e = &col("e", &schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
// First tuple element: the orderings given to the constructor.
// Second tuple element: the orderings expected to remain.
let test_cases = vec![
// ------- TEST CASE 1 -------
// given:    [a ASC, b ASC]
// expected: [a ASC, b ASC]
(
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
],
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
],
),
// ------- TEST CASE 2 -------
// given:    [a ASC, b ASC], [a ASC, b ASC, c ASC]
// expected: [a ASC, b ASC, c ASC]
(
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
vec![
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
),
// ------- TEST CASE 3 -------
// given:    [a ASC, b DESC], [a ASC], [a ASC, c ASC]
// expected: [a ASC, b DESC], [a ASC, c ASC]
(
vec![
vec![(col_a, option_asc), (col_b, option_desc)],
vec![(col_a, option_asc)],
vec![(col_a, option_asc), (col_c, option_asc)],
],
vec![
vec![(col_a, option_asc), (col_b, option_desc)],
vec![(col_a, option_asc), (col_c, option_asc)],
],
),
// ------- TEST CASE 4 -------
// given:    [a ASC, b ASC], [a ASC, b ASC, c ASC], [a ASC]
// expected: [a ASC, b ASC, c ASC]
(
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
vec![(col_a, option_asc)],
],
vec![
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
),
// ------- TEST CASE 5 -------
// given:    a single empty ordering
// expected: no orderings at all
(
vec![vec![]],
vec![],
),
// ------- TEST CASE 6 -------
// given:    [a ASC, b ASC], [b ASC]
// expected: [a ASC], [b ASC]
(
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![(col_b, option_asc)],
],
vec![
vec![(col_a, option_asc)],
vec![(col_b, option_asc)],
],
),
// ------- TEST CASE 7 -------
// given:    [b ASC, a ASC], [c ASC, a ASC], [d ASC, b ASC, c ASC]
// expected: [b ASC, a ASC], [c ASC, a ASC], [d ASC]
(
vec![
vec![(col_b, option_asc), (col_a, option_asc)],
vec![(col_c, option_asc), (col_a, option_asc)],
vec![
(col_d, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
vec![
vec![(col_b, option_asc), (col_a, option_asc)],
vec![(col_c, option_asc), (col_a, option_asc)],
vec![(col_d, option_asc)],
],
),
// ------- TEST CASE 8 -------
// given:    [b ASC, e ASC], [c ASC, a ASC],
//           [d ASC, b ASC, e ASC, c ASC, a ASC]
// expected: [b ASC, e ASC], [c ASC, a ASC], [d ASC]
(
vec![
vec![(col_b, option_asc), (col_e, option_asc)],
vec![(col_c, option_asc), (col_a, option_asc)],
vec![
(col_d, option_asc),
(col_b, option_asc),
(col_e, option_asc),
(col_c, option_asc),
(col_a, option_asc),
],
],
vec![
vec![(col_b, option_asc), (col_e, option_asc)],
vec![(col_c, option_asc), (col_a, option_asc)],
vec![(col_d, option_asc)],
],
),
// ------- TEST CASE 9 -------
// given:    [b ASC], [a ASC, b ASC, c ASC], [d ASC, a ASC, b ASC]
// expected: [b ASC], [a ASC, b ASC, c ASC], [d ASC]
(
vec![
vec![(col_b, option_asc)],
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
vec![
(col_d, option_asc),
(col_a, option_asc),
(col_b, option_asc),
],
],
vec![
vec![(col_b, option_asc)],
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
vec![(col_d, option_asc)],
],
),
];
for (orderings, expected) in test_cases {
let orderings = convert_to_orderings(&orderings);
let expected = convert_to_orderings(&expected);
// The reduction under test happens inside the constructor.
let actual = OrderingEquivalenceClass::new(orderings.clone());
let actual = actual.orderings;
let err_msg = format!(
"orderings: {:?}, expected: {:?}, actual :{:?}",
orderings, expected, actual
);
// Same number of orderings, and each actual one is an expected one.
assert_eq!(actual.len(), expected.len(), "{}", err_msg);
for elem in actual {
assert!(expected.contains(&elem), "{}", err_msg);
}
}
Ok(())
}
/// Checks that the right child's orderings, after being updated with
/// `updated_right_ordering_equivalence_class` for an inner join with four
/// left-side columns and added to the join's properties, equal the same
/// orderings expressed over the join schema.
#[test]
fn test_get_updated_right_ordering_equivalence_properties() -> Result<()> {
    let join_type = JoinType::Inner;
    let asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let int32_fields = |names: &[&str]| -> Fields {
        names
            .iter()
            .map(|name| Field::new(*name, DataType::Int32, true))
            .collect()
    };

    // Orderings of the right child, expressed over the child's own schema:
    // [x ASC, y ASC], [z ASC, w ASC]
    let child_schema = Schema::new(int32_fields(&["x", "y", "z", "w"]));
    let right_x = col("x", &child_schema)?;
    let right_y = col("y", &child_schema)?;
    let right_z = col("z", &child_schema)?;
    let right_w = col("w", &child_schema)?;
    let mut right_oeq_class = OrderingEquivalenceClass::new(convert_to_orderings(&[
        vec![(&right_x, asc), (&right_y, asc)],
        vec![(&right_z, asc), (&right_w, asc)],
    ]));

    // Join schema: four left columns followed by the four right columns.
    let left_columns_len = 4;
    let schema = Schema::new(int32_fields(&["a", "b", "c", "d", "x", "y", "z", "w"]));
    let join_a = col("a", &schema)?;
    let join_d = col("d", &schema)?;
    let join_x = col("x", &schema)?;
    let join_y = col("y", &schema)?;
    let join_z = col("z", &schema)?;
    let join_w = col("w", &schema)?;

    let mut join_eq_properties = EquivalenceProperties::new(Arc::new(schema));
    // a = x and d = w
    join_eq_properties.add_equal_conditions(&join_a, &join_x);
    join_eq_properties.add_equal_conditions(&join_d, &join_w);

    updated_right_ordering_equivalence_class(
        &mut right_oeq_class,
        &join_type,
        left_columns_len,
    );
    join_eq_properties.add_ordering_equivalence_class(right_oeq_class);
    let result = join_eq_properties.oeq_class().clone();

    // [x ASC, y ASC], [z ASC, w ASC] over the join schema
    let expected = OrderingEquivalenceClass::new(convert_to_orderings(&[
        vec![(&join_x, asc), (&join_y, asc)],
        vec![(&join_z, asc), (&join_w, asc)],
    ]));

    assert_eq!(result, expected);
    Ok(())
}
/// Returns `true` when sorting `batch` by `required_ordering` leaves the row
/// order unchanged.
///
/// A strictly increasing helper column (`0.0, 1.0, ...`) is appended to the
/// batch and added as the last sort key. This makes every row unique, so the
/// sort result cannot depend on how ties are broken: the batch already obeys
/// the requirement exactly when the sorted indices are `0..n_row`.
fn is_table_same_after_sort(
    mut required_ordering: Vec<PhysicalSortExpr>,
    batch: RecordBatch,
) -> Result<bool> {
    let original_schema = batch.schema();
    let mut columns = batch.columns().to_vec();
    let n_row = batch.num_rows();

    // Append the row-number column that is used as the tie-breaker.
    let unique_col =
        Arc::new(Float64Array::from_iter_values((0..n_row).map(|val| val as f64)))
            as ArrayRef;
    columns.push(unique_col);

    // Extend the schema with a matching field for the tie-breaker column.
    let unique_col_name = "unique";
    let unique_field =
        Arc::new(Field::new(unique_col_name, DataType::Float64, false));
    let fields: Vec<_> = original_schema
        .fields()
        .iter()
        .cloned()
        .chain(std::iter::once(unique_field))
        .collect();
    let schema = Arc::new(Schema::new(fields));
    let new_batch = RecordBatch::try_new(schema, columns)?;

    // The tie-breaker column sits right after the original columns.
    required_ordering.push(PhysicalSortExpr {
        expr: Arc::new(Column::new(unique_col_name, original_schema.fields().len())),
        options: Default::default(),
    });

    // Evaluate every sort key against the extended batch.
    let sort_columns = required_ordering
        .iter()
        .map(|order_expr| {
            let expr_result = order_expr.expr.evaluate(&new_batch)?;
            let values = expr_result.into_array(new_batch.num_rows())?;
            Ok(SortColumn {
                values,
                options: Some(order_expr.options),
            })
        })
        .collect::<Result<Vec<_>>>()?;

    // The table is unchanged iff the sort permutation is the identity.
    let sorted_indices = lexsort_to_indices(&sort_columns, None)?;
    let original_indices = UInt32Array::from_iter_values(0..n_row as u32);

    Ok(sorted_indices == original_indices)
}
/// Returns the array already assigned to any member of `eq_group`, if there
/// is one.
///
/// Members are visited in iteration order and looked up in `existing_vec` by
/// their column position in `schema`. Panics if a member is not a `Column`
/// or its name is missing from `schema`.
fn get_representative_arr(
    eq_group: &EquivalenceClass,
    existing_vec: &[Option<ArrayRef>],
    schema: SchemaRef,
) -> Option<ArrayRef> {
    eq_group.iter().find_map(|expr| {
        let column = expr.as_any().downcast_ref::<Column>().unwrap();
        let (position, _field) = schema.column_with_name(column.name()).unwrap();
        existing_vec[position].clone()
    })
}
// Generates a `RecordBatch` of `n_elem` rows whose columns are built from
// `eq_properties`, in three passes (constants, orderings, equivalence
// classes); any column left unassigned is filled with random values.
//
// Random values are drawn from a generator with a fixed seed, as
// `gen_range(0..n_distinct) as f64 / 2.0`, so the output is deterministic.
//
// Panics if a constant, a sort expression or an equivalence-class member is
// not a plain `Column`, or if its name is missing from the schema.
fn generate_table_for_eq_properties(
eq_properties: &EquivalenceProperties,
n_elem: usize,
n_distinct: usize,
) -> Result<RecordBatch> {
// Fixed seed: the generated table is the same on every run.
let mut rng = StdRng::seed_from_u64(23);
let schema = eq_properties.schema();
// One slot per schema column; `None` means "not generated yet".
let mut schema_vec = vec![None; schema.fields.len()];
// Produces `num_elems` values, each `gen_range(0..max_val) as f64 / 2.0`.
let mut generate_random_array = |num_elems: usize, max_val: usize| -> ArrayRef {
let values: Vec<f64> = (0..num_elems)
.map(|_| rng.gen_range(0..max_val) as f64 / 2.0)
.collect();
Arc::new(Float64Array::from_iter_values(values))
};
// Pass 1: every constant column is filled with zeros.
for constant in &eq_properties.constants {
let col = constant.as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr = Arc::new(Float64Array::from_iter_values(vec![0 as f64; n_elem]))
as ArrayRef;
schema_vec[idx] = Some(arr);
}
// Pass 2: for each ordering, generate random columns and sort them
// together lexicographically so the ordering holds on the result.
for ordering in eq_properties.oeq_class.iter() {
let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
.iter()
.map(|PhysicalSortExpr { expr, options }| {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr = generate_random_array(n_elem, n_distinct);
(
SortColumn {
values: arr,
options: Some(*options),
},
idx,
)
})
.unzip();
let sort_arrs = arrow::compute::lexsort(&sort_columns, None)?;
// Store each sorted column back at its schema position.
for (idx, arr) in izip!(indices, sort_arrs) {
schema_vec[idx] = Some(arr);
}
}
// Pass 3: all members of an equivalence class share one array, reusing an
// already generated member's array when there is one.
for eq_group in eq_properties.eq_group.iter() {
let representative_array =
get_representative_arr(eq_group, &schema_vec, schema.clone())
.unwrap_or_else(|| generate_random_array(n_elem, n_distinct));
for expr in eq_group.iter() {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
schema_vec[idx] = Some(representative_array.clone());
}
}
// Columns that are still unassigned get independent random values.
let res: Vec<_> = schema_vec
.into_iter()
.zip(schema.fields.iter())
.map(|(elem, field)| {
(
field.name(),
elem.unwrap_or_else(|| generate_random_array(n_elem, n_distinct)),
)
})
.collect();
Ok(RecordBatch::try_from_iter(res)?)
}
/// With `a = c` registered (see `create_test_params`), normalizing `c` must
/// yield `a`, while `a` and `b` normalize to themselves.
#[test]
fn test_schema_normalize_expr_with_equivalence() -> Result<()> {
    let (_test_schema, eq_properties) = create_test_params()?;
    let col_a_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let col_b_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));
    let col_c_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 2));

    // Each entry is (expression to normalize, expected normalized form).
    let cases = [
        (&col_a_expr, &col_a_expr),
        (&col_c_expr, &col_a_expr),
        // `b` belongs to no equivalence class, so it is left as is.
        (&col_b_expr, &col_b_expr),
    ];
    let eq_group = eq_properties.eq_group();
    for (expr, expected_eq) in cases {
        assert!(
            expected_eq.eq(&eq_group.normalize_expr(expr.clone())),
            "error in test: expr: {expr:?}"
        );
    }

    Ok(())
}
/// With `a = c` registered (see `create_test_params`), normalizing a sort
/// requirement on `c` must rewrite it to `a` and keep its options, while
/// requirements on `a` and `d` stay unchanged.
#[test]
fn test_schema_normalize_sort_requirement_with_equivalence() -> Result<()> {
    let (test_schema, eq_properties) = create_test_params()?;
    let a = col("a", &test_schema)?;
    let c = col("c", &test_schema)?;
    let d = col("d", &test_schema)?;
    let asc = Some(SortOptions {
        descending: false,
        nulls_first: false,
    });

    // Each entry is (requirement, expected normalized requirement).
    let test_cases = vec![
        (vec![(&a, asc)], vec![(&a, asc)]),
        (vec![(&c, asc)], vec![(&a, asc)]),
        (vec![(&c, None)], vec![(&a, None)]),
        (vec![(&d, asc)], vec![(&d, asc)]),
    ];
    for (reqs, expected) in test_cases {
        let reqs = convert_to_sort_reqs(&reqs);
        let expected = convert_to_sort_reqs(&expected);
        let normalized = eq_properties.normalize_sort_requirements(&reqs);
        assert!(
            expected.eq(&normalized),
            "error in test: reqs: {reqs:?}, expected: {expected:?}, normalized: {normalized:?}"
        );
    }

    Ok(())
}
/// Checks `normalize_sort_requirements` against the properties from
/// `create_test_params`: `c` is rewritten to its equal `a`, everything else
/// is returned unchanged, and sort options (including `None`) are kept.
#[test]
fn test_normalize_sort_reqs() -> Result<()> {
    let (schema, eq_properties) = create_test_params()?;
    let a = col("a", &schema)?;
    let b = col("b", &schema)?;
    let c = col("c", &schema)?;
    let d = col("d", &schema)?;
    let e = col("e", &schema)?;
    let f = col("f", &schema)?;
    let asc = Some(SortOptions {
        descending: false,
        nulls_first: false,
    });
    let desc = Some(SortOptions {
        descending: true,
        nulls_first: true,
    });

    // Each entry is (requirement, expected normalized requirement).
    let requirements = vec![
        (vec![(&a, asc)], vec![(&a, asc)]),
        (vec![(&a, desc)], vec![(&a, desc)]),
        (vec![(&a, None)], vec![(&a, None)]),
        // `c` is equal to `a`.
        (vec![(&c, asc)], vec![(&a, asc)]),
        (vec![(&c, None)], vec![(&a, None)]),
        (vec![(&d, asc), (&b, asc)], vec![(&d, asc), (&b, asc)]),
        (vec![(&d, None), (&b, None)], vec![(&d, None), (&b, None)]),
        (vec![(&e, desc), (&f, asc)], vec![(&e, desc), (&f, asc)]),
        (vec![(&e, desc), (&f, None)], vec![(&e, desc), (&f, None)]),
        (vec![(&e, None), (&f, None)], vec![(&e, None), (&f, None)]),
    ];

    for (given, wanted) in requirements {
        let given = convert_to_sort_reqs(&given);
        let wanted = convert_to_sort_reqs(&wanted);
        assert_eq!(eq_properties.normalize_sort_requirements(&given), wanted);
    }

    Ok(())
}
#[test]
fn test_get_finer() -> Result<()> {
    let schema = create_test_schema()?;
    let a = &col("a", &schema)?;
    let b = &col("b", &schema)?;
    let c = &col("c", &schema)?;
    // Plain properties: no equivalences or orderings are registered.
    let props = EquivalenceProperties::new(schema);
    let asc = Some(SortOptions {
        descending: false,
        nulls_first: false,
    });
    let desc = Some(SortOptions {
        descending: true,
        nulls_first: true,
    });
    // Each case is `(lhs requirement, rhs requirement, expected finer requirement)`.
    let cases = vec![
        (
            vec![(a, asc)],
            vec![(a, None), (b, asc)],
            Some(vec![(a, asc), (b, asc)]),
        ),
        (
            vec![(a, asc), (b, asc), (c, asc)],
            vec![(a, asc), (b, asc)],
            Some(vec![(a, asc), (b, asc), (c, asc)]),
        ),
        // The two sides disagree on the options of `b`: no finer requirement is expected.
        (
            vec![(a, asc), (b, asc)],
            vec![(a, asc), (b, desc)],
            None,
        ),
    ];
    for (left, right, want) in cases {
        let left = convert_to_sort_reqs(&left);
        let right = convert_to_sort_reqs(&right);
        let want = want.map(|reqs| convert_to_sort_reqs(&reqs));
        let got = props.get_finer_requirement(&left, &right);
        assert_eq!(got, want)
    }
    Ok(())
}
#[test]
fn test_get_meet_ordering() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let eq_properties = EquivalenceProperties::new(schema);
    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let option_desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    // Each case is `(lhs ordering, rhs ordering, expected meet)`.
    let tests_cases = vec![
        // One side extends the other: the expected meet is the shorter ordering.
        (
            vec![(col_a, option_asc)],
            vec![(col_a, option_asc), (col_b, option_asc)],
            Some(vec![(col_a, option_asc)]),
        ),
        // The very first expression already conflicts: no meet is expected.
        (vec![(col_a, option_asc)], vec![(col_a, option_desc)], None),
        // The sides agree on `a` but conflict on `b`: the expected meet is `[a ASC]`.
        (
            vec![(col_a, option_asc), (col_b, option_asc)],
            vec![(col_a, option_asc), (col_b, option_desc)],
            Some(vec![(col_a, option_asc)]),
        ),
    ];
    for (lhs, rhs, expected) in tests_cases {
        let lhs = convert_to_sort_exprs(&lhs);
        let rhs = convert_to_sort_exprs(&rhs);
        let expected = expected.map(|expected| convert_to_sort_exprs(&expected));
        let meet = eq_properties.get_meet_ordering(&lhs, &rhs);
        // Include the inputs so a failing case can be identified from the output.
        assert_eq!(meet, expected, "lhs: {lhs:?}, rhs: {rhs:?}");
    }
    Ok(())
}
#[test]
fn test_find_longest_permutation() -> Result<()> {
    let (schema, mut props) = create_test_params()?;
    let a = &col("a", &schema)?;
    let b = &col("b", &schema)?;
    let c = &col("c", &schema)?;
    let d = &col("d", &schema)?;
    let e = &col("e", &schema)?;
    let h = &col("h", &schema)?;
    let a_plus_d_expr: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(a.clone(), Operator::Plus, d.clone()));
    let a_plus_d = &a_plus_d_expr;
    let asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };
    // Register [d ASC, h DESC] on top of whatever the fixture already provides.
    let d_asc_h_desc = vec![
        PhysicalSortExpr {
            expr: d.clone(),
            options: asc,
        },
        PhysicalSortExpr {
            expr: h.clone(),
            options: desc,
        },
    ];
    props.add_new_orderings([d_asc_h_desc]);
    // Each case is `(input expressions, expected longest permutation)`.
    let cases = vec![
        (vec![a], vec![(a, asc)]),
        (vec![c], vec![(c, asc)]),
        (vec![d, e, b], vec![(d, asc), (e, desc), (b, asc)]),
        // `b` alone is expected to yield no ordering.
        (vec![b], vec![]),
        (vec![d], vec![(d, asc)]),
        (vec![a_plus_d], vec![(a_plus_d, asc)]),
        // The expected result reorders the inputs: `d` comes before `b`.
        (vec![b, d], vec![(d, asc), (b, asc)]),
        (vec![c, e], vec![(c, asc), (e, desc)]),
    ];
    for (inputs, want) in cases {
        let inputs: Vec<_> = inputs.into_iter().cloned().collect();
        let want = convert_to_sort_exprs(&want);
        let (got, _) = props.find_longest_permutation(&inputs);
        assert_eq!(got, want);
    }
    Ok(())
}
#[test]
fn test_find_longest_permutation_random() -> Result<()> {
    const N_RANDOM_SCHEMA: usize = 100;
    const N_ELEMENTS: usize = 125;
    const N_DISTINCT: usize = 5;

    for seed in 0..N_RANDOM_SCHEMA {
        // One random schema / equivalence-properties pair per seed, plus table
        // data generated from those properties.
        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
        let table_data_with_properties =
            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?;

        let floor_a = create_physical_expr(
            &BuiltinScalarFunction::Floor,
            &[col("a", &test_schema)?],
            &test_schema,
            &ExecutionProps::default(),
        )?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            col("a", &test_schema)?,
            Operator::Plus,
            col("b", &test_schema)?,
        )) as Arc<dyn PhysicalExpr>;
        let exprs = vec![
            col("a", &test_schema)?,
            col("b", &test_schema)?,
            col("c", &test_schema)?,
            col("d", &test_schema)?,
            col("e", &test_schema)?,
            col("f", &test_schema)?,
            floor_a,
            a_plus_b,
        ];

        // Exercise every subset of the candidate expressions.
        for n_req in 0..=exprs.len() {
            for exprs in exprs.iter().combinations(n_req) {
                let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
                let (ordering, indices) =
                    eq_properties.find_longest_permutation(&exprs);

                // Rebuild the ordering from the returned indices: both return
                // values must describe the same expressions.
                let ordering2 = indices
                    .iter()
                    .zip(ordering.iter())
                    .map(|(&idx, sort_expr)| PhysicalSortExpr {
                        expr: exprs[idx].clone(),
                        options: sort_expr.options,
                    })
                    .collect::<Vec<_>>();
                assert_eq!(
                    ordering, ordering2,
                    "indices and lexicographical ordering do not match"
                );

                // Built lazily: `assert!`/`assert_eq!` evaluate their message
                // arguments only on failure, so the large Debug dump is not
                // formatted for every passing combination.
                let err_msg = || {
                    format!(
                        "Error in test case ordering:{:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}",
                        ordering, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants
                    )
                };
                assert_eq!(ordering.len(), indices.len(), "{}", err_msg());
                // NOTE(review): presumably verifies that sorting the table by
                // `ordering` leaves it unchanged -- confirm against the helper.
                assert!(
                    is_table_same_after_sort(
                        ordering.clone(),
                        table_data_with_properties.clone(),
                    )?,
                    "{}",
                    err_msg()
                );
            }
        }
    }
    Ok(())
}
#[test]
fn test_update_ordering() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
    ]);
    let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let col_c = &col("c", &schema)?;
    let col_d = &col("d", &schema)?;
    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };
    // Declare b = a.
    eq_properties.add_equal_conditions(col_b, col_a);
    // Register the orderings [b ASC] and [d ASC].
    eq_properties.add_new_orderings(vec![
        vec![PhysicalSortExpr {
            expr: col_b.clone(),
            options: option_asc,
        }],
        vec![PhysicalSortExpr {
            expr: col_d.clone(),
            options: option_asc,
        }],
    ]);
    // Each case is `(expression, expected sort properties)`.
    let test_cases = vec![
        // d + b
        (
            Arc::new(BinaryExpr::new(
                col_d.clone(),
                Operator::Plus,
                col_b.clone(),
            )) as Arc<dyn PhysicalExpr>,
            SortProperties::Ordered(option_asc),
        ),
        // b
        (col_b.clone(), SortProperties::Ordered(option_asc)),
        // a
        (col_a.clone(), SortProperties::Ordered(option_asc)),
        // a + c
        (
            Arc::new(BinaryExpr::new(
                col_a.clone(),
                Operator::Plus,
                col_c.clone(),
            )),
            SortProperties::Unordered,
        ),
    ];
    // Only used for diagnostics. `eq_properties` is not modified inside the
    // loop, so the leading sort expressions are collected once up front.
    let leading_orderings = eq_properties
        .oeq_class()
        .iter()
        .flat_map(|ordering| ordering.first().cloned())
        .collect::<Vec<_>>();
    for (expr, expected) in test_cases {
        let expr_ordering = eq_properties.get_expr_ordering(expr.clone());
        // The message arguments are only formatted if the assertion fails.
        assert_eq!(
            expr_ordering.state,
            expected,
            "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}",
            expr,
            expected,
            expr_ordering.state
        );
    }
    Ok(())
}
#[test]
fn test_contains_any() {
    // Wraps a scalar value in a literal physical expression.
    let lit = |value: ScalarValue| Arc::new(Literal::new(value)) as Arc<dyn PhysicalExpr>;
    let lit_true = lit(ScalarValue::Boolean(Some(true)));
    let lit_false = lit(ScalarValue::Boolean(Some(false)));
    let lit2 = lit(ScalarValue::Int32(Some(2)));
    let lit1 = lit(ScalarValue::Int32(Some(1)));
    let col_b: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));

    let booleans = EquivalenceClass::new(vec![lit_true.clone(), lit_false]);
    let true_and_column = EquivalenceClass::new(vec![lit_true, col_b]);
    let integers = EquivalenceClass::new(vec![lit2, lit1]);

    // The literal `true` appears in both classes.
    assert!(booleans.contains_any(&true_and_column));
    // The integer class shares no entry with either of the other two.
    assert!(!booleans.contains_any(&integers));
    assert!(!true_and_column.contains_any(&integers));
}
#[test]
fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
    let sort_options = SortOptions::default();
    let sort_options_not = SortOptions::default().not();
    // Builds a nullable Int32 field with the given name.
    let int_field = |name: &str| Field::new(name, DataType::Int32, true);
    // Builds a sort expression over column `name` located at position `index`.
    let sort_on = |name: &str, index: usize, options: SortOptions| PhysicalSortExpr {
        expr: Arc::new(Column::new(name, index)),
        options,
    };

    // Scenario 1: the only ordering is [b (negated options), a]; requiring
    // [b, a] is expected to match both expressions.
    {
        let schema = Schema::new(vec![int_field("a"), int_field("b")]);
        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let required_columns = [col_b.clone(), col_a.clone()];
        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
        eq_properties.add_new_orderings([vec![
            sort_on("b", 1, sort_options_not),
            sort_on("a", 0, sort_options),
        ]]);
        let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
        assert_eq!(idxs, vec![0, 1]);
        assert_eq!(
            result,
            vec![
                PhysicalSortExpr {
                    expr: col_b,
                    options: sort_options_not
                },
                PhysicalSortExpr {
                    expr: col_a,
                    options: sort_options
                }
            ]
        );
    }

    // Scenario 2: an additional, unrelated ordering [c] is registered first;
    // the expected result is the same as in scenario 1.
    {
        let schema = Schema::new(vec![int_field("a"), int_field("b"), int_field("c")]);
        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let required_columns = [col_b.clone(), col_a.clone()];
        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
        eq_properties.add_new_orderings([
            vec![sort_on("c", 2, sort_options)],
            vec![
                sort_on("b", 1, sort_options_not),
                sort_on("a", 0, sort_options),
            ],
        ]);
        let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
        assert_eq!(idxs, vec![0, 1]);
        assert_eq!(
            result,
            vec![
                PhysicalSortExpr {
                    expr: col_b,
                    options: sort_options_not
                },
                PhysicalSortExpr {
                    expr: col_a,
                    options: sort_options
                }
            ]
        );
    }

    // Scenario 3: `c` sits between `b` and `a` in the ordering [b, c, a], so
    // only the first required expression is expected to match.
    {
        let required_columns = [
            Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>,
            Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>,
        ];
        let schema = Schema::new(vec![int_field("a"), int_field("b"), int_field("c")]);
        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
        eq_properties.add_new_orderings([vec![
            sort_on("b", 1, sort_options_not),
            sort_on("c", 2, sort_options),
            sort_on("a", 0, sort_options),
        ]]);
        let (_, idxs) = eq_properties.find_longest_permutation(&required_columns);
        assert_eq!(idxs, vec![0]);
    }
    Ok(())
}
#[test]
fn test_normalize_ordering_equivalence_classes() -> Result<()> {
    let options = SortOptions::default();
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]);
    let a = col("a", &schema)?;
    let b = col("b", &schema)?;
    let c = col("c", &schema)?;
    // Builds a single-expression ordering over `expr` with the default options.
    let single = |expr: &Arc<dyn PhysicalExpr>| {
        vec![PhysicalSortExpr {
            expr: expr.clone(),
            options,
        }]
    };

    // Properties under test: a = c is declared before [b] and [c] are added.
    let mut actual_props = EquivalenceProperties::new(Arc::new(schema.clone()));
    actual_props.add_equal_conditions(&a, &c);
    actual_props.add_new_orderings(vec![single(&b), single(&c)]);

    // Reference properties: the same orderings, without any equality declared.
    let mut expected_props = EquivalenceProperties::new(Arc::new(schema));
    expected_props.add_new_orderings([single(&b), single(&c)]);

    let actual = actual_props.oeq_class().clone();
    let expected = expected_props.oeq_class();
    assert!(actual.eq(expected));
    Ok(())
}
// Table-driven test: for each case, registers a set of orderings on fresh
// `EquivalenceProperties`, projects them through a `ProjectionMapping` built
// from the case's projection expressions, and checks that the projected
// ordering class has exactly as many orderings as expected and contains every
// expected ordering (the comparison is order-insensitive).
#[test]
fn project_orderings() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
]));
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_d = &col("d", &schema)?;
let col_e = &col("e", &schema)?;
let col_ts = &col("ts", &schema)?;
// date_bin(interval, ts)
let interval = Arc::new(Literal::new(ScalarValue::IntervalDayTime(Some(2))))
as Arc<dyn PhysicalExpr>;
let date_bin_func = &create_physical_expr(
&BuiltinScalarFunction::DateBin,
&[interval, col_ts.clone()],
&schema,
&ExecutionProps::default(),
)?;
// a + b
let a_plus_b = Arc::new(BinaryExpr::new(
col_a.clone(),
Operator::Plus,
col_b.clone(),
)) as Arc<dyn PhysicalExpr>;
// b + d
let b_plus_d = Arc::new(BinaryExpr::new(
col_b.clone(),
Operator::Plus,
col_d.clone(),
)) as Arc<dyn PhysicalExpr>;
// b + e
let b_plus_e = Arc::new(BinaryExpr::new(
col_b.clone(),
Operator::Plus,
col_e.clone(),
)) as Arc<dyn PhysicalExpr>;
// c + d
let c_plus_d = Arc::new(BinaryExpr::new(
col_c.clone(),
Operator::Plus,
col_d.clone(),
)) as Arc<dyn PhysicalExpr>;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
// Each case is a tuple of:
//   1. the orderings registered on the input properties,
//   2. the projection expressions with their output names,
//   3. the expected orderings, written with output column names.
// Case indices below match the `test_idx` printed on failure.
let test_cases = vec![
// ---------- TEST CASE 0 ----------
(
// orderings
vec![
vec![(col_b, option_asc)],
],
// projection expressions
vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
// expected
vec![
vec![("b_new", option_asc)],
],
),
// ---------- TEST CASE 1 ----------
(
// orderings: none
vec![
],
// projection expressions
vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
// expected: none
vec![
],
),
// ---------- TEST CASE 2 ----------
(
// orderings
vec![
vec![(col_ts, option_asc)],
],
// projection expressions
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(col_ts, "ts_new".to_string()),
(date_bin_func, "date_bin_res".to_string()),
],
// expected
vec![
vec![("date_bin_res", option_asc)],
vec![("ts_new", option_asc)],
],
),
// ---------- TEST CASE 3 ----------
(
// orderings
vec![
vec![(col_a, option_asc), (col_ts, option_asc)],
vec![(col_b, option_asc), (col_ts, option_asc)],
],
// projection expressions
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(col_ts, "ts_new".to_string()),
(date_bin_func, "date_bin_res".to_string()),
],
// expected
vec![
vec![("a_new", option_asc), ("ts_new", option_asc)],
vec![("a_new", option_asc), ("date_bin_res", option_asc)],
vec![("b_new", option_asc), ("ts_new", option_asc)],
vec![("b_new", option_asc), ("date_bin_res", option_asc)],
],
),
// ---------- TEST CASE 4 ----------
(
// orderings
vec![
vec![(&a_plus_b, option_asc)],
],
// projection expressions
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(&a_plus_b, "a+b".to_string()),
],
// expected
vec![
vec![("a+b", option_asc)],
],
),
// ---------- TEST CASE 5 ----------
(
// orderings
vec![
vec![(&a_plus_b, option_asc), (&col_c, option_asc)],
],
// projection expressions
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(col_c, "c_new".to_string()),
(&a_plus_b, "a+b".to_string()),
],
// expected
vec![
vec![("a+b", option_asc), ("c_new", option_asc)],
],
),
// ---------- TEST CASE 6 ----------
(
// orderings
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![(col_a, option_asc), (col_d, option_asc)],
],
// projection expressions
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(col_d, "d_new".to_string()),
(&b_plus_d, "b+d".to_string()),
],
// expected
vec![
vec![("a_new", option_asc), ("b_new", option_asc)],
vec![("a_new", option_asc), ("d_new", option_asc)],
vec![("a_new", option_asc), ("b+d", option_asc)],
],
),
// ---------- TEST CASE 7 ----------
(
// orderings
vec![
vec![(&b_plus_d, option_asc)],
],
// projection expressions
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(col_d, "d_new".to_string()),
(&b_plus_d, "b+d".to_string()),
],
// expected
vec![
vec![("b+d", option_asc)],
],
),
// ---------- TEST CASE 8 ----------
(
// orderings
vec![
vec![
(col_a, option_asc),
(col_d, option_asc),
(col_b, option_asc),
],
vec![(col_c, option_asc)],
],
// projection expressions
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(col_d, "d_new".to_string()),
(col_c, "c_new".to_string()),
],
// expected
vec![
vec![
("a_new", option_asc),
("d_new", option_asc),
("b_new", option_asc),
],
vec![("c_new", option_asc)],
],
),
// ---------- TEST CASE 9 ----------
(
// orderings
vec![
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
vec![(col_a, option_asc), (col_d, option_asc)],
],
// projection expressions
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(col_c, "c_new".to_string()),
(&c_plus_d, "c+d".to_string()),
],
// expected
vec![
vec![
("a_new", option_asc),
("b_new", option_asc),
("c_new", option_asc),
],
vec![
("a_new", option_asc),
("b_new", option_asc),
("c+d", option_asc),
],
],
),
// ---------- TEST CASE 10 ----------
(
// orderings
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![(col_a, option_asc), (col_d, option_asc)],
],
// projection expressions (note: `d` itself is not projected)
vec![
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(&b_plus_d, "b+d".to_string()),
],
// expected
vec![
vec![("a_new", option_asc), ("b_new", option_asc)],
vec![("a_new", option_asc), ("b+d", option_asc)],
],
),
// ---------- TEST CASE 11 ----------
(
// orderings
vec![
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
],
// projection expressions (note: `b` is not projected)
vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
// expected: only the prefix before the missing column
vec![
vec![("a_new", option_asc)],
],
),
// ---------- TEST CASE 12 ----------
(
// orderings
vec![
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
],
vec![
(col_a, option_asc),
(&a_plus_b, option_asc),
(col_c, option_asc),
],
],
// projection expressions
vec![
(col_c, "c_new".to_string()),
(col_b, "b_new".to_string()),
(col_a, "a_new".to_string()),
(&a_plus_b, "a+b".to_string()),
],
// expected
vec![
vec![
("a_new", option_asc),
("b_new", option_asc),
("c_new", option_asc),
],
vec![
("a_new", option_asc),
("a+b", option_asc),
("c_new", option_asc),
],
],
),
// ---------- TEST CASE 13 ----------
(
// orderings
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![(col_c, option_asc), (col_b, option_asc)],
vec![(col_d, option_asc), (col_e, option_asc)],
],
// projection expressions (note: neither `b` nor `e` is projected directly)
vec![
(col_c, "c_new".to_string()),
(col_d, "d_new".to_string()),
(col_a, "a_new".to_string()),
(&b_plus_e, "b+e".to_string()),
],
// expected
vec![
vec![
("a_new", option_asc),
("d_new", option_asc),
("b+e", option_asc),
],
vec![
("d_new", option_asc),
("a_new", option_asc),
("b+e", option_asc),
],
vec![
("c_new", option_asc),
("d_new", option_asc),
("b+e", option_asc),
],
vec![
("d_new", option_asc),
("c_new", option_asc),
("b+e", option_asc),
],
],
),
// ---------- TEST CASE 14 ----------
(
// orderings
vec![
vec![
(col_a, option_asc),
(col_c, option_asc),
(&col_b, option_asc),
],
],
// projection expressions (note: `b` itself is not projected)
vec![
(col_c, "c_new".to_string()),
(col_a, "a_new".to_string()),
(&a_plus_b, "a+b".to_string()),
],
// expected
vec![
vec![
("a_new", option_asc),
("c_new", option_asc),
("a+b", option_asc),
],
],
),
// ---------- TEST CASE 15 ----------
(
// orderings (the second one sorts `b` with the descending options)
vec![
vec![(col_a, option_asc), (col_b, option_asc)],
vec![(col_c, option_asc), (col_b, option_desc)],
vec![(col_e, option_asc)],
],
// projection expressions (note: `e` itself is not projected)
vec![
(col_c, "c_new".to_string()),
(col_a, "a_new".to_string()),
(col_b, "b_new".to_string()),
(&b_plus_e, "b+e".to_string()),
],
// expected
vec![
vec![("a_new", option_asc), ("b_new", option_asc)],
vec![("a_new", option_asc), ("b+e", option_asc)],
vec![("c_new", option_asc), ("b_new", option_desc)],
],
),
];
for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
{
// Start from fresh properties for every case.
let mut eq_properties = EquivalenceProperties::new(schema.clone());
let orderings = convert_to_orderings(&orderings);
eq_properties.add_new_orderings(orderings);
// The table holds borrowed expressions; the mapping needs owned ones.
let proj_exprs = proj_exprs
.into_iter()
.map(|(expr, name)| (expr.clone(), name))
.collect::<Vec<_>>();
let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &schema)?;
let output_schema = output_schema(&projection_mapping, &schema)?;
// Resolve the expected column names against the projected schema.
let expected = expected
.into_iter()
.map(|ordering| {
ordering
.into_iter()
.map(|(name, options)| {
(col(name, &output_schema).unwrap(), options)
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let expected = convert_to_orderings_owned(&expected);
let projected_eq = eq_properties.project(&projection_mapping, output_schema);
let orderings = projected_eq.oeq_class();
let err_msg = format!(
"test_idx: {:?}, actual: {:?}, expected: {:?}, projection_mapping: {:?}",
idx, orderings.orderings, expected, projection_mapping
);
// Same length plus containment of every expected ordering.
assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
for expected_ordering in &expected {
assert!(orderings.contains(expected_ordering), "{}", err_msg)
}
}
Ok(())
}
#[test]
fn project_orderings2() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
        Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
    ]));
    let a = &col("a", &schema)?;
    let b = &col("b", &schema)?;
    let c = &col("c", &schema)?;
    let ts = &col("ts", &schema)?;
    // a + b over the input schema
    let a_plus_b: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(a.clone(), Operator::Plus, b.clone()));
    // date_bin(interval, ts)
    let interval: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::IntervalDayTime(Some(2))));
    let date_bin_ts = &create_physical_expr(
        &BuiltinScalarFunction::DateBin,
        &[interval, ts.clone()],
        &schema,
        &ExecutionProps::default(),
    )?;
    // round(c)
    let round_c = &create_physical_expr(
        &BuiltinScalarFunction::Round,
        &[c.clone()],
        &schema,
        &ExecutionProps::default(),
    )?;
    let asc = SortOptions {
        descending: false,
        nulls_first: false,
    };

    // A single projection shared by every case below.
    let proj_exprs = vec![
        (b, "b_new"),
        (a, "a_new"),
        (c, "c_new"),
        (date_bin_ts, "date_bin_res"),
        (round_c, "round_c_res"),
    ]
    .into_iter()
    .map(|(expr, name)| (expr.clone(), name.to_string()))
    .collect::<Vec<_>>();
    let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &schema)?;
    let projected_schema = output_schema(&projection_mapping, &schema)?;

    let a_new = &col("a_new", &projected_schema)?;
    let b_new = &col("b_new", &projected_schema)?;
    let c_new = &col("c_new", &projected_schema)?;
    let date_bin_res = &col("date_bin_res", &projected_schema)?;
    let round_c_res = &col("round_c_res", &projected_schema)?;
    // a_new + b_new over the projected schema
    let a_new_plus_b_new: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        a_new.clone(),
        Operator::Plus,
        b_new.clone(),
    ));

    // Each case is `(input orderings, expected projected orderings)`.
    let cases = vec![
        // [a ASC]
        (vec![vec![(a, asc)]], vec![vec![(a_new, asc)]]),
        // [a+b ASC]
        (
            vec![vec![(&a_plus_b, asc)]],
            vec![vec![(&a_new_plus_b_new, asc)]],
        ),
        // [a ASC, ts ASC]
        (
            vec![vec![(a, asc), (ts, asc)]],
            vec![vec![(a_new, asc), (date_bin_res, asc)]],
        ),
        // [a ASC, ts ASC, b ASC]
        (
            vec![vec![(a, asc), (ts, asc), (b, asc)]],
            vec![vec![(a_new, asc), (date_bin_res, asc)]],
        ),
        // [a ASC, c ASC]
        (
            vec![vec![(a, asc), (c, asc)]],
            vec![
                vec![(a_new, asc), (round_c_res, asc)],
                vec![(a_new, asc), (c_new, asc)],
            ],
        ),
        // [c ASC, b ASC]
        (
            vec![vec![(c, asc), (b, asc)]],
            vec![
                vec![(round_c_res, asc)],
                vec![(c_new, asc), (b_new, asc)],
            ],
        ),
        // [a+b ASC, c ASC]
        (
            vec![vec![(&a_plus_b, asc), (c, asc)]],
            vec![
                vec![(&a_new_plus_b_new, asc), (round_c_res, asc)],
                vec![(&a_new_plus_b_new, asc), (c_new, asc)],
            ],
        ),
    ];

    for (idx, (input, want)) in cases.iter().enumerate() {
        // Fresh properties per case, holding only that case's orderings.
        let mut props = EquivalenceProperties::new(schema.clone());
        props.add_new_orderings(convert_to_orderings(input));
        let want = convert_to_orderings(want);

        let projected = props.project(&projection_mapping, projected_schema.clone());
        let got = projected.oeq_class();
        let err_msg = format!(
            "test idx: {:?}, actual: {:?}, expected: {:?}, projection_mapping: {:?}",
            idx, got.orderings, want, projection_mapping
        );
        // Same number of orderings, and every expected one is present.
        assert_eq!(got.len(), want.len(), "{}", err_msg);
        for wanted in &want {
            assert!(got.contains(wanted), "{}", err_msg)
        }
    }
    Ok(())
}
#[test]
fn project_orderings3() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
        Field::new("e", DataType::Int32, true),
        Field::new("f", DataType::Int32, true),
    ]));
    let a = &col("a", &schema)?;
    let b = &col("b", &schema)?;
    let c = &col("c", &schema)?;
    let d = &col("d", &schema)?;
    let e = &col("e", &schema)?;
    let f = &col("f", &schema)?;
    // a + b over the input schema
    let a_plus_b: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(a.clone(), Operator::Plus, b.clone()));
    let asc = SortOptions {
        descending: false,
        nulls_first: false,
    };

    // The projection keeps `c`, `d` and `a + b`; it is shared by all cases.
    let proj_exprs = vec![(c, "c_new"), (d, "d_new"), (&a_plus_b, "a+b")]
        .into_iter()
        .map(|(expr, name)| (expr.clone(), name.to_string()))
        .collect::<Vec<_>>();
    let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &schema)?;
    let projected_schema = output_schema(&projection_mapping, &schema)?;
    let a_plus_b_new = &col("a+b", &projected_schema)?;
    let c_new = &col("c_new", &projected_schema)?;
    let d_new = &col("d_new", &projected_schema)?;

    // Each case is `(input orderings, equal column pairs, expected orderings)`.
    let cases = vec![
        // [d ASC, b ASC], [c ASC, a ASC]; no equalities.
        (
            vec![vec![(d, asc), (b, asc)], vec![(c, asc), (a, asc)]],
            vec![],
            vec![
                vec![(d_new, asc), (c_new, asc), (a_plus_b_new, asc)],
                vec![(c_new, asc), (d_new, asc), (a_plus_b_new, asc)],
            ],
        ),
        // [d ASC, b ASC], [c ASC, e ASC]; with e = a.
        (
            vec![vec![(d, asc), (b, asc)], vec![(c, asc), (e, asc)]],
            vec![(e, a)],
            vec![
                vec![(d_new, asc), (c_new, asc), (a_plus_b_new, asc)],
                vec![(c_new, asc), (d_new, asc), (a_plus_b_new, asc)],
            ],
        ),
        // [d ASC, b ASC], [c ASC, e ASC]; with a = f.
        (
            vec![vec![(d, asc), (b, asc)], vec![(c, asc), (e, asc)]],
            vec![(a, f)],
            vec![vec![(d_new, asc)], vec![(c_new, asc)]],
        ),
    ];

    for (input, equalities, want) in cases {
        let mut props = EquivalenceProperties::new(schema.clone());
        // Equalities are registered before the orderings.
        for (left, right) in equalities {
            props.add_equal_conditions(left, right);
        }
        props.add_new_orderings(convert_to_orderings(&input));
        let want = convert_to_orderings(&want);

        let projected = props.project(&projection_mapping, projected_schema.clone());
        let got = projected.oeq_class();
        let err_msg = format!(
            "actual: {:?}, expected: {:?}, projection_mapping: {:?}",
            got.orderings, want, projection_mapping
        );
        // Same number of orderings, and every expected one is present.
        assert_eq!(got.len(), want.len(), "{}", err_msg);
        for wanted in &want {
            assert!(got.contains(wanted), "{}", err_msg)
        }
    }
    Ok(())
}
#[test]
fn project_orderings_random() -> Result<()> {
    const N_RANDOM_SCHEMA: usize = 20;
    const N_ELEMENTS: usize = 125;
    const N_DISTINCT: usize = 5;

    for seed in 0..N_RANDOM_SCHEMA {
        // One random schema / equivalence-properties pair per seed, plus table
        // data generated from those properties.
        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
        let table_data_with_properties =
            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?;

        let floor_a = create_physical_expr(
            &BuiltinScalarFunction::Floor,
            &[col("a", &test_schema)?],
            &test_schema,
            &ExecutionProps::default(),
        )?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            col("a", &test_schema)?,
            Operator::Plus,
            col("b", &test_schema)?,
        )) as Arc<dyn PhysicalExpr>;
        let proj_exprs = vec![
            (col("a", &test_schema)?, "a_new"),
            (col("b", &test_schema)?, "b_new"),
            (col("c", &test_schema)?, "c_new"),
            (col("d", &test_schema)?, "d_new"),
            (col("e", &test_schema)?, "e_new"),
            (col("f", &test_schema)?, "f_new"),
            (floor_a, "floor(a)"),
            (a_plus_b, "a+b"),
        ];

        // Exercise every subset of the candidate projection expressions.
        for n_req in 0..=proj_exprs.len() {
            for proj_exprs in proj_exprs.iter().combinations(n_req) {
                let proj_exprs = proj_exprs
                    .into_iter()
                    .map(|(expr, name)| (expr.clone(), name.to_string()))
                    .collect::<Vec<_>>();
                let (projected_batch, projected_eq) = apply_projection(
                    proj_exprs.clone(),
                    &table_data_with_properties,
                    &eq_properties,
                )?;

                // Every ordering reported after projection must pass the
                // `is_table_same_after_sort` check on the projected batch.
                for ordering in projected_eq.oeq_class().iter() {
                    // The message is passed as format arguments so it is only
                    // rendered when the assertion fails, instead of formatting
                    // a large Debug dump for every passing ordering.
                    assert!(
                        is_table_same_after_sort(
                            ordering.clone(),
                            projected_batch.clone(),
                        )?,
                        "Error in test case ordering:{:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}, proj_exprs: {:?}",
                        ordering, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants, proj_exprs
                    );
                }
            }
        }
    }
    Ok(())
}
#[test]
fn ordering_satisfy_after_projection_random() -> Result<()> {
    const N_RANDOM_SCHEMA: usize = 20;
    const N_ELEMENTS: usize = 125;
    const N_DISTINCT: usize = 5;
    const SORT_OPTIONS: SortOptions = SortOptions {
        descending: false,
        nulls_first: false,
    };

    for seed in 0..N_RANDOM_SCHEMA {
        // One random schema / equivalence-properties pair per seed, plus table
        // data generated from those properties.
        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
        let table_data_with_properties =
            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?;

        let floor_a = create_physical_expr(
            &BuiltinScalarFunction::Floor,
            &[col("a", &test_schema)?],
            &test_schema,
            &ExecutionProps::default(),
        )?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            col("a", &test_schema)?,
            Operator::Plus,
            col("b", &test_schema)?,
        )) as Arc<dyn PhysicalExpr>;
        let proj_exprs = vec![
            (col("a", &test_schema)?, "a_new"),
            (col("b", &test_schema)?, "b_new"),
            (col("c", &test_schema)?, "c_new"),
            (col("d", &test_schema)?, "d_new"),
            (col("e", &test_schema)?, "e_new"),
            (col("f", &test_schema)?, "f_new"),
            (floor_a, "floor(a)"),
            (a_plus_b, "a+b"),
        ];

        // Outer enumeration: every subset of the candidate projection expressions.
        for n_req in 0..=proj_exprs.len() {
            for proj_exprs in proj_exprs.iter().combinations(n_req) {
                let proj_exprs = proj_exprs
                    .into_iter()
                    .map(|(expr, name)| (expr.clone(), name.to_string()))
                    .collect::<Vec<_>>();
                let (projected_batch, projected_eq) = apply_projection(
                    proj_exprs.clone(),
                    &table_data_with_properties,
                    &eq_properties,
                )?;

                let projection_mapping =
                    ProjectionMapping::try_new(&proj_exprs, &test_schema)?;
                // The target side of the mapping: expressions over the output schema.
                let projected_exprs = projection_mapping
                    .iter()
                    .map(|(_source, target)| target.clone())
                    .collect::<Vec<_>>();

                // Inner enumeration: every subset of the projected expressions,
                // turned into a sort requirement with `SORT_OPTIONS`.
                for n_req in 0..=projected_exprs.len() {
                    for exprs in projected_exprs.iter().combinations(n_req) {
                        let requirement = exprs
                            .into_iter()
                            .map(|expr| PhysicalSortExpr {
                                expr: expr.clone(),
                                options: SORT_OPTIONS,
                            })
                            .collect::<Vec<_>>();
                        // Ground truth computed from the projected data itself.
                        let expected = is_table_same_after_sort(
                            requirement.clone(),
                            projected_batch.clone(),
                        )?;
                        // The message is passed as format arguments so it is
                        // only rendered on failure; this loop body runs for
                        // every subset of every projection subset, and the
                        // Debug dump is large.
                        assert_eq!(
                            projected_eq.ordering_satisfy(&requirement),
                            expected,
                            "Error in test case requirement:{:?}, expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}, projected_eq.oeq_class: {:?}, projected_eq.eq_group: {:?}, projected_eq.constants: {:?}, projection_mapping: {:?}",
                            requirement, expected, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants, projected_eq.oeq_class, projected_eq.eq_group, projected_eq.constants, projection_mapping
                        );
                    }
                }
            }
        }
    }
    Ok(())
}
#[test]
fn test_expr_consists_of_constants() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
        Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
    ]));
    let a = col("a", &schema)?;
    let b = col("b", &schema)?;
    let d = col("d", &schema)?;
    // b + d
    let b_plus_d: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(b.clone(), Operator::Plus, d.clone()));

    // With only `a` and `b` constant, `b + d` must not be reported constant.
    let without_d = vec![a.clone(), b.clone()];
    assert!(!is_constant_recurse(&without_d, &b_plus_d));

    // Once `d` is constant too, `b + d` must be reported constant.
    let with_d = vec![a, b, d];
    assert!(is_constant_recurse(&with_d, &b_plus_d));
    Ok(())
}
#[test]
fn test_join_equivalence_properties() -> Result<()> {
    let schema = create_test_schema()?;
    let a = &col("a", &schema)?;
    let b = &col("b", &schema)?;
    let c = &col("c", &schema)?;
    // The same columns shifted by the number of fields in the schema; the
    // expected orderings use them for the right-hand side of the join.
    let shift = schema.fields.len();
    let a2 = &add_offset_to_expr(a.clone(), shift);
    let b2 = &add_offset_to_expr(b.clone(), shift);
    let asc = SortOptions {
        descending: false,
        nulls_first: false,
    };

    // Each case is `(left orderings, right orderings, expected join orderings)`.
    let cases = vec![
        // left: [a], [b]; right: [a], [b]
        (
            vec![vec![(a, asc)], vec![(b, asc)]],
            vec![vec![(a, asc)], vec![(b, asc)]],
            vec![
                vec![(a, asc), (a2, asc)],
                vec![(a, asc), (b2, asc)],
                vec![(b, asc), (a2, asc)],
                vec![(b, asc), (b2, asc)],
            ],
        ),
        // left: [a], [b], [c]; right: [a], [b]
        (
            vec![vec![(a, asc)], vec![(b, asc)], vec![(c, asc)]],
            vec![vec![(a, asc)], vec![(b, asc)]],
            vec![
                vec![(a, asc), (a2, asc)],
                vec![(a, asc), (b2, asc)],
                vec![(b, asc), (a2, asc)],
                vec![(b, asc), (b2, asc)],
                vec![(c, asc), (a2, asc)],
                vec![(c, asc), (b2, asc)],
            ],
        ),
    ];

    for (left, right, want) in cases {
        let mut left_props = EquivalenceProperties::new(schema.clone());
        let mut right_props = EquivalenceProperties::new(schema.clone());
        let left = convert_to_orderings(&left);
        let right = convert_to_orderings(&right);
        let want = convert_to_orderings(&want);
        left_props.add_new_orderings(left);
        right_props.add_new_orderings(right);

        let joined = join_equivalence_properties(
            left_props,
            right_props,
            &JoinType::Inner,
            Arc::new(Schema::empty()),
            &[true, false],
            Some(JoinSide::Left),
            &[],
        );
        let got = &joined.oeq_class.orderings;
        let err_msg = format!("expected: {:?}, actual:{:?}", want, got);
        // Same number of orderings, and every produced ordering is expected.
        assert_eq!(got.len(), want.len(), "{}", err_msg);
        for ordering in got.iter() {
            assert!(
                want.contains(ordering),
                "{}, ordering: {:?}",
                err_msg,
                ordering
            );
        }
    }
    Ok(())
}
}