mod dependency; mod joins; mod union;
use dependency::{
construct_prefix_orderings, generate_dependency_orderings, referred_dependencies,
Dependencies, DependencyMap,
};
pub use joins::*;
pub use union::*;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::{fmt, mem};
use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
use crate::equivalence::{
EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
};
use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
use crate::{
physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, PhysicalExpr,
PhysicalSortExpr, PhysicalSortRequirement,
};
use arrow::compute::SortOptions;
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, Constraint, Constraints, HashMap, Result};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_physical_expr_common::utils::ExprPropertiesNode;
use indexmap::IndexSet;
use itertools::Itertools;
/// Tracks the equivalence, ordering, constant and constraint properties known
/// to hold for the output described by `schema`.
#[derive(Debug, Clone)]
pub struct EquivalenceProperties {
    // Classes of expressions known to be equal to one another.
    eq_group: EquivalenceGroup,
    // Set of equivalent sort orderings the output satisfies.
    oeq_class: OrderingEquivalenceClass,
    // Expressions whose value is constant (per partition or across partitions).
    constants: Vec<ConstExpr>,
    // Table constraints (e.g. primary key / unique) usable to satisfy orderings.
    constraints: Constraints,
    // Schema these properties refer to.
    schema: SchemaRef,
}
impl EquivalenceProperties {
/// Creates an empty `EquivalenceProperties` object for the given schema.
pub fn new(schema: SchemaRef) -> Self {
    Self {
        eq_group: EquivalenceGroup::empty(),
        oeq_class: OrderingEquivalenceClass::empty(),
        constants: vec![],
        constraints: Constraints::empty(),
        schema,
    }
}
/// Replaces the constraints with the given set (builder style).
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
    self.constraints = constraints;
    self
}
/// Creates a new `EquivalenceProperties` object with the given known
/// orderings and no equivalences, constants or constraints.
pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self {
    Self {
        eq_group: EquivalenceGroup::empty(),
        oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()),
        constants: vec![],
        constraints: Constraints::empty(),
        schema,
    }
}
/// Returns the associated schema.
pub fn schema(&self) -> &SchemaRef {
    &self.schema
}
/// Returns a reference to the ordering equivalence class.
pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
    &self.oeq_class
}
/// Consumes `self`, returning the ordering equivalence class.
pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
    self.oeq_class
}
/// Returns a reference to the equivalence group.
pub fn eq_group(&self) -> &EquivalenceGroup {
    &self.eq_group
}
/// Returns the known constant expressions.
pub fn constants(&self) -> &[ConstExpr] {
    &self.constants
}
/// Returns the known constraints.
pub fn constraints(&self) -> &Constraints {
    &self.constraints
}
/// Returns the output ordering after pruning constant expressions from it,
/// or `None` if no non-constant ordering remains.
pub fn output_ordering(&self) -> Option<LexOrdering> {
    let constants = self.constants();
    let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
    // Constant columns do not contribute to an ordering; drop them.
    output_ordering
        .retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
    (!output_ordering.is_empty()).then_some(output_ordering)
}
/// Returns the ordering equivalence class with every ordering normalized
/// (equivalence-rewritten, constants removed, collapsed).
pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
    OrderingEquivalenceClass::new(
        self.oeq_class
            .iter()
            .map(|ordering| self.normalize_sort_exprs(ordering))
            .collect(),
    )
}
/// Merges `other`'s equivalences, orderings and constants into `self`.
/// NOTE(review): `other.constraints` is not merged here — confirm intended.
pub fn extend(mut self, other: Self) -> Self {
    self.eq_group.extend(other.eq_group);
    self.oeq_class.extend(other.oeq_class);
    self.with_constants(other.constants)
}
/// Removes all known orderings.
pub fn clear_orderings(&mut self) {
    self.oeq_class.clear();
}
/// Keeps only constants that are uniform across partitions, dropping
/// per-partition (heterogeneous) constants.
pub fn clear_per_partition_constants(&mut self) {
    self.constants.retain(|item| {
        matches!(item.across_partitions(), AcrossPartitions::Uniform(_))
    })
}
/// Merges the given ordering equivalence class into the current one.
pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) {
    self.oeq_class.extend(other);
}
/// Adds new orderings to the ordering equivalence class.
pub fn add_new_orderings(
    &mut self,
    orderings: impl IntoIterator<Item = LexOrdering>,
) {
    self.oeq_class.add_new_orderings(orderings);
}
/// Adds a single new ordering to the ordering equivalence class.
pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
    self.add_new_orderings([ordering]);
}
/// Merges the given equivalence group into the current one.
pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
    self.eq_group.extend(other_eq_group);
}
/// Records that `left` and `right` are known to be equal (e.g. from a
/// filter or join equality). If either side is already constant, the other
/// side becomes constant too, inheriting the constant value.
pub fn add_equal_conditions(
    &mut self,
    left: &Arc<dyn PhysicalExpr>,
    right: &Arc<dyn PhysicalExpr>,
) -> Result<()> {
    // Constant-ness propagates through the new equality in either direction.
    if self.is_expr_constant(left) {
        if !const_exprs_contains(&self.constants, right) {
            let const_expr = ConstExpr::from(right)
                .with_across_partitions(self.get_expr_constant_value(left));
            self.constants.push(const_expr);
        }
    } else if self.is_expr_constant(right) {
        if !const_exprs_contains(&self.constants, left) {
            let const_expr = ConstExpr::from(left)
                .with_across_partitions(self.get_expr_constant_value(right));
            self.constants.push(const_expr);
        }
    }
    self.eq_group.add_equal_conditions(left, right);
    // The new equality may expose orderings on equivalent expressions.
    self.discover_new_orderings(left)?;
    Ok(())
}
/// Deprecated alias for [`Self::with_constants`].
#[deprecated(since = "43.0.0", note = "Use [`with_constants`] instead")]
pub fn add_constants(self, constants: impl IntoIterator<Item = ConstExpr>) -> Self {
    self.with_constants(constants)
}
/// Removes the given constant expression, if present.
pub fn remove_constant(mut self, c: &ConstExpr) -> Self {
    self.constants.retain(|existing| existing != c);
    self
}
/// Registers the given expressions as constants, then re-runs ordering
/// discovery since new constants may unlock additional orderings.
///
/// Each incoming constant is normalized against the equivalence group and
/// skipped if an equal constant is already tracked.
pub fn with_constants(
    mut self,
    constants: impl IntoIterator<Item = ConstExpr>,
) -> Self {
    // Push into `self.constants` as we go (instead of collecting first and
    // extending at the end) so that duplicates *within* the incoming batch
    // are also de-duplicated, not just duplicates of pre-existing constants.
    for c in constants {
        let across_partitions = c.across_partitions();
        let expr = c.owned_expr();
        let normalized_expr = self.eq_group.normalize_expr(expr);
        if !const_exprs_contains(&self.constants, &normalized_expr) {
            let const_expr = ConstExpr::from(normalized_expr)
                .with_across_partitions(across_partitions);
            self.constants.push(const_expr);
        }
    }
    // New constants may enable new orderings; probe each ordering's head.
    // Discovery errors are non-fatal here, so only log them.
    for ordering in self.normalized_oeq_class().iter() {
        if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
            log::debug!("error discovering new orderings: {e}");
        }
    }
    self
}
/// Searches for new orderings implied by `expr`: for every known ordering
/// led by (an expression equivalent to) `expr`, checks whether some
/// equivalent compound expression consumes a prefix of that ordering while
/// preserving lexicographic order; if so, the remaining suffix is itself a
/// valid ordering and gets registered.
fn discover_new_orderings(&mut self, expr: &Arc<dyn PhysicalExpr>) -> Result<()> {
    let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
    // All expressions equivalent to `expr` (or just `expr` itself if it is
    // in no equivalence class).
    let eq_class = self
        .eq_group
        .iter()
        .find_map(|class| {
            class
                .contains(&normalized_expr)
                .then(|| class.clone().into_vec())
        })
        .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);
    let mut new_orderings: Vec<LexOrdering> = vec![];
    for ordering in self.normalized_oeq_class().iter() {
        // Only orderings led by `expr` are of interest.
        if !ordering[0].expr.eq(&normalized_expr) {
            continue;
        }
        let leading_ordering_options = ordering[0].options;
        for equivalent_expr in &eq_class {
            let children = equivalent_expr.children();
            if children.is_empty() {
                continue;
            }
            // Check if all children of the equivalent expression match the
            // ordering's tail exprs, positionally (child i vs ordering[i+1]).
            let mut all_children_match = true;
            let mut child_properties = vec![];
            for (i, child) in children.iter().enumerate() {
                if let Some(next) = ordering.get(i + 1) {
                    if !child.as_ref().eq(next.expr.as_ref()) {
                        all_children_match = false;
                        break;
                    }
                    child_properties.push(ExprProperties {
                        sort_properties: SortProperties::Ordered(next.options),
                        range: Interval::make_unbounded(
                            &child.data_type(&self.schema)?,
                        )?,
                        preserves_lex_ordering: true,
                    });
                } else {
                    // Ordering is shorter than the child list; no match.
                    all_children_match = false;
                    break;
                }
            }
            if all_children_match {
                if let Ok(expr_properties) =
                    equivalent_expr.get_properties(&child_properties)
                {
                    // The compound expression must both preserve lex order
                    // and sort exactly like the original leading expr.
                    if expr_properties.preserves_lex_ordering
                        && SortProperties::Ordered(leading_ordering_options)
                            == expr_properties.sort_properties
                    {
                        // The suffix after the leading expr is itself valid.
                        new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
                        break;
                    }
                }
            }
        }
    }
    self.oeq_class.add_new_orderings(new_orderings);
    Ok(())
}
/// Declares that the data is (re)sorted by `sort_exprs`: the ordering
/// equivalence class is rebuilt from the new sort plus any existing
/// orderings that share it as a prefix (their longer suffixes survive).
pub fn with_reorder(mut self, sort_exprs: LexOrdering) -> Self {
    // Constant expressions carry no ordering information.
    let filtered_exprs = LexOrdering::new(
        sort_exprs
            .into_iter()
            .filter(|expr| !self.is_expr_constant(&expr.expr))
            .collect(),
    );
    if filtered_exprs.is_empty() {
        return self;
    }
    let mut new_orderings = vec![filtered_exprs.clone()];
    // Take the old class so we can rebuild from scratch.
    let oeq_class = mem::take(&mut self.oeq_class);
    for existing in oeq_class {
        // If the new sort is a prefix of an existing ordering, the longer
        // existing ordering still holds; keep its extension.
        if self.is_prefix_of(&filtered_exprs, &existing) {
            let mut extended = filtered_exprs.clone();
            extended.extend(existing.into_iter().skip(filtered_exprs.len()));
            new_orderings.push(extended);
        }
    }
    self.oeq_class = OrderingEquivalenceClass::new(new_orderings);
    self
}
/// Returns `true` if `new_order` is a prefix of `existing`, comparing
/// expressions modulo the equivalence group and requiring identical sort
/// options position by position.
fn is_prefix_of(&self, new_order: &LexOrdering, existing: &LexOrdering) -> bool {
    new_order.len() <= existing.len()
        && new_order.iter().zip(existing).all(|(lhs, rhs)| {
            lhs.options == rhs.options
                && self.eq_group.exprs_equal(&lhs.expr, &rhs.expr)
        })
}
/// Normalizes a sort ordering by round-tripping it through requirement
/// normalization (equivalence rewrite, constant pruning, collapse).
fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
    let sort_reqs = LexRequirement::from(sort_exprs.clone());
    let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
    LexOrdering::from(normalized_sort_reqs)
}
/// Normalizes sort requirements: rewrites each expression to its
/// equivalence-class representative, drops requirements on (normalized)
/// constants, and collapses consecutive duplicates.
fn normalize_sort_requirements(&self, sort_reqs: &LexRequirement) -> LexRequirement {
    let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs);
    let mut constant_exprs = vec![];
    constant_exprs.extend(
        self.constants
            .iter()
            .map(|const_expr| Arc::clone(const_expr.expr())),
    );
    // Compare against normalized constants, matching the normalized reqs.
    let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
    normalized_sort_reqs
        .iter()
        .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr))
        .cloned()
        .collect::<LexRequirement>()
        .collapse()
}
/// Returns `true` if the known properties satisfy the `given` ordering,
/// by converting it into a requirement and delegating.
pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
    self.ordering_satisfy_requirement(&LexRequirement::from(given.clone()))
}
/// Returns `true` if the known properties satisfy the given sort
/// requirement, either via constraints or by checking each requirement
/// left-to-right (treating already-satisfied prefixes as constants).
pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
    let mut eq_properties = self.clone();
    let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);
    // A primary-key / unique constraint may satisfy the whole requirement.
    if self.satisfied_by_constraints(&normalized_reqs) {
        return true;
    }
    for normalized_req in normalized_reqs {
        if !eq_properties.ordering_satisfy_single(&normalized_req) {
            return false;
        }
        // Once a leading requirement is satisfied, within that prefix the
        // expression behaves as a constant for the remaining requirements.
        eq_properties = eq_properties
            .with_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
    }
    true
}
/// Returns `true` if any primary-key or unique constraint satisfies the
/// given normalized requirements (null checking only for `Unique`).
fn satisfied_by_constraints(
    &self,
    normalized_reqs: &[PhysicalSortRequirement],
) -> bool {
    self.constraints.iter().any(|constraint| match constraint {
        Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => self
            .satisfied_by_constraint(
                normalized_reqs,
                indices,
                // Unique constraints permit nulls, so they need the extra
                // nullability check; primary keys do not.
                matches!(constraint, Constraint::Unique(_)),
            ),
    })
}
/// Checks whether a single constraint (given by its column `indices`)
/// combined with some known ordering satisfies `normalized_reqs`: all
/// constraint columns must appear in the ordering (with an extra
/// nullability guard when `check_null` is set), and the ordering must be a
/// prefix of the requirements with compatible options.
fn satisfied_by_constraint(
    &self,
    normalized_reqs: &[PhysicalSortRequirement],
    indices: &[usize],
    check_null: bool,
) -> bool {
    // The requirement must be at least as long as the constraint.
    if indices.len() > normalized_reqs.len() {
        return false;
    }
    self.oeq_class.iter().any(|ordering| {
        if indices.len() > ordering.len() {
            return false;
        }
        // Map column index -> (position in ordering, nullability).
        let mut col_positions = HashMap::with_capacity(ordering.len());
        for (pos, req) in ordering.iter().enumerate() {
            if let Some(col) = req.expr.as_any().downcast_ref::<Column>() {
                col_positions.insert(
                    col.index(),
                    // Unknown nullability is treated as nullable (safe side).
                    (pos, col.nullable(&self.schema).unwrap_or(true)),
                );
            }
        }
        // Every constraint column must appear; when `check_null`, nullable
        // columns at the ordering's edges are rejected.
        if !indices.iter().all(|&idx| {
            col_positions
                .get(&idx)
                .map(|&(pos, nullable)| {
                    !check_null
                        || (pos != 0 && pos != ordering.len() - 1)
                        || !nullable
                })
                .unwrap_or(false)
        }) {
            return false;
        }
        // Finally, the ordering must prefix-match the requirements.
        let ordering_len = ordering.len();
        normalized_reqs.len() >= ordering_len
            && normalized_reqs[..ordering_len].iter().zip(ordering).all(
                |(req, existing)| {
                    req.expr.eq(&existing.expr)
                        && req
                            .options
                            .is_none_or(|req_opts| req_opts == existing.options)
                },
            )
    })
}
/// Returns `true` if the given single sort requirement is satisfied by the
/// expression's known sort properties. A singleton (constant) expression
/// satisfies any requirement; an unordered one satisfies none.
fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
    let ExprProperties {
        sort_properties, ..
    } = self.get_expr_properties(Arc::clone(&req.expr));
    match sort_properties {
        SortProperties::Ordered(options) => {
            let sort_expr = PhysicalSortExpr {
                expr: Arc::clone(&req.expr),
                options,
            };
            sort_expr.satisfy(req, self.schema())
        }
        SortProperties::Singleton => true,
        SortProperties::Unordered => false,
    }
}
/// Returns `true` if `given` is at least as strict as `reference`, i.e.
/// the normalized reference is a compatible prefix of the normalized given
/// requirement.
pub fn requirements_compatible(
    &self,
    given: &LexRequirement,
    reference: &LexRequirement,
) -> bool {
    let normalized_given = self.normalize_sort_requirements(given);
    let normalized_reference = self.normalize_sort_requirements(reference);
    (normalized_reference.len() <= normalized_given.len())
        && normalized_reference
            .into_iter()
            .zip(normalized_given)
            .all(|(reference, given)| given.compatible(&reference))
}
/// Returns the finer of the two orderings (the one implying the other),
/// if one exists, by delegating to [`Self::get_finer_requirement`].
pub fn get_finer_ordering(
    &self,
    lhs: &LexOrdering,
    rhs: &LexOrdering,
) -> Option<LexOrdering> {
    let lhs_req = LexRequirement::from(lhs.clone());
    let rhs_req = LexRequirement::from(rhs.clone());
    self.get_finer_requirement(&lhs_req, &rhs_req)
        .map(LexOrdering::from)
}
/// Returns the finer of the two requirements if one subsumes the other:
/// exprs must match pairwise, and where only one side specifies sort
/// options the other inherits them. The longer requirement is returned.
pub fn get_finer_requirement(
    &self,
    req1: &LexRequirement,
    req2: &LexRequirement,
) -> Option<LexRequirement> {
    let mut lhs = self.normalize_sort_requirements(req1);
    let mut rhs = self.normalize_sort_requirements(req2);
    lhs.inner
        .iter_mut()
        .zip(rhs.inner.iter_mut())
        .all(|(lhs, rhs)| {
            lhs.expr.eq(&rhs.expr)
                && match (lhs.options, rhs.options) {
                    (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt,
                    // One side unspecified: copy the concrete options over
                    // so the returned requirement is fully specified.
                    (Some(options), None) => {
                        rhs.options = Some(options);
                        true
                    }
                    (None, Some(options)) => {
                        lhs.options = Some(options);
                        true
                    }
                    (None, None) => true,
                }
        })
        .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
}
/// For each sort expression, finds projection sources that refer to it and
/// are widening casts of it; such casts preserve ordering, so each sort
/// position may be substituted by the cast variant. Returns the cartesian
/// product of all per-position alternatives.
pub fn substitute_ordering_component(
    &self,
    mapping: &ProjectionMapping,
    sort_expr: &LexOrdering,
) -> Result<Vec<LexOrdering>> {
    let new_orderings = sort_expr
        .iter()
        .map(|sort_expr| {
            // Projection sources that mention this sort expression.
            let referring_exprs: Vec<_> = mapping
                .iter()
                .map(|(source, _target)| source)
                .filter(|source| expr_refers(source, &sort_expr.expr))
                .cloned()
                .collect();
            // The original expression is always a valid alternative.
            let mut res = LexOrdering::new(vec![sort_expr.clone()]);
            for r_expr in referring_exprs {
                if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
                    let expr_type = sort_expr.expr.data_type(&self.schema)?;
                    // Only widening casts of the exact expr keep ordering.
                    if cast_expr.expr.eq(&sort_expr.expr)
                        && cast_expr.is_bigger_cast(expr_type)
                    {
                        res.push(PhysicalSortExpr {
                            expr: Arc::clone(&r_expr),
                            options: sort_expr.options,
                        });
                    }
                }
            }
            Ok(res)
        })
        .collect::<Result<Vec<_>>>()?;
    // All combinations of the per-position alternatives.
    let res = new_orderings
        .into_iter()
        .multi_cartesian_product()
        .map(LexOrdering::new)
        .collect::<Vec<_>>();
    Ok(res)
}
/// Rewrites the whole ordering equivalence class through
/// [`Self::substitute_ordering_component`], flattening all alternatives.
pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
    let new_order = self
        .oeq_class
        .iter()
        .map(|order| self.substitute_ordering_component(mapping, order))
        .collect::<Result<Vec<_>>>()?;
    let new_order = new_order.into_iter().flatten().collect();
    self.oeq_class = OrderingEquivalenceClass::new(new_order);
    Ok(())
}
/// Projects `expr` through the given projection mapping, returning `None`
/// if the expression is not representable after the projection.
pub fn project_expr(
    &self,
    expr: &Arc<dyn PhysicalExpr>,
    projection_mapping: &ProjectionMapping,
) -> Option<Arc<dyn PhysicalExpr>> {
    self.eq_group.project_expr(projection_mapping, expr)
}
/// Builds a dependency map for projection: each relevant sort expression
/// is recorded with its projected form (if any) and its immediate
/// predecessor in the ordering (its "dependency").
fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
    let mut dependency_map = DependencyMap::new();
    for ordering in self.normalized_oeq_class().iter() {
        for (idx, sort_expr) in ordering.iter().enumerate() {
            let target_sort_expr =
                self.project_expr(&sort_expr.expr, mapping).map(|expr| {
                    PhysicalSortExpr {
                        expr,
                        options: sort_expr.options,
                    }
                });
            let is_projected = target_sort_expr.is_some();
            // Record the entry when the expr survives projection, or when
            // some projection source refers to it (so it can still matter
            // for derived orderings).
            if is_projected
                || mapping
                    .iter()
                    .any(|(source, _)| expr_refers(source, &sort_expr.expr))
            {
                // Dependency is the previous sort expr (none for idx == 0).
                let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
                dependency_map.insert(
                    sort_expr,
                    target_sort_expr.as_ref(),
                    dependency,
                );
            }
            // Once an expression fails to project, the rest of this
            // ordering cannot hold after projection; stop here.
            if !is_projected {
                break;
            }
        }
    }
    dependency_map
}
/// Returns a copy of `mapping` with every source expression normalized
/// against the equivalence group (targets left untouched).
fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
    ProjectionMapping {
        map: mapping
            .iter()
            .map(|(source, target)| {
                let normalized_source =
                    self.eq_group.normalize_expr(Arc::clone(source));
                (normalized_source, Arc::clone(target))
            })
            .collect(),
    }
}
/// Computes the orderings valid after projection. Two sources are merged:
/// (1) orderings implied by projection expressions whose dependencies are
/// themselves ordered, and (2) projected forms of existing orderings
/// reconstructed from the dependency map prefixes.
fn projected_orderings(&self, mapping: &ProjectionMapping) -> Vec<LexOrdering> {
    let mapping = self.normalized_mapping(mapping);
    let dependency_map = self.construct_dependency_map(&mapping);
    // (1) Orderings arising from projection expressions that are ordered
    // given their referred dependencies.
    let orderings = mapping.iter().flat_map(|(source, target)| {
        referred_dependencies(&dependency_map, source)
            .into_iter()
            .filter_map(|relevant_deps| {
                // Keep only dependency sets under which `source` is ordered.
                if let Ok(SortProperties::Ordered(options)) =
                    get_expr_properties(source, &relevant_deps, &self.schema)
                        .map(|prop| prop.sort_properties)
                {
                    Some((options, relevant_deps))
                } else {
                    None
                }
            })
            .flat_map(|(options, relevant_deps)| {
                let sort_expr = PhysicalSortExpr {
                    expr: Arc::clone(target),
                    options,
                };
                // Prepend every valid dependency ordering to the new expr.
                let mut dependency_orderings =
                    generate_dependency_orderings(&relevant_deps, &dependency_map);
                for ordering in dependency_orderings.iter_mut() {
                    ordering.push(sort_expr.clone());
                }
                dependency_orderings
            })
    });
    // (2) Projected versions of the original orderings, rebuilt from each
    // node's prefix orderings plus its projected sort expression.
    let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
        let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
        if prefixes.is_empty() {
            // No prefix: the node may start an ordering by itself.
            prefixes = vec![LexOrdering::default()];
        }
        for ordering in prefixes.iter_mut() {
            if let Some(target) = &node.target_sort_expr {
                ordering.push(target.clone())
            }
        }
        prefixes
    });
    orderings
        .chain(projected_orderings)
        .map(|lex_ordering| lex_ordering.collapse())
        .collect()
}
/// Computes the constants valid after projection: existing constants that
/// survive the projection (keeping their across-partitions value), plus
/// projection targets whose sources are constant.
fn projected_constants(&self, mapping: &ProjectionMapping) -> Vec<ConstExpr> {
    // Project existing constants through the mapping, carrying over the
    // across-partitions information.
    let mut projected_constants = self
        .constants
        .iter()
        .flat_map(|const_expr| {
            const_expr
                .map(|expr| self.eq_group.project_expr(mapping, expr))
                .map(|projected_expr| {
                    projected_expr
                        .with_across_partitions(const_expr.across_partitions())
                })
        })
        .collect::<Vec<_>>();
    // Targets whose source is constant are constants themselves.
    for (source, target) in mapping.iter() {
        if self.is_expr_constant(source)
            && !const_exprs_contains(&projected_constants, target)
        {
            if self.is_expr_constant_across_partitions(source) {
                projected_constants.push(
                    ConstExpr::from(target)
                        .with_across_partitions(self.get_expr_constant_value(source)),
                )
            } else {
                // Constant per partition only.
                projected_constants.push(
                    ConstExpr::from(target)
                        .with_across_partitions(AcrossPartitions::Heterogeneous),
                )
            }
        }
    }
    projected_constants
}
/// Projects the constraints onto the output column indices; `None` when
/// the constraints cannot be expressed after the projection.
fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
    let indices = mapping
        .iter()
        .filter_map(|(_, target)| target.as_any().downcast_ref::<Column>())
        .map(|col| col.index())
        .collect::<Vec<_>>();
    // Every projection target is expected to be a plain column here.
    debug_assert_eq!(mapping.map.len(), indices.len());
    self.constraints.project(&indices)
}
/// Projects all properties (equivalences, orderings, constants,
/// constraints) through the given mapping onto `output_schema`.
pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
    let eq_group = self.eq_group.project(mapping);
    let oeq_class = OrderingEquivalenceClass::new(self.projected_orderings(mapping));
    let constants = self.projected_constants(mapping);
    // Unprojectable constraints are dropped, not an error.
    let constraints = self
        .projected_constraints(mapping)
        .unwrap_or_else(Constraints::empty);
    Self {
        schema: output_schema,
        eq_group,
        oeq_class,
        constants,
        constraints,
    }
}
/// Finds the longest prefix-permutation of `exprs` that forms a valid sort
/// order under the known properties. Returns the sort expressions together
/// with the indices of the chosen expressions in `exprs`.
pub fn find_longest_permutation(
    &self,
    exprs: &[Arc<dyn PhysicalExpr>],
) -> (LexOrdering, Vec<usize>) {
    let mut eq_properties = self.clone();
    let mut result = vec![];
    let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
    // At most `exprs.len()` rounds; each round admits currently-ordered
    // expressions, then treats them as constants for the next round.
    for _idx in 0..exprs.len() {
        let ordered_exprs = search_indices
            .iter()
            .flat_map(|&idx| {
                let ExprProperties {
                    sort_properties, ..
                } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
                match sort_properties {
                    SortProperties::Ordered(options) => Some((
                        PhysicalSortExpr {
                            expr: Arc::clone(&exprs[idx]),
                            options,
                        },
                        idx,
                    )),
                    SortProperties::Singleton => {
                        // Constants satisfy any ordering; default options.
                        let options = SortOptions::default();
                        Some((
                            PhysicalSortExpr {
                                expr: Arc::clone(&exprs[idx]),
                                options,
                            },
                            idx,
                        ))
                    }
                    SortProperties::Unordered => None,
                }
            })
            .collect::<Vec<_>>();
        // No progress possible; stop.
        if ordered_exprs.is_empty() {
            break;
        }
        for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
            // Within the already-chosen prefix, this expr is effectively
            // constant for subsequent rounds.
            eq_properties =
                eq_properties.with_constants(std::iter::once(ConstExpr::from(expr)));
            search_indices.shift_remove(idx);
        }
        result.extend(ordered_exprs);
    }
    let (left, right) = result.into_iter().unzip();
    (LexOrdering::new(left), right)
}
/// Returns `true` if `expr` is constant according to the known constants
/// (comparing normalized forms, recursing into children).
pub fn is_expr_constant(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
    let const_exprs = self
        .constants
        .iter()
        .map(|const_expr| Arc::clone(const_expr.expr()));
    let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
    let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
    is_constant_recurse(&normalized_constants, &normalized_expr)
}
/// Deprecated (misspelled) alias for
/// [`Self::is_expr_constant_across_partitions`].
#[deprecated(
    since = "45.0.0",
    note = "Use [`is_expr_constant_across_partitions`] instead"
)]
pub fn is_expr_constant_accross_partitions(
    &self,
    expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    self.is_expr_constant_across_partitions(expr)
}
/// Returns `true` if `expr` is constant with the same value across all
/// partitions; only uniform constants are considered.
pub fn is_expr_constant_across_partitions(
    &self,
    expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    // Restrict to constants known to be uniform across partitions.
    let const_exprs = self
        .constants
        .iter()
        .filter_map(|const_expr| {
            if matches!(
                const_expr.across_partitions(),
                AcrossPartitions::Uniform { .. }
            ) {
                Some(Arc::clone(const_expr.expr()))
            } else {
                None
            }
        })
        .collect::<Vec<_>>();
    let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
    let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
    is_constant_recurse(&normalized_constants, &normalized_expr)
}
/// Returns the across-partitions constant value of `expr`: uniform with the
/// literal's value for literals, the recorded value for known constants,
/// and `Heterogeneous` otherwise.
pub fn get_expr_constant_value(
    &self,
    expr: &Arc<dyn PhysicalExpr>,
) -> AcrossPartitions {
    let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
    if let Some(lit) = normalized_expr.as_any().downcast_ref::<Literal>() {
        return AcrossPartitions::Uniform(Some(lit.value().clone()));
    }
    for const_expr in self.constants.iter() {
        if normalized_expr.eq(const_expr.expr()) {
            return const_expr.across_partitions();
        }
    }
    // Not recognized as constant-valued across partitions.
    AcrossPartitions::Heterogeneous
}
/// Computes the sort/range properties of `expr` via a bottom-up traversal;
/// falls back to unknown properties if the traversal fails.
pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
    ExprPropertiesNode::new_unknown(expr)
        .transform_up(|expr| update_properties(expr, self))
        .data()
        .map(|node| node.data)
        .unwrap_or(ExprProperties::new_unknown())
}
/// Rewrites all properties against a new schema with identical layout
/// (same field count and data types; names may differ). Errors if the
/// schemas are not layout-compatible.
pub fn with_new_schema(self, schema: SchemaRef) -> Result<Self> {
    // Schemas must agree field-by-field on data types.
    let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
        && self
            .schema
            .fields
            .iter()
            .zip(schema.fields.iter())
            .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
    if !schemas_aligned {
        return plan_err!(
            "Cannot rewrite old_schema:{:?} with new schema: {:?}",
            self.schema,
            schema
        );
    }
    // Rebind constants to the new schema, keeping across-partitions info.
    let new_constants = self
        .constants
        .into_iter()
        .map(|const_expr| {
            let across_partitions = const_expr.across_partitions();
            let new_const_expr = with_new_schema(const_expr.owned_expr(), &schema)?;
            Ok(ConstExpr::new(new_const_expr)
                .with_across_partitions(across_partitions))
        })
        .collect::<Result<Vec<_>>>()?;
    // Rebind every sort expression in every ordering.
    let mut new_orderings = vec![];
    for ordering in self.oeq_class {
        let new_ordering = ordering
            .into_iter()
            .map(|mut sort_expr| {
                sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
                Ok(sort_expr)
            })
            .collect::<Result<_>>()?;
        new_orderings.push(new_ordering);
    }
    // Rebind every expression in every equivalence class.
    let mut eq_classes = vec![];
    for eq_class in self.eq_group {
        let new_eq_exprs = eq_class
            .into_vec()
            .into_iter()
            .map(|expr| with_new_schema(expr, &schema))
            .collect::<Result<_>>()?;
        eq_classes.push(EquivalenceClass::new(new_eq_exprs));
    }
    // NOTE(review): constraints are not carried over here — confirm whether
    // that is intentional (they are index-based, so they should transfer).
    let mut result = EquivalenceProperties::new(schema);
    result.constants = new_constants;
    result.add_new_orderings(new_orderings);
    result.add_equivalence_group(EquivalenceGroup::new(eq_classes));
    Ok(result)
}
}
impl Display for EquivalenceProperties {
    /// Renders the non-empty property sections separated by ", ".
    ///
    /// Fix: the previous version unconditionally prefixed the `eq:` and
    /// `const:` sections with ", ", producing output like ", eq: ..." when
    /// the ordering section was empty. Separators are now emitted only
    /// between sections that are actually written.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.eq_group.is_empty()
            && self.oeq_class.is_empty()
            && self.constants.is_empty()
        {
            return write!(f, "No properties");
        }
        // Tracks whether a section has already been written, so that
        // subsequent sections know to emit a separator first.
        let mut first = true;
        if !self.oeq_class.is_empty() {
            write!(f, "order: {}", self.oeq_class)?;
            first = false;
        }
        if !self.eq_group.is_empty() {
            if !first {
                write!(f, ", ")?;
            }
            write!(f, "eq: {}", self.eq_group)?;
            first = false;
        }
        if !self.constants.is_empty() {
            if !first {
                write!(f, ", ")?;
            }
            write!(f, "const: [{}]", ConstExpr::format_list(&self.constants))?;
        }
        Ok(())
    }
}
/// Transform callback for the bottom-up property computation: derives a
/// node's properties from its children (or from its literal/column nature),
/// then overrides the sort properties using the known equivalences,
/// constants and orderings.
fn update_properties(
    mut node: ExprPropertiesNode,
    eq_properties: &EquivalenceProperties,
) -> Result<Transformed<ExprPropertiesNode>> {
    // Step 1: intrinsic properties from structure.
    if !node.expr.children().is_empty() {
        // Internal node: combine child properties.
        let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
        node.data = node.expr.get_properties(&children_props)?;
    } else if node.expr.as_any().is::<Literal>() {
        node.data = node.expr.get_properties(&[])?;
    } else if node.expr.as_any().is::<Column>() {
        // Columns get an unbounded range of their data type.
        node.data.range =
            Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
    }
    // Step 2: refine sort properties using global knowledge.
    let normalized_expr = eq_properties
        .eq_group
        .normalize_expr(Arc::clone(&node.expr));
    let oeq_class = eq_properties.normalized_oeq_class();
    if eq_properties.is_expr_constant(&normalized_expr)
        || oeq_class.is_expr_partial_const(&normalized_expr)
    {
        node.data.sort_properties = SortProperties::Singleton;
    } else if let Some(options) = oeq_class.get_options(&normalized_expr) {
        node.data.sort_properties = SortProperties::Ordered(options);
    }
    Ok(Transformed::yes(node))
}
/// Returns `true` if `expr` is constant: it is a literal, appears in
/// `constants`, or all of its children are (recursively) constant.
/// Leaf expressions not in `constants` are not constant.
fn is_constant_recurse(
    constants: &[Arc<dyn PhysicalExpr>],
    expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    if expr.as_any().is::<Literal>() || physical_exprs_contains(constants, expr) {
        return true;
    }
    let children = expr.children();
    if children.is_empty() {
        return false;
    }
    children.iter().all(|child| is_constant_recurse(constants, child))
}
/// Returns `true` if `referring_expr` is `referred_expr` itself or contains
/// it anywhere in its subtree.
fn expr_refers(
    referring_expr: &Arc<dyn PhysicalExpr>,
    referred_expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    if referring_expr.eq(referred_expr) {
        return true;
    }
    referring_expr
        .children()
        .iter()
        .any(|child| expr_refers(child, referred_expr))
}
/// Computes the properties of `expr` given a set of ordered `dependencies`:
/// expressions appearing in the dependencies are ordered accordingly, bare
/// columns are unordered, literals are singletons, and compound expressions
/// derive their properties recursively from their children.
fn get_expr_properties(
    expr: &Arc<dyn PhysicalExpr>,
    dependencies: &Dependencies,
    schema: &SchemaRef,
) -> Result<ExprProperties> {
    if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
        // The expression itself is one of the ordered dependencies.
        Ok(ExprProperties {
            sort_properties: SortProperties::Ordered(column_order.options),
            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
            preserves_lex_ordering: false,
        })
    } else if expr.as_any().downcast_ref::<Column>().is_some() {
        // A column not in the dependencies carries no known ordering.
        Ok(ExprProperties {
            sort_properties: SortProperties::Unordered,
            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
            preserves_lex_ordering: false,
        })
    } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
        // Literals are constants with a point range.
        Ok(ExprProperties {
            sort_properties: SortProperties::Singleton,
            range: Interval::try_new(literal.value().clone(), literal.value().clone())?,
            preserves_lex_ordering: true,
        })
    } else {
        // Compound expression: derive from child properties.
        let child_states = expr
            .children()
            .iter()
            .map(|child| get_expr_properties(child, dependencies, schema))
            .collect::<Result<Vec<_>>>()?;
        expr.get_properties(&child_states)
    }
}
/// Wrapper adding `Eq`/`Hash` to `Arc<dyn PhysicalExpr>` so expressions can
/// be used as hash-set/map keys.
#[derive(Debug, Clone)]
struct ExprWrapper(Arc<dyn PhysicalExpr>);
// Delegates equality to the wrapped expressions.
impl PartialEq<Self> for ExprWrapper {
    fn eq(&self, other: &Self) -> bool {
        self.0.eq(&other.0)
    }
}
// Expression equality is a total equivalence relation, so `Eq` is sound.
impl Eq for ExprWrapper {}
// Delegates hashing to the wrapped expression, consistent with `PartialEq`.
impl Hash for ExprWrapper {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.hash(state);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::{col, BinaryExpr};
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion_expr::Operator;

    /// `is_constant_recurse` should treat a compound expression as constant
    /// if and only if all of its leaves are constant.
    #[test]
    fn test_expr_consists_of_constants() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let col_d = col("d", &schema)?;
        let b_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(&col_b),
            Operator::Plus,
            Arc::clone(&col_d),
        )) as Arc<dyn PhysicalExpr>;
        // `d` is not constant, so `b + d` must not be constant.
        let constants = vec![Arc::clone(&col_a), Arc::clone(&col_b)];
        let expr = Arc::clone(&b_plus_d);
        assert!(!is_constant_recurse(&constants, &expr));
        // With `d` constant too, `b + d` becomes constant.
        let constants = vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_d)];
        let expr = Arc::clone(&b_plus_d);
        assert!(is_constant_recurse(&constants, &expr));
        Ok(())
    }
}