mod dependency; mod joins; mod union;
pub use joins::*;
pub use union::*;
use std::fmt::{self, Display};
use std::mem;
use std::sync::Arc;
use self::dependency::{
Dependencies, DependencyMap, construct_prefix_orderings,
generate_dependency_orderings, referred_dependencies,
};
use crate::equivalence::{
AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
};
use crate::expressions::{CastExpr, Column, Literal, with_new_schema};
use crate::{
ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement,
};
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Constraint, Constraints, HashMap, Result, plan_err};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_physical_expr_common::sort_expr::options_compatible;
use datafusion_physical_expr_common::utils::ExprPropertiesNode;
use indexmap::IndexSet;
use itertools::Itertools;
/// Tracks equivalence, ordering, constancy and constraint information for
/// the output of an operator, enabling reasoning about sort orders.
#[derive(Clone, Debug)]
pub struct EquivalenceProperties {
    /// Expressions known to be equal to one another (also stores constants).
    eq_group: EquivalenceGroup,
    /// Equivalent sort orderings satisfied by this relation.
    oeq_class: OrderingEquivalenceClass,
    /// Cached normalized form of `oeq_class`; kept in sync by all mutators.
    oeq_cache: OrderingEquivalenceCache,
    /// Table constraints (e.g. primary key / unique) over schema columns.
    constraints: Constraints,
    /// Schema that the expressions above refer to.
    schema: SchemaRef,
}
/// Cache of the normalized form of an ordering equivalence class, plus an
/// index from each ordering's leading expression to the positions of the
/// orderings that start with it.
#[derive(Clone, Debug, Default)]
struct OrderingEquivalenceCache {
    // Normalized orderings mirroring `EquivalenceProperties::oeq_class`.
    normal_cls: OrderingEquivalenceClass,
    // Maps a leading sort expression to indices (into `normal_cls`) of the
    // orderings that begin with it.
    leading_map: HashMap<Arc<dyn PhysicalExpr>, Vec<usize>>,
}
impl OrderingEquivalenceCache {
    /// Builds a cache from the given orderings and populates the
    /// leading-expression map.
    pub fn new(
        orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
    ) -> Self {
        let normal_cls = OrderingEquivalenceClass::new(orderings);
        let mut cache = Self {
            normal_cls,
            leading_map: HashMap::new(),
        };
        cache.update_map();
        cache
    }

    /// Rebuilds `leading_map` from scratch so that it reflects the current
    /// contents of `normal_cls`.
    pub fn update_map(&mut self) {
        self.leading_map.clear();
        let mut position = 0;
        for ordering in self.normal_cls.iter() {
            let leading_expr = Arc::clone(&ordering.first().expr);
            self.leading_map.entry(leading_expr).or_default().push(position);
            position += 1;
        }
    }

    /// Empties both the normalized orderings and the leading-expression map.
    pub fn clear(&mut self) {
        self.leading_map.clear();
        self.normal_cls.clear();
    }
}
impl EquivalenceProperties {
/// Creates an empty `EquivalenceProperties` over the given schema: no
/// equivalences, no orderings, no constraints.
pub fn new(schema: SchemaRef) -> Self {
    Self {
        schema,
        eq_group: Default::default(),
        oeq_class: Default::default(),
        oeq_cache: Default::default(),
        constraints: Default::default(),
    }
}
/// Replaces the stored table constraints with `constraints`.
pub fn set_constraints(&mut self, constraints: Constraints) {
    self.constraints = constraints;
}

/// Builder-style variant of [`Self::set_constraints`] that consumes and
/// returns `self`.
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
    self.constraints = constraints;
    self
}
/// Creates an `EquivalenceProperties` that starts out satisfying the given
/// `orderings`, with an empty equivalence group and no constraints.
pub fn new_with_orderings(
    schema: SchemaRef,
    orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
) -> Self {
    let eq_group = EquivalenceGroup::default();
    let oeq_class = OrderingEquivalenceClass::new(orderings);
    // Seed the cache with constant-free versions of the orderings.
    // NOTE(review): `eq_group` is freshly constructed and empty here, so
    // `is_expr_constant` always returns `None` and this filter currently
    // removes nothing; it appears to exist for symmetry with the other
    // cache-refresh paths — confirm.
    let normal_orderings = oeq_class.iter().cloned().map(|o| {
        o.into_iter()
            .filter(|sort_expr| eq_group.is_expr_constant(&sort_expr.expr).is_none())
    });
    Self {
        oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
        oeq_class,
        eq_group,
        constraints: Constraints::default(),
        schema,
    }
}
/// Returns a reference to the schema these properties describe.
pub fn schema(&self) -> &SchemaRef {
    &self.schema
}

/// Returns the (non-normalized) ordering equivalence class.
pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
    &self.oeq_class
}

/// Returns the expression equivalence group.
pub fn eq_group(&self) -> &EquivalenceGroup {
    &self.eq_group
}

/// Returns the table constraints.
pub fn constraints(&self) -> &Constraints {
    &self.constraints
}
/// Returns all expressions known to be constant, one `ConstExpr` per
/// member of every equivalence class that carries a constant value.
pub fn constants(&self) -> Vec<ConstExpr> {
    let mut result = vec![];
    for class in self.eq_group.iter() {
        // Only classes with a known constant value contribute entries:
        if let Some(across) = class.constant.as_ref() {
            for expr in class.iter() {
                result.push(ConstExpr::new(Arc::clone(expr), across.clone()));
            }
        }
    }
    result
}
/// Returns a single output ordering formed by concatenating every known
/// ordering and normalizing the result (duplicates/constants fall away
/// during normalization). `None` if nothing survives normalization.
pub fn output_ordering(&self) -> Option<LexOrdering> {
    let all_sort_exprs: Vec<_> = self
        .oeq_class
        .iter()
        .flat_map(|ordering| ordering.iter().cloned())
        .collect();
    self.normalize_sort_exprs(all_sort_exprs)
}
/// Merges `other` into `self`: constraints are concatenated, equivalence
/// groups are merged (which refreshes the ordering cache), and `other`'s
/// orderings are added afterwards.
pub fn extend(mut self, other: Self) -> Result<Self> {
    self.constraints.extend(other.constraints);
    // Merge the equivalence group first so that the subsequent ordering
    // additions are normalized against the combined group.
    self.add_equivalence_group(other.eq_group)?;
    self.add_orderings(other.oeq_class);
    Ok(self)
}
/// Removes all known orderings (both the class and its cache).
pub fn clear_orderings(&mut self) {
    self.oeq_class.clear();
    self.oeq_cache.clear();
}
/// Drops constants that only hold within a single partition. If any were
/// removed, the normalized ordering cache is rebuilt, since those
/// expressions can no longer be elided from orderings.
pub fn clear_per_partition_constants(&mut self) {
    if self.eq_group.clear_per_partition_constants() {
        // Re-normalize all orderings against the reduced constant set:
        let normal_orderings = self
            .oeq_class
            .iter()
            .cloned()
            .map(|o| self.eq_group.normalize_sort_exprs(o));
        self.oeq_cache = OrderingEquivalenceCache::new(normal_orderings);
    }
}
/// Adds new orderings to these properties. Orderings that are empty are
/// discarded by `LexOrdering::new`; if nothing survives normalization
/// (e.g. all expressions are constant), state is left entirely untouched —
/// note that in that case the raw orderings are not stored either.
pub fn add_orderings(
    &mut self,
    orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
) {
    let orderings: Vec<_> =
        orderings.into_iter().filter_map(LexOrdering::new).collect();
    let normal_orderings: Vec<_> = orderings
        .iter()
        .cloned()
        .filter_map(|o| self.normalize_sort_exprs(o))
        .collect();
    if !normal_orderings.is_empty() {
        self.oeq_class.extend(orderings);
        self.oeq_cache.normal_cls.extend(normal_orderings);
        self.oeq_cache.update_map();
    }
}
/// Convenience wrapper over [`Self::add_orderings`] for a single ordering.
pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
    self.add_orderings([ordering]);
}
/// Re-normalizes cached orderings against the current equivalence group,
/// rebuilds the leading-expression map, and then tries to discover new
/// orderings implied by each leading expression. Called after the
/// equivalence group changes.
fn update_oeq_cache(&mut self) -> Result<()> {
    // `mem::take` lets us consume the old cache contents without cloning.
    let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
    let normal_orderings = normal_cls
        .into_iter()
        .map(|o| self.eq_group.normalize_sort_exprs(o));
    self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
    self.oeq_cache.update_map();
    // Collect keys first: `discover_new_orderings` mutates the cache.
    let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
    for expr in leading_exprs {
        self.discover_new_orderings(expr)?;
    }
    Ok(())
}
/// Merges `other_eq_group` into the stored equivalence group; when the
/// incoming group is non-empty, the ordering cache is refreshed to match.
pub fn add_equivalence_group(
    &mut self,
    other_eq_group: EquivalenceGroup,
) -> Result<()> {
    // Nothing to merge — skip the (non-trivial) cache refresh.
    if other_eq_group.is_empty() {
        return Ok(());
    }
    self.eq_group.extend(other_eq_group);
    self.update_oeq_cache()
}
/// Returns the ordering equivalence class with every ordering normalized
/// against the equivalence group; orderings that normalize away entirely
/// are dropped.
pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
    let mut normal_orderings = vec![];
    for ordering in self.oeq_class.iter() {
        if let Some(normal) = self.normalize_sort_exprs(ordering.clone()) {
            normal_orderings.push(normal);
        }
    }
    normal_orderings.into()
}
/// Records the equality `left == right` in the equivalence group.
///
/// When the group actually changes (i.e. the equality was not already
/// known), the normalized ordering cache is refreshed and newly implied
/// orderings are discovered.
pub fn add_equal_conditions(
    &mut self,
    left: Arc<dyn PhysicalExpr>,
    right: Arc<dyn PhysicalExpr>,
) -> Result<()> {
    // Only refresh the cache when the group reports a change. Previously
    // `update_oeq_cache` was additionally called unconditionally after the
    // `if`, which duplicated the renormalization/discovery work and made
    // the conditional pointless.
    if self.eq_group.add_equal_conditions(left, right) {
        self.update_oeq_cache()?;
    }
    Ok(())
}
/// Registers the given expressions as constants. Cached orderings are
/// re-filtered to drop now-constant expressions, and new orderings that
/// the shortened prefixes may imply are discovered.
pub fn add_constants(
    &mut self,
    constants: impl IntoIterator<Item = ConstExpr>,
) -> Result<()> {
    for constant in constants {
        self.eq_group.add_constant(constant);
    }
    // Rebuild the cache without cloning by taking the old contents:
    let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
    let normal_orderings = normal_cls.into_iter().map(|ordering| {
        // Remove sort expressions that have just become constant:
        ordering.into_iter().filter(|sort_expr| {
            self.eq_group.is_expr_constant(&sort_expr.expr).is_none()
        })
    });
    self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
    self.oeq_cache.update_map();
    // Collect keys first: `discover_new_orderings` mutates the cache.
    let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
    for expr in leading_exprs {
        self.discover_new_orderings(expr)?;
    }
    Ok(())
}
/// Attempts to derive new orderings from orderings led by `normal_expr`.
///
/// For each cached ordering `[e, s1, s2, ...]` whose leading expression
/// `e` is equivalent to some expression `f(c1, c2, ...)`, if the ordering's
/// tail starts with exactly `f`'s children (`s1 == c1`, `s2 == c2`, ...)
/// and `f` preserves lexicographic ordering with the same direction as
/// `e`, then the tail `[s1, s2, ...]` is itself a valid ordering and is
/// added.
fn discover_new_orderings(
    &mut self,
    normal_expr: Arc<dyn PhysicalExpr>,
) -> Result<()> {
    // Nothing to do if no cached ordering starts with this expression:
    let Some(ordering_idxs) = self.oeq_cache.leading_map.get(&normal_expr) else {
        return Ok(());
    };
    // All expressions equivalent to `normal_expr` (or just itself):
    let eq_class = self
        .eq_group
        .get_equivalence_class(&normal_expr)
        .map_or_else(|| vec![normal_expr], |class| class.clone().into());
    let mut new_orderings = vec![];
    for idx in ordering_idxs {
        let ordering = &self.oeq_cache.normal_cls[*idx];
        let leading_ordering_options = ordering[0].options;
        'exprs: for equivalent_expr in &eq_class {
            let children = equivalent_expr.children();
            if children.is_empty() {
                continue;
            }
            // Check that the ordering's tail matches this expression's
            // children one-for-one; bail out on the first mismatch.
            let mut child_properties = vec![];
            for (i, child) in children.into_iter().enumerate() {
                let Some(next) = ordering.get(i + 1) else {
                    break 'exprs;
                };
                if !next.expr.eq(child) {
                    break 'exprs;
                }
                let data_type = child.data_type(&self.schema)?;
                child_properties.push(ExprProperties {
                    sort_properties: SortProperties::Ordered(next.options),
                    range: Interval::make_unbounded(&data_type)?,
                    preserves_lex_ordering: true,
                });
            }
            // Does the expression keep the tail's lexicographic order and
            // sort in the same direction as the current leading expression?
            let expr_properties =
                equivalent_expr.get_properties(&child_properties)?;
            if expr_properties.preserves_lex_ordering
                && expr_properties.sort_properties
                    == SortProperties::Ordered(leading_ordering_options)
            {
                // The tail alone is then a valid ordering:
                new_orderings.push(ordering[1..].to_vec());
                break;
            }
        }
    }
    if !new_orderings.is_empty() {
        self.add_orderings(new_orderings);
    }
    Ok(())
}
/// If the given `ordering` is not already satisfied by these properties,
/// replaces all known orderings with it and returns `true`. Returns
/// `false` (leaving state untouched) when the ordering is already
/// satisfied or normalizes away entirely.
pub fn reorder(
    &mut self,
    ordering: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Result<bool> {
    // `tee` lets us consume the input twice (once for normalization, once
    // for re-adding) without requiring `Clone` on the iterator itself.
    let (ordering, ordering_tee) = ordering.into_iter().tee();
    let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
        return Ok(false);
    };
    if normal_ordering.len() != self.common_sort_prefix_length(&normal_ordering)? {
        self.clear_orderings();
        self.add_ordering(ordering_tee);
        return Ok(true);
    }
    Ok(false)
}
/// Normalizes the given sort expressions against the equivalence group and
/// wraps the result in a `LexOrdering`; `None` if nothing remains.
pub fn normalize_sort_exprs(
    &self,
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Option<LexOrdering> {
    let normalized = self.eq_group.normalize_sort_exprs(sort_exprs);
    LexOrdering::new(normalized)
}
/// Normalizes the given sort requirements against the equivalence group
/// and wraps the result in a `LexRequirement`; `None` if nothing remains.
pub fn normalize_sort_requirements(
    &self,
    sort_reqs: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> Option<LexRequirement> {
    LexRequirement::new(self.eq_group.normalize_sort_requirements(sort_reqs))
}
/// Returns `true` if these properties guarantee the given ordering. An
/// ordering that normalizes away entirely (e.g. all constants) is
/// trivially satisfied.
pub fn ordering_satisfy(
    &self,
    given: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Result<bool> {
    let Some(normal_ordering) = self.normalize_sort_exprs(given) else {
        return Ok(true);
    };
    // Satisfied iff the whole normalized ordering is a common sort prefix:
    Ok(normal_ordering.len() == self.common_sort_prefix_length(&normal_ordering)?)
}
/// Returns `true` if these properties guarantee the given sort
/// requirement. Requirements that normalize away are trivially satisfied;
/// otherwise constraints are consulted first, then each element is checked
/// in turn, treating already-verified prefix elements as constants so that
/// later elements are judged in their context.
pub fn ordering_satisfy_requirement(
    &self,
    given: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> Result<bool> {
    let Some(normal_reqs) = self.normalize_sort_requirements(given) else {
        return Ok(true);
    };
    // A unique/primary-key constraint may already imply the requirement:
    if self.satisfied_by_constraints(&normal_reqs) {
        return Ok(true);
    }
    let schema = self.schema();
    // Work on a clone: we progressively add prefix elements as constants,
    // which must not leak into `self`.
    let mut eq_properties = self.clone();
    for element in normal_reqs {
        let ExprProperties {
            sort_properties, ..
        } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
        let satisfy = match sort_properties {
            // A requirement without explicit options accepts any ordering:
            SortProperties::Ordered(options) => element.options.is_none_or(|opts| {
                let nullable = element.expr.nullable(schema).unwrap_or(true);
                options_compatible(&options, &opts, nullable)
            }),
            SortProperties::Singleton => true,
            SortProperties::Unordered => false,
        };
        if !satisfy {
            return Ok(false);
        }
        // Within the suffix, this expression is effectively constant:
        let const_expr = ConstExpr::from(element.expr);
        eq_properties.add_constants(std::iter::once(const_expr))?;
    }
    Ok(true)
}
/// Returns the length of the longest prefix of `normal_ordering` that is
/// guaranteed by these properties (the full length if the whole ordering
/// is satisfied, e.g. via a constraint).
fn common_sort_prefix_length(&self, normal_ordering: &LexOrdering) -> Result<usize> {
    let full_length = normal_ordering.len();
    // Constraints may imply the entire ordering outright:
    if self.satisfied_by_constraints_ordering(normal_ordering) {
        return Ok(full_length);
    }
    let schema = self.schema();
    // Work on a clone: verified prefix elements get added as constants.
    let mut eq_properties = self.clone();
    for (idx, element) in normal_ordering.into_iter().enumerate() {
        let ExprProperties {
            sort_properties, ..
        } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
        let satisfy = match sort_properties {
            SortProperties::Ordered(options) => options_compatible(
                &options,
                &element.options,
                element.expr.nullable(schema).unwrap_or(true),
            ),
            SortProperties::Singleton => true,
            SortProperties::Unordered => false,
        };
        if !satisfy {
            // First unsatisfied element bounds the common prefix:
            return Ok(idx);
        }
        // Within the suffix, this expression is effectively constant:
        let const_expr = ConstExpr::from(Arc::clone(&element.expr));
        eq_properties.add_constants(std::iter::once(const_expr))?
    }
    Ok(full_length)
}
/// Splits `ordering` into its satisfied normalized prefix and a flag that
/// is `true` when the entire (normalized) ordering is satisfied.
pub fn extract_common_sort_prefix(
    &self,
    ordering: LexOrdering,
) -> Result<(Vec<PhysicalSortExpr>, bool)> {
    match self.normalize_sort_exprs(ordering) {
        // Everything normalized away — trivially satisfied:
        None => Ok((vec![], true)),
        Some(normal_ordering) => {
            let prefix_len = self.common_sort_prefix_length(&normal_ordering)?;
            let total_len = normal_ordering.len();
            let mut sort_exprs: Vec<_> = normal_ordering.into();
            // No-op when the full ordering is satisfied:
            sort_exprs.truncate(prefix_len);
            Ok((sort_exprs, prefix_len == total_len))
        }
    }
}
/// Returns `true` if some primary-key/unique constraint, combined with a
/// known ordering over its columns, implies the given normalized sort
/// expressions.
fn satisfied_by_constraints_ordering(
    &self,
    normal_exprs: &[PhysicalSortExpr],
) -> bool {
    self.constraints.iter().any(|constraint| match constraint {
        Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
            // Unique (unlike primary key) permits NULLs, so nullability of
            // the constraint columns must be examined.
            let check_null = matches!(constraint, Constraint::Unique(_));
            let normalized_size = normal_exprs.len();
            indices.len() <= normalized_size
                && self.oeq_class.iter().any(|ordering| {
                    let length = ordering.len();
                    // The ordering must contain every constraint column
                    // and must not be longer than the given expressions.
                    if indices.len() > length || normalized_size < length {
                        return false;
                    }
                    // Record at which position each column appears in the
                    // ordering, along with its nullability.
                    let mut col_positions = HashMap::with_capacity(length);
                    for (pos, req) in ordering.iter().enumerate() {
                        if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
                        {
                            let nullable = col.nullable(&self.schema).unwrap_or(true);
                            col_positions.insert(col.index(), (pos, nullable));
                        }
                    }
                    // Every constraint column must appear; for `Unique`,
                    // nullable columns are only accepted at interior
                    // positions. NOTE(review): the `pos != 0 && pos !=
                    // length - 1` boundary test looks deliberate but its
                    // exact rationale is not evident here — confirm.
                    if !indices.iter().all(|idx| {
                        col_positions.get(idx).is_some_and(|&(pos, nullable)| {
                            !check_null
                                || !nullable
                                || (pos != 0 && pos != length - 1)
                        })
                    }) {
                        return false;
                    }
                    // Finally, the ordering must element-wise satisfy the
                    // given expressions.
                    normal_exprs.iter().zip(ordering).all(|(given, existing)| {
                        existing.satisfy_expr(given, &self.schema)
                    })
                })
        }
    })
}
/// Requirement-flavored twin of [`Self::satisfied_by_constraints_ordering`]:
/// returns `true` if some primary-key/unique constraint plus a known
/// ordering implies the given normalized sort *requirements* (which may
/// leave sort options unspecified).
fn satisfied_by_constraints(&self, normal_reqs: &[PhysicalSortRequirement]) -> bool {
    self.constraints.iter().any(|constraint| match constraint {
        Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
            // Unique permits NULLs, so nullability must be examined.
            let check_null = matches!(constraint, Constraint::Unique(_));
            let normalized_size = normal_reqs.len();
            indices.len() <= normalized_size
                && self.oeq_class.iter().any(|ordering| {
                    let length = ordering.len();
                    if indices.len() > length || normalized_size < length {
                        return false;
                    }
                    // Position and nullability of each ordering column:
                    let mut col_positions = HashMap::with_capacity(length);
                    for (pos, req) in ordering.iter().enumerate() {
                        if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
                        {
                            let nullable = col.nullable(&self.schema).unwrap_or(true);
                            col_positions.insert(col.index(), (pos, nullable));
                        }
                    }
                    // Same membership/nullability test as the ordering
                    // variant (see NOTE(review) there about the boundary
                    // position check).
                    if !indices.iter().all(|idx| {
                        col_positions.get(idx).is_some_and(|&(pos, nullable)| {
                            !check_null
                                || !nullable
                                || (pos != 0 && pos != length - 1)
                        })
                    }) {
                        return false;
                    }
                    // Element-wise requirement satisfaction:
                    normal_reqs.iter().zip(ordering).all(|(given, existing)| {
                        existing.satisfy(given, &self.schema)
                    })
                })
        }
    })
}
/// Returns `true` when satisfying `reference` would also satisfy `given`:
/// after normalization, `reference` must be at least as long as `given` —
/// wait, the check is the reverse — `reference` must be no longer than
/// `given` and each of its elements must be compatible pair-wise.
/// Requirements that normalize away are compatible with anything.
pub fn requirements_compatible(
    &self,
    given: LexRequirement,
    reference: LexRequirement,
) -> bool {
    let Some(normal_given) = self.normalize_sort_requirements(given) else {
        return true;
    };
    let Some(normal_reference) = self.normalize_sort_requirements(reference) else {
        return true;
    };
    if normal_reference.len() > normal_given.len() {
        return false;
    }
    normal_reference
        .into_iter()
        .zip(normal_given)
        .all(|(reference, given)| given.compatible(&reference))
}
/// Expands each ordering with alternative sort expressions available in
/// the projection `mapping`: currently, a `CastExpr` over a sort
/// expression whose target type is "bigger" (order-preserving) counts as
/// an equally valid sort key, so every combination of original/cast
/// expressions is emitted via a cartesian product.
fn substitute_oeq_class(
    schema: &SchemaRef,
    mapping: &ProjectionMapping,
    oeq_class: OrderingEquivalenceClass,
) -> OrderingEquivalenceClass {
    let new_orderings = oeq_class.into_iter().flat_map(|order| {
        order
            .into_iter()
            .map(|sort_expr| {
                // Projection sources that mention this sort expression:
                let referring_exprs = mapping
                    .iter()
                    .map(|(source, _target)| source)
                    .filter(|source| expr_refers(source, &sort_expr.expr))
                    .cloned();
                let mut result = vec![];
                // NOTE(review): `unwrap` assumes every sort expression
                // type-checks against `schema`; a schema mismatch would
                // panic here — confirm this invariant holds for callers.
                let expr_type = sort_expr.expr.data_type(schema).unwrap();
                for r_expr in referring_exprs {
                    if let Some(cast_expr) =
                        r_expr.as_any().downcast_ref::<CastExpr>()
                    {
                        // A widening cast of the sort key preserves order,
                        // so it is an alternative sort expression:
                        if cast_expr.expr.eq(&sort_expr.expr)
                            && cast_expr.is_bigger_cast(&expr_type)
                        {
                            result.push(PhysicalSortExpr::new(
                                r_expr,
                                sort_expr.options,
                            ));
                        }
                    }
                }
                // The original expression is always an option:
                result.push(sort_expr);
                result
            })
            // One ordering per combination of alternatives:
            .multi_cartesian_product()
    });
    OrderingEquivalenceClass::new(new_orderings)
}
/// Projects `expr` through `mapping` using equivalence information;
/// `None` if the expression cannot be expressed in the projection output.
pub fn project_expr(
    &self,
    expr: &Arc<dyn PhysicalExpr>,
    mapping: &ProjectionMapping,
) -> Option<Arc<dyn PhysicalExpr>> {
    self.eq_group.project_expr(mapping, expr)
}

/// Bulk variant of [`Self::project_expr`] returning one `Option` per input
/// expression, lazily.
pub fn project_expressions<'a>(
    &'a self,
    expressions: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>> + 'a,
    mapping: &'a ProjectionMapping,
) -> impl Iterator<Item = Option<Arc<dyn PhysicalExpr>>> + 'a {
    self.eq_group.project_expressions(mapping, expressions)
}
/// Builds a map recording, for every sort expression in every ordering,
/// its projected form (if any) and the sort expression immediately
/// preceding it in that ordering (its "dependency").
fn construct_dependency_map(
    &self,
    oeq_class: OrderingEquivalenceClass,
    mapping: &ProjectionMapping,
) -> DependencyMap {
    let mut map = DependencyMap::default();
    for ordering in oeq_class.into_iter() {
        // The leading expression has no dependency. If it is not relevant
        // to the projection at all, the whole ordering is skipped.
        if !self.insert_to_dependency_map(
            mapping,
            ordering[0].clone(),
            None,
            &mut map,
        ) {
            continue;
        }
        // Each subsequent expression depends on its predecessor; stop at
        // the first expression that cannot be projected, since the rest
        // of the ordering is then unusable.
        for (dependency, sort_expr) in ordering.into_iter().tuple_windows() {
            if !self.insert_to_dependency_map(
                mapping,
                sort_expr,
                Some(dependency),
                &mut map,
            ) {
                break;
            }
        }
    }
    map
}
/// Inserts `sort_expr` (with its optional predecessor `dependency`) into
/// `map` when it is either projectable through `mapping` or referred to by
/// some projection source. Returns whether the expression was projectable,
/// which callers use to decide whether to continue along the ordering.
fn insert_to_dependency_map(
    &self,
    mapping: &ProjectionMapping,
    sort_expr: PhysicalSortExpr,
    dependency: Option<PhysicalSortExpr>,
    map: &mut DependencyMap,
) -> bool {
    let target_sort_expr = self
        .project_expr(&sort_expr.expr, mapping)
        .map(|expr| PhysicalSortExpr::new(expr, sort_expr.options));
    let projectable = target_sort_expr.is_some();
    // Even a non-projectable expression is recorded if a projection source
    // refers to it, since that source's ordering may depend on it.
    if projectable
        || mapping
            .iter()
            .any(|(source, _)| expr_refers(source, &sort_expr.expr))
    {
        map.insert(sort_expr, target_sort_expr, dependency);
    }
    projectable
}
/// Returns a copy of `mapping` whose source expressions are normalized
/// against the equivalence group (targets are left untouched).
fn normalize_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
    mapping
        .iter()
        .map(|(source, target)| {
            (
                self.eq_group.normalize_expr(Arc::clone(source)),
                target.clone(),
            )
        })
        .collect()
}
/// Computes the orderings valid after applying the projection `mapping` to
/// a relation satisfying `oeq_class`.
///
/// Two sources of orderings are combined:
/// 1. For each projection source expression, dependencies that make it
///    ordered are found, and a new ordering ending in the projected target
///    is built on top of each valid dependency ordering.
/// 2. Each projectable sort expression in the dependency map is appended
///    to the orderings formed by its prefix dependencies.
fn projected_orderings(
    &self,
    mapping: &ProjectionMapping,
    mut oeq_class: OrderingEquivalenceClass,
) -> Vec<LexOrdering> {
    // Normalize sources first so dependency lookups agree with `oeq_class`.
    let mapping = self.normalize_mapping(mapping);
    // Add order-preserving cast variants of sort expressions:
    oeq_class = Self::substitute_oeq_class(&self.schema, &mapping, oeq_class);
    let dependency_map = self.construct_dependency_map(oeq_class, &mapping);
    let orderings = mapping.iter().flat_map(|(source, targets)| {
        referred_dependencies(&dependency_map, source)
            .into_iter()
            .filter_map(|deps| {
                // Keep only dependency sets under which `source` is ordered:
                let ep = get_expr_properties(source, &deps, &self.schema);
                let sort_properties = ep.map(|prop| prop.sort_properties);
                if let Ok(SortProperties::Ordered(options)) = sort_properties {
                    Some((options, deps))
                } else {
                    None
                }
            })
            .flat_map(|(options, relevant_deps)| {
                // Orderings that must hold before `source` becomes ordered:
                let dependency_orderings =
                    generate_dependency_orderings(&relevant_deps, &dependency_map);
                // Every projected target of `source` inherits its options:
                let sort_exprs = targets.iter().map(|(target, _)| {
                    PhysicalSortExpr::new(Arc::clone(target), options)
                });
                if dependency_orderings.is_empty() {
                    // No prerequisites — each target alone is an ordering:
                    sort_exprs.map(|sort_expr| [sort_expr].into()).collect()
                } else {
                    // Append each target to every prerequisite ordering:
                    sort_exprs
                        .flat_map(|sort_expr| {
                            let mut result = dependency_orderings.clone();
                            for ordering in result.iter_mut() {
                                ordering.push(sort_expr.clone());
                            }
                            result
                        })
                        .collect::<Vec<_>>()
                }
            })
    });
    // Source (2): extend prefix orderings with each projectable target.
    let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
        let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
        if prefixes.is_empty() {
            // No prefix — the target (if projectable) stands alone:
            if let Some(target) = &node.target {
                prefixes.push([target.clone()].into());
            }
        } else {
            for ordering in prefixes.iter_mut() {
                if let Some(target) = &node.target {
                    ordering.push(target.clone());
                }
            }
        }
        prefixes
    });
    orderings.chain(projected_orderings).collect()
}
/// Projects the table constraints through `mapping`, using the column
/// indices of all projection targets that are plain columns. `None` when
/// the constraints cannot be projected.
fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
    let mut indices = vec![];
    for (_, targets) in mapping.iter() {
        for (target, _) in targets.iter() {
            // Only direct column targets carry over a constraint index:
            if let Some(col) = target.as_any().downcast_ref::<Column>() {
                indices.push(col.index());
            }
        }
    }
    self.constraints.project(&indices)
}
/// Returns the equivalence properties of the projection described by
/// `mapping`, expressed over `output_schema`.
pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
    let eq_group = self.eq_group.project(mapping);
    // Derive output orderings from the cached normalized orderings:
    let orderings =
        self.projected_orderings(mapping, self.oeq_cache.normal_cls.clone());
    let normal_orderings = orderings
        .iter()
        .cloned()
        .map(|o| eq_group.normalize_sort_exprs(o));
    Self {
        oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
        oeq_class: OrderingEquivalenceClass::new(orderings),
        constraints: self.projected_constraints(mapping).unwrap_or_default(),
        schema: output_schema,
        eq_group,
    }
}
/// Finds the longest prefix-permutation of `exprs` that these properties
/// can guarantee as a sort order. Returns the sort expressions together
/// with the indices of the chosen expressions in `exprs`.
pub fn find_longest_permutation(
    &self,
    exprs: &[Arc<dyn PhysicalExpr>],
) -> Result<(Vec<PhysicalSortExpr>, Vec<usize>)> {
    // Work on a clone: chosen expressions are added as constants so that
    // the next round evaluates the remaining candidates in their context.
    let mut eq_properties = self.clone();
    let mut result = vec![];
    let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
    // At most `exprs.len()` rounds; each round consumes >= 1 candidate or
    // terminates the loop.
    for _ in 0..exprs.len() {
        // Candidates that are currently ordered (or constant):
        let ordered_exprs = search_indices
            .iter()
            .filter_map(|&idx| {
                let ExprProperties {
                    sort_properties, ..
                } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
                match sort_properties {
                    SortProperties::Ordered(options) => {
                        let expr = Arc::clone(&exprs[idx]);
                        Some((PhysicalSortExpr::new(expr, options), idx))
                    }
                    SortProperties::Singleton => {
                        // Constants can take any sort options:
                        let expr = Arc::clone(&exprs[idx]);
                        Some((PhysicalSortExpr::new_default(expr), idx))
                    }
                    SortProperties::Unordered => None,
                }
            })
            .collect::<Vec<_>>();
        if ordered_exprs.is_empty() {
            break;
        }
        // Mark the chosen expressions constant for subsequent rounds:
        for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
            let const_expr = ConstExpr::from(Arc::clone(expr));
            eq_properties.add_constants(std::iter::once(const_expr))?;
            search_indices.shift_remove(idx);
        }
        result.extend(ordered_exprs);
    }
    Ok(result.into_iter().unzip())
}
/// Returns the constancy (per-partition/uniform) of `expr` if it is known
/// to be constant, `None` otherwise.
pub fn is_expr_constant(
    &self,
    expr: &Arc<dyn PhysicalExpr>,
) -> Option<AcrossPartitions> {
    self.eq_group.is_expr_constant(expr)
}

/// Computes sort/range properties of `expr` in the context of these
/// properties via a bottom-up traversal; falls back to "unknown" if the
/// traversal errors.
pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
    ExprPropertiesNode::new_unknown(expr)
        .transform_up(|expr| update_properties(expr, self))
        .data()
        .map(|node| node.data)
        .unwrap_or_else(|_| ExprProperties::new_unknown())
}
/// Rewrites all stored expressions against `schema`, which must have the
/// same number of fields with identical data types (names may differ).
/// Returns a plan error otherwise.
pub fn with_new_schema(mut self, schema: SchemaRef) -> Result<Self> {
    // Alignment check: same arity and pairwise-equal data types.
    let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
        && self
            .schema
            .fields
            .iter()
            .zip(schema.fields.iter())
            .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
    if !schemas_aligned {
        return plan_err!(
            "Schemas have to be aligned to rewrite equivalences:\n Old schema: {}\n New schema: {}",
            self.schema,
            schema
        );
    }
    // Rewrite each equivalence class member and re-cast any uniform
    // constant value to the (possibly re-typed) canonical expression.
    let mut eq_classes = vec![];
    for mut eq_class in self.eq_group {
        eq_class.exprs = eq_class
            .exprs
            .into_iter()
            .map(|expr| with_new_schema(expr, &schema))
            .collect::<Result<_>>()?;
        let data_type = eq_class
            .canonical_expr()
            .map(|e| e.data_type(&schema))
            .transpose()?;
        if let (Some(data_type), Some(AcrossPartitions::Uniform(Some(value)))) =
            (data_type, &mut eq_class.constant)
        {
            *value = value.cast_to(&data_type)?;
        }
        eq_classes.push(eq_class);
    }
    self.eq_group = eq_classes.into();
    // Orderings (raw and cached/normalized) are rewritten too:
    self.oeq_class = self.oeq_class.with_new_schema(&schema)?;
    self.oeq_cache.normal_cls = self.oeq_cache.normal_cls.with_new_schema(&schema)?;
    self.schema = schema;
    Ok(self)
}
}
/// Extracts the (non-normalized) ordering equivalence class, discarding
/// the rest of the properties.
impl From<EquivalenceProperties> for OrderingEquivalenceClass {
    fn from(eq_properties: EquivalenceProperties) -> Self {
        eq_properties.oeq_class
    }
}
impl Display for EquivalenceProperties {
    /// Human-readable summary: orderings first, then equivalences, or
    /// "No properties" when both are empty. Constraints are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match (self.oeq_class.is_empty(), self.eq_group.is_empty()) {
            (true, true) => write!(f, "No properties"),
            (false, true) => write!(f, "order: {}", self.oeq_class),
            (false, false) => {
                write!(f, "order: {}", self.oeq_class)?;
                write!(f, ", eq: {}", self.eq_group)
            }
            (true, false) => write!(f, "eq: {}", self.eq_group),
        }
    }
}
/// Transform callback used by `EquivalenceProperties::get_expr_properties`:
/// computes a node's properties from its children (or intrinsically for
/// literals/columns), then overrides the sort properties using known
/// constants and orderings.
fn update_properties(
    mut node: ExprPropertiesNode,
    eq_properties: &EquivalenceProperties,
) -> Result<Transformed<ExprPropertiesNode>> {
    // First pass: derive properties structurally.
    if !node.expr.children().is_empty() {
        // Inner node: combine the already-computed child properties.
        let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
        node.data = node.expr.get_properties(&children_props)?;
    } else if node.expr.as_any().is::<Literal>() {
        node.data = node.expr.get_properties(&[])?;
    } else if node.expr.as_any().is::<Column>() {
        // Columns have no intrinsic range information; use an unbounded
        // interval of the column's type.
        node.data.range =
            Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
    }
    // Second pass: refine sort properties from equivalence knowledge.
    let normal_expr = eq_properties
        .eq_group
        .normalize_expr(Arc::clone(&node.expr));
    let oeq_class = &eq_properties.oeq_cache.normal_cls;
    if eq_properties.is_expr_constant(&normal_expr).is_some()
        || oeq_class.is_expr_partial_const(&normal_expr)
    {
        node.data.sort_properties = SortProperties::Singleton;
    } else if let Some(options) = oeq_class.get_options(&normal_expr) {
        node.data.sort_properties = SortProperties::Ordered(options);
    }
    Ok(Transformed::yes(node))
}
/// Returns `true` when `referring_expr` is, or transitively contains,
/// `referred_expr`.
fn expr_refers(
    referring_expr: &Arc<dyn PhysicalExpr>,
    referred_expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    if referring_expr.eq(referred_expr) {
        return true;
    }
    // Recurse into children until a match is found (or none exists):
    referring_expr
        .children()
        .iter()
        .any(|child| expr_refers(child, referred_expr))
}
/// Computes the properties of `expr` assuming only the orderings in
/// `dependencies` hold: an expression listed as a dependency is ordered
/// with that dependency's options; an unlisted column is unordered; a
/// literal is a singleton; anything else is derived from its children.
fn get_expr_properties(
    expr: &Arc<dyn PhysicalExpr>,
    dependencies: &Dependencies,
    schema: &SchemaRef,
) -> Result<ExprProperties> {
    if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
        // The dependency set orders this expression directly.
        Ok(ExprProperties {
            sort_properties: SortProperties::Ordered(column_order.options),
            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
            preserves_lex_ordering: false,
        })
    } else if expr.as_any().downcast_ref::<Column>().is_some() {
        // A column with no dependency entry carries no ordering.
        Ok(ExprProperties {
            sort_properties: SortProperties::Unordered,
            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
            preserves_lex_ordering: false,
        })
    } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
        Ok(ExprProperties {
            sort_properties: SortProperties::Singleton,
            range: literal.value().into(),
            preserves_lex_ordering: true,
        })
    } else {
        // Composite expression: recurse and let the expression combine
        // its children's properties.
        let child_states = expr
            .children()
            .iter()
            .map(|child| get_expr_properties(child, dependencies, schema))
            .collect::<Result<Vec<_>>>()?;
        expr.get_properties(&child_states)
    }
}