datafusion_physical_expr/equivalence/properties/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod dependency; // Submodule containing DependencyMap and Dependencies
19mod joins; // Submodule containing join_equivalence_properties
20mod union; // Submodule containing calculate_union
21
22use dependency::{
23    construct_prefix_orderings, generate_dependency_orderings, referred_dependencies,
24    Dependencies, DependencyMap,
25};
26pub use joins::*;
27pub use union::*;
28
29use std::fmt::Display;
30use std::hash::{Hash, Hasher};
31use std::sync::Arc;
32use std::{fmt, mem};
33
34use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
35use crate::equivalence::{
36    EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
37};
38use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
39use crate::{
40    physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, PhysicalExpr,
41    PhysicalSortExpr, PhysicalSortRequirement,
42};
43
44use arrow::compute::SortOptions;
45use arrow::datatypes::SchemaRef;
46use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
47use datafusion_common::{plan_err, Constraint, Constraints, HashMap, Result};
48use datafusion_expr::interval_arithmetic::Interval;
49use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
50use datafusion_physical_expr_common::utils::ExprPropertiesNode;
51
52use indexmap::IndexSet;
53use itertools::Itertools;
54
55/// `EquivalenceProperties` stores information about the output
56/// of a plan node, that can be used to optimize the plan.
57///
58/// Currently, it keeps track of:
59/// - Sort expressions (orderings)
60/// - Equivalent expressions: expressions that are known to have same value.
61/// - Constants expressions: expressions that are known to contain a single
62///   constant value.
63///
64/// Please see the [Using Ordering for Better Plans] blog for more details.
65///
66/// [Using Ordering for Better Plans]: https://datafusion.apache.org/blog/2025/03/11/ordering-analysis/
67///
68/// # Example equivalent sort expressions
69///
70/// Consider table below:
71///
72/// ```text
73/// ┌-------┐
74/// | a | b |
75/// |---|---|
76/// | 1 | 9 |
77/// | 2 | 8 |
78/// | 3 | 7 |
79/// | 5 | 5 |
80/// └---┴---┘
81/// ```
82///
83/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
84/// `EquivalenceProperties`, tracks these different valid sort expressions and
85/// treat `a ASC` and `b DESC` on an equal footing. For example if the query
86/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
87/// avoided.
88///
89/// # Example equivalent expressions
90///
91/// Similarly, consider the table below:
92///
93/// ```text
94/// ┌-------┐
95/// | a | b |
96/// |---|---|
97/// | 1 | 1 |
98/// | 2 | 2 |
99/// | 3 | 3 |
100/// | 5 | 5 |
101/// └---┴---┘
102/// ```
103///
104/// In this case,  columns `a` and `b` always have the same value, which can of
105/// such equivalences inside this object. With this information, Datafusion can
106/// optimize operations such as. For example, if the partition requirement is
107/// `Hash(a)` and output partitioning is `Hash(b)`, then DataFusion avoids
108/// repartitioning the data as the existing partitioning satisfies the
109/// requirement.
110///
111/// # Code Example
112/// ```
113/// # use std::sync::Arc;
114/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
115/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
116/// # use datafusion_physical_expr::expressions::col;
117/// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
118/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
119/// #   Field::new("a", DataType::Int32, false),
120/// #   Field::new("b", DataType::Int32, false),
121/// #   Field::new("c", DataType::Int32, false),
122/// # ]));
123/// # let col_a = col("a", &schema).unwrap();
124/// # let col_b = col("b", &schema).unwrap();
125/// # let col_c = col("c", &schema).unwrap();
126/// // This object represents data that is sorted by a ASC, c DESC
127/// // with a single constant value of b
128/// let mut eq_properties = EquivalenceProperties::new(schema)
129///   .with_constants(vec![ConstExpr::from(col_b)]);
130/// eq_properties.add_new_ordering(LexOrdering::new(vec![
131///   PhysicalSortExpr::new_default(col_a).asc(),
132///   PhysicalSortExpr::new_default(col_c).desc(),
133/// ]));
134///
135/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], const: [b@1(heterogeneous)]")
136/// ```
137#[derive(Debug, Clone)]
138pub struct EquivalenceProperties {
139    /// Distinct equivalence classes (exprs known to have the same expressions)
140    eq_group: EquivalenceGroup,
141    /// Equivalent sort expressions
142    oeq_class: OrderingEquivalenceClass,
143    /// Expressions whose values are constant
144    ///
145    /// TODO: We do not need to track constants separately, they can be tracked
146    ///       inside `eq_group` as `Literal` expressions.
147    constants: Vec<ConstExpr>,
148    /// Table constraints
149    constraints: Constraints,
150    /// Schema associated with this object.
151    schema: SchemaRef,
152}
153
154impl EquivalenceProperties {
155    /// Creates an empty `EquivalenceProperties` object.
156    pub fn new(schema: SchemaRef) -> Self {
157        Self {
158            eq_group: EquivalenceGroup::empty(),
159            oeq_class: OrderingEquivalenceClass::empty(),
160            constants: vec![],
161            constraints: Constraints::empty(),
162            schema,
163        }
164    }
165
166    /// Adds constraints to the properties.
167    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
168        self.constraints = constraints;
169        self
170    }
171
172    /// Creates a new `EquivalenceProperties` object with the given orderings.
173    pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self {
174        Self {
175            eq_group: EquivalenceGroup::empty(),
176            oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()),
177            constants: vec![],
178            constraints: Constraints::empty(),
179            schema,
180        }
181    }
182
183    /// Returns the associated schema.
184    pub fn schema(&self) -> &SchemaRef {
185        &self.schema
186    }
187
188    /// Returns a reference to the ordering equivalence class within.
189    pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
190        &self.oeq_class
191    }
192
193    /// Return the inner OrderingEquivalenceClass, consuming self
194    pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
195        self.oeq_class
196    }
197
198    /// Returns a reference to the equivalence group within.
199    pub fn eq_group(&self) -> &EquivalenceGroup {
200        &self.eq_group
201    }
202
203    /// Returns a reference to the constant expressions
204    pub fn constants(&self) -> &[ConstExpr] {
205        &self.constants
206    }
207
208    pub fn constraints(&self) -> &Constraints {
209        &self.constraints
210    }
211
212    /// Returns the output ordering of the properties.
213    pub fn output_ordering(&self) -> Option<LexOrdering> {
214        let constants = self.constants();
215        let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
216        // Prune out constant expressions
217        output_ordering
218            .retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
219        (!output_ordering.is_empty()).then_some(output_ordering)
220    }
221
222    /// Returns the normalized version of the ordering equivalence class within.
223    /// Normalization removes constants and duplicates as well as standardizing
224    /// expressions according to the equivalence group within.
225    pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
226        OrderingEquivalenceClass::new(
227            self.oeq_class
228                .iter()
229                .map(|ordering| self.normalize_sort_exprs(ordering))
230                .collect(),
231        )
232    }
233
234    /// Extends this `EquivalenceProperties` with the `other` object.
235    pub fn extend(mut self, other: Self) -> Self {
236        self.eq_group.extend(other.eq_group);
237        self.oeq_class.extend(other.oeq_class);
238        self.with_constants(other.constants)
239    }
240
241    /// Clears (empties) the ordering equivalence class within this object.
242    /// Call this method when existing orderings are invalidated.
243    pub fn clear_orderings(&mut self) {
244        self.oeq_class.clear();
245    }
246
247    /// Removes constant expressions that may change across partitions.
248    /// This method should be used when data from different partitions are merged.
249    pub fn clear_per_partition_constants(&mut self) {
250        self.constants.retain(|item| {
251            matches!(item.across_partitions(), AcrossPartitions::Uniform(_))
252        })
253    }
254
255    /// Extends this `EquivalenceProperties` by adding the orderings inside the
256    /// ordering equivalence class `other`.
257    pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) {
258        self.oeq_class.extend(other);
259    }
260
261    /// Adds new orderings into the existing ordering equivalence class.
262    pub fn add_new_orderings(
263        &mut self,
264        orderings: impl IntoIterator<Item = LexOrdering>,
265    ) {
266        self.oeq_class.add_new_orderings(orderings);
267    }
268
269    /// Adds a single ordering to the existing ordering equivalence class.
270    pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
271        self.add_new_orderings([ordering]);
272    }
273
274    /// Incorporates the given equivalence group to into the existing
275    /// equivalence group within.
276    pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
277        self.eq_group.extend(other_eq_group);
278    }
279
280    /// Adds a new equality condition into the existing equivalence group.
281    /// If the given equality defines a new equivalence class, adds this new
282    /// equivalence class to the equivalence group.
283    pub fn add_equal_conditions(
284        &mut self,
285        left: &Arc<dyn PhysicalExpr>,
286        right: &Arc<dyn PhysicalExpr>,
287    ) -> Result<()> {
288        // Discover new constants in light of new the equality:
289        if self.is_expr_constant(left) {
290            // Left expression is constant, add right as constant
291            if !const_exprs_contains(&self.constants, right) {
292                let const_expr = ConstExpr::from(right)
293                    .with_across_partitions(self.get_expr_constant_value(left));
294                self.constants.push(const_expr);
295            }
296        } else if self.is_expr_constant(right) {
297            // Right expression is constant, add left as constant
298            if !const_exprs_contains(&self.constants, left) {
299                let const_expr = ConstExpr::from(left)
300                    .with_across_partitions(self.get_expr_constant_value(right));
301                self.constants.push(const_expr);
302            }
303        }
304
305        // Add equal expressions to the state
306        self.eq_group.add_equal_conditions(left, right);
307
308        // Discover any new orderings
309        self.discover_new_orderings(left)?;
310        Ok(())
311    }
312
313    /// Track/register physical expressions with constant values.
314    #[deprecated(since = "43.0.0", note = "Use [`with_constants`] instead")]
315    pub fn add_constants(self, constants: impl IntoIterator<Item = ConstExpr>) -> Self {
316        self.with_constants(constants)
317    }
318
319    /// Remove the specified constant
320    pub fn remove_constant(mut self, c: &ConstExpr) -> Self {
321        self.constants.retain(|existing| existing != c);
322        self
323    }
324
325    /// Track/register physical expressions with constant values.
326    pub fn with_constants(
327        mut self,
328        constants: impl IntoIterator<Item = ConstExpr>,
329    ) -> Self {
330        let normalized_constants = constants
331            .into_iter()
332            .filter_map(|c| {
333                let across_partitions = c.across_partitions();
334                let expr = c.owned_expr();
335                let normalized_expr = self.eq_group.normalize_expr(expr);
336
337                if const_exprs_contains(&self.constants, &normalized_expr) {
338                    return None;
339                }
340
341                let const_expr = ConstExpr::from(normalized_expr)
342                    .with_across_partitions(across_partitions);
343
344                Some(const_expr)
345            })
346            .collect::<Vec<_>>();
347
348        // Add all new normalized constants
349        self.constants.extend(normalized_constants);
350
351        // Discover any new orderings based on the constants
352        for ordering in self.normalized_oeq_class().iter() {
353            if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
354                log::debug!("error discovering new orderings: {e}");
355            }
356        }
357
358        self
359    }
360
361    // Discover new valid orderings in light of a new equality.
362    // Accepts a single argument (`expr`) which is used to determine
363    // which orderings should be updated.
364    // When constants or equivalence classes are changed, there may be new orderings
365    // that can be discovered with the new equivalence properties.
366    // For a discussion, see: https://github.com/apache/datafusion/issues/9812
367    fn discover_new_orderings(&mut self, expr: &Arc<dyn PhysicalExpr>) -> Result<()> {
368        let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
369        let eq_class = self
370            .eq_group
371            .iter()
372            .find_map(|class| {
373                class
374                    .contains(&normalized_expr)
375                    .then(|| class.clone().into_vec())
376            })
377            .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);
378
379        let mut new_orderings: Vec<LexOrdering> = vec![];
380        for ordering in self.normalized_oeq_class().iter() {
381            if !ordering[0].expr.eq(&normalized_expr) {
382                continue;
383            }
384
385            let leading_ordering_options = ordering[0].options;
386
387            for equivalent_expr in &eq_class {
388                let children = equivalent_expr.children();
389                if children.is_empty() {
390                    continue;
391                }
392
393                // Check if all children match the next expressions in the ordering
394                let mut all_children_match = true;
395                let mut child_properties = vec![];
396
397                // Build properties for each child based on the next expressions
398                for (i, child) in children.iter().enumerate() {
399                    if let Some(next) = ordering.get(i + 1) {
400                        if !child.as_ref().eq(next.expr.as_ref()) {
401                            all_children_match = false;
402                            break;
403                        }
404                        child_properties.push(ExprProperties {
405                            sort_properties: SortProperties::Ordered(next.options),
406                            range: Interval::make_unbounded(
407                                &child.data_type(&self.schema)?,
408                            )?,
409                            preserves_lex_ordering: true,
410                        });
411                    } else {
412                        all_children_match = false;
413                        break;
414                    }
415                }
416
417                if all_children_match {
418                    // Check if the expression is monotonic in all arguments
419                    if let Ok(expr_properties) =
420                        equivalent_expr.get_properties(&child_properties)
421                    {
422                        if expr_properties.preserves_lex_ordering
423                            && SortProperties::Ordered(leading_ordering_options)
424                                == expr_properties.sort_properties
425                        {
426                            // Assume existing ordering is [c ASC, a ASC, b ASC]
427                            // When equality c = f(a,b) is given, if we know that given ordering `[a ASC, b ASC]`,
428                            // ordering `[f(a,b) ASC]` is valid, then we can deduce that ordering `[a ASC, b ASC]` is also valid.
429                            // Hence, ordering `[a ASC, b ASC]` can be added to the state as a valid ordering.
430                            // (e.g. existing ordering where leading ordering is removed)
431                            new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
432                            break;
433                        }
434                    }
435                }
436            }
437        }
438
439        self.oeq_class.add_new_orderings(new_orderings);
440        Ok(())
441    }
442
443    /// Updates the ordering equivalence group within assuming that the table
444    /// is re-sorted according to the argument `sort_exprs`. Note that constants
445    /// and equivalence classes are unchanged as they are unaffected by a re-sort.
446    /// If the given ordering is already satisfied, the function does nothing.
447    pub fn with_reorder(mut self, sort_exprs: LexOrdering) -> Self {
448        // Filter out constant expressions as they don't affect ordering
449        let filtered_exprs = LexOrdering::new(
450            sort_exprs
451                .into_iter()
452                .filter(|expr| !self.is_expr_constant(&expr.expr))
453                .collect(),
454        );
455
456        if filtered_exprs.is_empty() {
457            return self;
458        }
459
460        let mut new_orderings = vec![filtered_exprs.clone()];
461
462        // Preserve valid suffixes from existing orderings
463        let oeq_class = mem::take(&mut self.oeq_class);
464        for existing in oeq_class {
465            if self.is_prefix_of(&filtered_exprs, &existing) {
466                let mut extended = filtered_exprs.clone();
467                extended.extend(existing.into_iter().skip(filtered_exprs.len()));
468                new_orderings.push(extended);
469            }
470        }
471
472        self.oeq_class = OrderingEquivalenceClass::new(new_orderings);
473        self
474    }
475
476    /// Checks if the new ordering matches a prefix of the existing ordering
477    /// (considering expression equivalences)
478    fn is_prefix_of(&self, new_order: &LexOrdering, existing: &LexOrdering) -> bool {
479        // Check if new order is longer than existing - can't be a prefix
480        if new_order.len() > existing.len() {
481            return false;
482        }
483
484        // Check if new order matches existing prefix (considering equivalences)
485        new_order.iter().zip(existing).all(|(new, existing)| {
486            self.eq_group.exprs_equal(&new.expr, &existing.expr)
487                && new.options == existing.options
488        })
489    }
490
491    /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
492    /// equivalence group and the ordering equivalence class within.
493    ///
494    /// Assume that `self.eq_group` states column `a` and `b` are aliases.
495    /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC`
496    /// are equivalent (in the sense that both describe the ordering of the table).
497    /// If the `sort_exprs` argument were `vec![b ASC, c ASC, a ASC]`, then this
498    /// function would return `vec![a ASC, c ASC]`. Internally, it would first
499    /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result
500    /// after deduplication.
501    fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
502        // Convert sort expressions to sort requirements:
503        let sort_reqs = LexRequirement::from(sort_exprs.clone());
504        // Normalize the requirements:
505        let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
506        // Convert sort requirements back to sort expressions:
507        LexOrdering::from(normalized_sort_reqs)
508    }
509
510    /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
511    /// equivalence group and the ordering equivalence class within. It works by:
512    /// - Removing expressions that have a constant value from the given requirement.
513    /// - Replacing sections that belong to some equivalence class in the equivalence
514    ///   group with the first entry in the matching equivalence class.
515    ///
516    /// Assume that `self.eq_group` states column `a` and `b` are aliases.
517    /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC`
518    /// are equivalent (in the sense that both describe the ordering of the table).
519    /// If the `sort_reqs` argument were `vec![b ASC, c ASC, a ASC]`, then this
520    /// function would return `vec![a ASC, c ASC]`. Internally, it would first
521    /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result
522    /// after deduplication.
523    fn normalize_sort_requirements(&self, sort_reqs: &LexRequirement) -> LexRequirement {
524        let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs);
525        let mut constant_exprs = vec![];
526        constant_exprs.extend(
527            self.constants
528                .iter()
529                .map(|const_expr| Arc::clone(const_expr.expr())),
530        );
531        let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
532        // Prune redundant sections in the requirement:
533        normalized_sort_reqs
534            .iter()
535            .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr))
536            .cloned()
537            .collect::<LexRequirement>()
538            .collapse()
539    }
540
541    /// Checks whether the given ordering is satisfied by any of the existing
542    /// orderings.
543    pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
544        // Convert the given sort expressions to sort requirements:
545        let sort_requirements = LexRequirement::from(given.clone());
546        self.ordering_satisfy_requirement(&sort_requirements)
547    }
548
549    /// Returns the number of consecutive requirements (starting from the left)
550    /// that are satisfied by the plan ordering.
551    fn compute_common_sort_prefix_length(
552        &self,
553        normalized_reqs: &LexRequirement,
554    ) -> usize {
555        // Check whether given ordering is satisfied by constraints first
556        if self.satisfied_by_constraints(normalized_reqs) {
557            // If the constraints satisfy all requirements, return the full normalized requirements length
558            return normalized_reqs.len();
559        }
560
561        let mut eq_properties = self.clone();
562
563        for (i, normalized_req) in normalized_reqs.iter().enumerate() {
564            // Check whether given ordering is satisfied
565            if !eq_properties.ordering_satisfy_single(normalized_req) {
566                // As soon as one requirement is not satisfied, return
567                // how many we've satisfied so far
568                return i;
569            }
570            // Treat satisfied keys as constants in subsequent iterations. We
571            // can do this because the "next" key only matters in a lexicographical
572            // ordering when the keys to its left have the same values.
573            //
574            // Note that these expressions are not properly "constants". This is just
575            // an implementation strategy confined to this function.
576            //
577            // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
578            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
579            // From the analysis above, we know that `[a ASC]` is satisfied. Then,
580            // we add column `a` as constant to the algorithm state. This enables us
581            // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
582            eq_properties = eq_properties.with_constants(std::iter::once(
583                ConstExpr::from(Arc::clone(&normalized_req.expr)),
584            ));
585        }
586
587        // All requirements are satisfied.
588        normalized_reqs.len()
589    }
590
591    /// Determines the longest prefix of `reqs` that is satisfied by the existing ordering.
592    /// Returns that prefix as a new `LexRequirement`, and a boolean indicating if all the requirements are satisfied.
593    pub fn extract_common_sort_prefix(
594        &self,
595        reqs: &LexRequirement,
596    ) -> (LexRequirement, bool) {
597        // First, standardize the given requirement:
598        let normalized_reqs = self.normalize_sort_requirements(reqs);
599
600        let prefix_len = self.compute_common_sort_prefix_length(&normalized_reqs);
601        (
602            LexRequirement::new(normalized_reqs[..prefix_len].to_vec()),
603            prefix_len == normalized_reqs.len(),
604        )
605    }
606
607    /// Checks whether the given sort requirements are satisfied by any of the
608    /// existing orderings.
609    pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
610        self.extract_common_sort_prefix(reqs).1
611    }
612
613    /// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique).
614    /// Returns true if any constraint fully satisfies the requirements.
615    fn satisfied_by_constraints(
616        &self,
617        normalized_reqs: &[PhysicalSortRequirement],
618    ) -> bool {
619        self.constraints.iter().any(|constraint| match constraint {
620            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => self
621                .satisfied_by_constraint(
622                    normalized_reqs,
623                    indices,
624                    matches!(constraint, Constraint::Unique(_)),
625                ),
626        })
627    }
628
629    /// Checks if sort requirements are satisfied by a constraint (primary key or unique).
630    /// Returns true if the constraint indices form a valid prefix of an existing ordering
631    /// that matches the requirements. For unique constraints, also verifies nullable columns.
632    fn satisfied_by_constraint(
633        &self,
634        normalized_reqs: &[PhysicalSortRequirement],
635        indices: &[usize],
636        check_null: bool,
637    ) -> bool {
638        // Requirements must contain indices
639        if indices.len() > normalized_reqs.len() {
640            return false;
641        }
642
643        // Iterate over all orderings
644        self.oeq_class.iter().any(|ordering| {
645            if indices.len() > ordering.len() {
646                return false;
647            }
648
649            // Build a map of column positions in the ordering
650            let mut col_positions = HashMap::with_capacity(ordering.len());
651            for (pos, req) in ordering.iter().enumerate() {
652                if let Some(col) = req.expr.as_any().downcast_ref::<Column>() {
653                    col_positions.insert(
654                        col.index(),
655                        (pos, col.nullable(&self.schema).unwrap_or(true)),
656                    );
657                }
658            }
659
660            // Check if all constraint indices appear in valid positions
661            if !indices.iter().all(|&idx| {
662                col_positions
663                    .get(&idx)
664                    .map(|&(pos, nullable)| {
665                        // For unique constraints, verify column is not nullable if it's first/last
666                        !check_null
667                            || (pos != 0 && pos != ordering.len() - 1)
668                            || !nullable
669                    })
670                    .unwrap_or(false)
671            }) {
672                return false;
673            }
674
675            // Check if this ordering matches requirements prefix
676            let ordering_len = ordering.len();
677            normalized_reqs.len() >= ordering_len
678                && normalized_reqs[..ordering_len].iter().zip(ordering).all(
679                    |(req, existing)| {
680                        req.expr.eq(&existing.expr)
681                            && req
682                                .options
683                                .is_none_or(|req_opts| req_opts == existing.options)
684                    },
685                )
686        })
687    }
688
689    /// Determines whether the ordering specified by the given sort requirement
690    /// is satisfied based on the orderings within, equivalence classes, and
691    /// constant expressions.
692    ///
693    /// # Parameters
694    ///
695    /// - `req`: A reference to a `PhysicalSortRequirement` for which the ordering
696    ///   satisfaction check will be done.
697    ///
698    /// # Returns
699    ///
700    /// Returns `true` if the specified ordering is satisfied, `false` otherwise.
701    fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
702        let ExprProperties {
703            sort_properties, ..
704        } = self.get_expr_properties(Arc::clone(&req.expr));
705        match sort_properties {
706            SortProperties::Ordered(options) => {
707                let sort_expr = PhysicalSortExpr {
708                    expr: Arc::clone(&req.expr),
709                    options,
710                };
711                sort_expr.satisfy(req, self.schema())
712            }
713            // Singleton expressions satisfies any ordering.
714            SortProperties::Singleton => true,
715            SortProperties::Unordered => false,
716        }
717    }
718
719    /// Checks whether the `given` sort requirements are equal or more specific
720    /// than the `reference` sort requirements.
721    pub fn requirements_compatible(
722        &self,
723        given: &LexRequirement,
724        reference: &LexRequirement,
725    ) -> bool {
726        let normalized_given = self.normalize_sort_requirements(given);
727        let normalized_reference = self.normalize_sort_requirements(reference);
728
729        (normalized_reference.len() <= normalized_given.len())
730            && normalized_reference
731                .into_iter()
732                .zip(normalized_given)
733                .all(|(reference, given)| given.compatible(&reference))
734    }
735
736    /// Returns the finer ordering among the orderings `lhs` and `rhs`, breaking
737    /// any ties by choosing `lhs`.
738    ///
739    /// The finer ordering is the ordering that satisfies both of the orderings.
740    /// If the orderings are incomparable, returns `None`.
741    ///
742    /// For example, the finer ordering among `[a ASC]` and `[a ASC, b ASC]` is
743    /// the latter.
744    pub fn get_finer_ordering(
745        &self,
746        lhs: &LexOrdering,
747        rhs: &LexOrdering,
748    ) -> Option<LexOrdering> {
749        // Convert the given sort expressions to sort requirements:
750        let lhs = LexRequirement::from(lhs.clone());
751        let rhs = LexRequirement::from(rhs.clone());
752        let finer = self.get_finer_requirement(&lhs, &rhs);
753        // Convert the chosen sort requirements back to sort expressions:
754        finer.map(LexOrdering::from)
755    }
756
757    /// Returns the finer ordering among the requirements `lhs` and `rhs`,
758    /// breaking any ties by choosing `lhs`.
759    ///
760    /// The finer requirements are the ones that satisfy both of the given
761    /// requirements. If the requirements are incomparable, returns `None`.
762    ///
763    /// For example, the finer requirements among `[a ASC]` and `[a ASC, b ASC]`
764    /// is the latter.
765    pub fn get_finer_requirement(
766        &self,
767        req1: &LexRequirement,
768        req2: &LexRequirement,
769    ) -> Option<LexRequirement> {
770        let mut lhs = self.normalize_sort_requirements(req1);
771        let mut rhs = self.normalize_sort_requirements(req2);
772        lhs.inner
773            .iter_mut()
774            .zip(rhs.inner.iter_mut())
775            .all(|(lhs, rhs)| {
776                lhs.expr.eq(&rhs.expr)
777                    && match (lhs.options, rhs.options) {
778                        (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt,
779                        (Some(options), None) => {
780                            rhs.options = Some(options);
781                            true
782                        }
783                        (None, Some(options)) => {
784                            lhs.options = Some(options);
785                            true
786                        }
787                        (None, None) => true,
788                    }
789            })
790            .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
791    }
792
793    /// we substitute the ordering according to input expression type, this is a simplified version
794    /// In this case, we just substitute when the expression satisfy the following condition:
795    /// I. just have one column and is a CAST expression
796    /// TODO: Add one-to-ones analysis for monotonic ScalarFunctions.
797    /// TODO: we could precompute all the scenario that is computable, for example: atan(x + 1000) should also be substituted if
798    ///  x is DESC or ASC
799    /// After substitution, we may generate more than 1 `LexOrdering`. As an example,
800    /// `[a ASC, b ASC]` will turn into `[a ASC, b ASC], [CAST(a) ASC, b ASC]` when projection expressions `a, b, CAST(a)` is applied.
801    pub fn substitute_ordering_component(
802        &self,
803        mapping: &ProjectionMapping,
804        sort_expr: &LexOrdering,
805    ) -> Result<Vec<LexOrdering>> {
806        let new_orderings = sort_expr
807            .iter()
808            .map(|sort_expr| {
809                let referring_exprs: Vec<_> = mapping
810                    .iter()
811                    .map(|(source, _target)| source)
812                    .filter(|source| expr_refers(source, &sort_expr.expr))
813                    .cloned()
814                    .collect();
815                let mut res = LexOrdering::new(vec![sort_expr.clone()]);
816                // TODO: Add one-to-ones analysis for ScalarFunctions.
817                for r_expr in referring_exprs {
818                    // we check whether this expression is substitutable or not
819                    if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
820                        // we need to know whether the Cast Expr matches or not
821                        let expr_type = sort_expr.expr.data_type(&self.schema)?;
822                        if cast_expr.expr.eq(&sort_expr.expr)
823                            && cast_expr.is_bigger_cast(expr_type)
824                        {
825                            res.push(PhysicalSortExpr {
826                                expr: Arc::clone(&r_expr),
827                                options: sort_expr.options,
828                            });
829                        }
830                    }
831                }
832                Ok(res)
833            })
834            .collect::<Result<Vec<_>>>()?;
835        // Generate all valid orderings, given substituted expressions.
836        let res = new_orderings
837            .into_iter()
838            .multi_cartesian_product()
839            .map(LexOrdering::new)
840            .collect::<Vec<_>>();
841        Ok(res)
842    }
843
844    /// In projection, supposed we have a input function 'A DESC B DESC' and the output shares the same expression
845    /// with A and B, we could surely use the ordering of the original ordering, However, if the A has been changed,
846    /// for example, A-> Cast(A, Int64) or any other form, it is invalid if we continue using the original ordering
847    /// Since it would cause bug in dependency constructions, we should substitute the input order in order to get correct
848    /// dependency map, happen in issue 8838: <https://github.com/apache/datafusion/issues/8838>
849    pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
850        let new_order = self
851            .oeq_class
852            .iter()
853            .map(|order| self.substitute_ordering_component(mapping, order))
854            .collect::<Result<Vec<_>>>()?;
855        let new_order = new_order.into_iter().flatten().collect();
856        self.oeq_class = OrderingEquivalenceClass::new(new_order);
857        Ok(())
858    }
859    /// Projects argument `expr` according to `projection_mapping`, taking
860    /// equivalences into account.
861    ///
862    /// For example, assume that columns `a` and `c` are always equal, and that
863    /// `projection_mapping` encodes following mapping:
864    ///
865    /// ```text
866    /// a -> a1
867    /// b -> b1
868    /// ```
869    ///
870    /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to
871    /// `Some(a1 + b1)` and `d` to `None`, meaning that it  cannot be projected.
872    pub fn project_expr(
873        &self,
874        expr: &Arc<dyn PhysicalExpr>,
875        projection_mapping: &ProjectionMapping,
876    ) -> Option<Arc<dyn PhysicalExpr>> {
877        self.eq_group.project_expr(projection_mapping, expr)
878    }
879
880    /// Constructs a dependency map based on existing orderings referred to in
881    /// the projection.
882    ///
883    /// This function analyzes the orderings in the normalized order-equivalence
884    /// class and builds a dependency map. The dependency map captures relationships
885    /// between expressions within the orderings, helping to identify dependencies
886    /// and construct valid projected orderings during projection operations.
887    ///
888    /// # Parameters
889    ///
890    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
891    ///   relationship between source and target expressions.
892    ///
893    /// # Returns
894    ///
895    /// A [`DependencyMap`] representing the dependency map, where each
896    /// \[`DependencyNode`\] contains dependencies for the key [`PhysicalSortExpr`].
897    ///
898    /// # Example
899    ///
900    /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`,
901    /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`.
902    /// Then, the dependency map will be:
903    ///
904    /// ```text
905    /// a ASC: Node {Some(a_new ASC), HashSet{}}
906    /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
907    /// c ASC: Node {None, HashSet{a ASC}}
908    /// ```
909    fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
910        let mut dependency_map = DependencyMap::new();
911        for ordering in self.normalized_oeq_class().iter() {
912            for (idx, sort_expr) in ordering.iter().enumerate() {
913                let target_sort_expr =
914                    self.project_expr(&sort_expr.expr, mapping).map(|expr| {
915                        PhysicalSortExpr {
916                            expr,
917                            options: sort_expr.options,
918                        }
919                    });
920                let is_projected = target_sort_expr.is_some();
921                if is_projected
922                    || mapping
923                        .iter()
924                        .any(|(source, _)| expr_refers(source, &sort_expr.expr))
925                {
926                    // Previous ordering is a dependency. Note that there is no,
927                    // dependency for a leading ordering (i.e. the first sort
928                    // expression).
929                    let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
930                    // Add sort expressions that can be projected or referred to
931                    // by any of the projection expressions to the dependency map:
932                    dependency_map.insert(
933                        sort_expr,
934                        target_sort_expr.as_ref(),
935                        dependency,
936                    );
937                }
938                if !is_projected {
939                    // If we can not project, stop constructing the dependency
940                    // map as remaining dependencies will be invalid after projection.
941                    break;
942                }
943            }
944        }
945        dependency_map
946    }
947
948    /// Returns a new `ProjectionMapping` where source expressions are normalized.
949    ///
950    /// This normalization ensures that source expressions are transformed into a
951    /// consistent representation. This is beneficial for algorithms that rely on
952    /// exact equalities, as it allows for more precise and reliable comparisons.
953    ///
954    /// # Parameters
955    ///
956    /// - `mapping`: A reference to the original `ProjectionMapping` to be normalized.
957    ///
958    /// # Returns
959    ///
960    /// A new `ProjectionMapping` with normalized source expressions.
961    fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
962        // Construct the mapping where source expressions are normalized. In this way
963        // In the algorithms below we can work on exact equalities
964        ProjectionMapping {
965            map: mapping
966                .iter()
967                .map(|(source, target)| {
968                    let normalized_source =
969                        self.eq_group.normalize_expr(Arc::clone(source));
970                    (normalized_source, Arc::clone(target))
971                })
972                .collect(),
973        }
974    }
975
976    /// Computes projected orderings based on a given projection mapping.
977    ///
978    /// This function takes a `ProjectionMapping` and computes the possible
979    /// orderings for the projected expressions. It considers dependencies
980    /// between expressions and generates valid orderings according to the
981    /// specified sort properties.
982    ///
983    /// # Parameters
984    ///
985    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
986    ///   relationship between source and target expressions.
987    ///
988    /// # Returns
989    ///
990    /// A vector of `LexOrdering` containing all valid orderings after projection.
991    fn projected_orderings(&self, mapping: &ProjectionMapping) -> Vec<LexOrdering> {
992        let mapping = self.normalized_mapping(mapping);
993
994        // Get dependency map for existing orderings:
995        let dependency_map = self.construct_dependency_map(&mapping);
996        let orderings = mapping.iter().flat_map(|(source, target)| {
997            referred_dependencies(&dependency_map, source)
998                .into_iter()
999                .filter_map(|relevant_deps| {
1000                    if let Ok(SortProperties::Ordered(options)) =
1001                        get_expr_properties(source, &relevant_deps, &self.schema)
1002                            .map(|prop| prop.sort_properties)
1003                    {
1004                        Some((options, relevant_deps))
1005                    } else {
1006                        // Do not consider unordered cases
1007                        None
1008                    }
1009                })
1010                .flat_map(|(options, relevant_deps)| {
1011                    let sort_expr = PhysicalSortExpr {
1012                        expr: Arc::clone(target),
1013                        options,
1014                    };
1015                    // Generate dependent orderings (i.e. prefixes for `sort_expr`):
1016                    let mut dependency_orderings =
1017                        generate_dependency_orderings(&relevant_deps, &dependency_map);
1018                    // Append `sort_expr` to the dependent orderings:
1019                    for ordering in dependency_orderings.iter_mut() {
1020                        ordering.push(sort_expr.clone());
1021                    }
1022                    dependency_orderings
1023                })
1024        });
1025
1026        // Add valid projected orderings. For example, if existing ordering is
1027        // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
1028        // preserve `a_new + b_new` as ordered. Please note that `a_new` and
1029        // `b_new` themselves need not be ordered. Such dependencies cannot be
1030        // deduced via the pass above.
1031        let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
1032            let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
1033            if prefixes.is_empty() {
1034                // If prefix is empty, there is no dependency. Insert
1035                // empty ordering:
1036                prefixes = vec![LexOrdering::default()];
1037            }
1038            // Append current ordering on top its dependencies:
1039            for ordering in prefixes.iter_mut() {
1040                if let Some(target) = &node.target_sort_expr {
1041                    ordering.push(target.clone())
1042                }
1043            }
1044            prefixes
1045        });
1046
1047        // Simplify each ordering by removing redundant sections:
1048        orderings
1049            .chain(projected_orderings)
1050            .map(|lex_ordering| lex_ordering.collapse())
1051            .collect()
1052    }
1053
1054    /// Projects constants based on the provided `ProjectionMapping`.
1055    ///
1056    /// This function takes a `ProjectionMapping` and identifies/projects
1057    /// constants based on the existing constants and the mapping. It ensures
1058    /// that constants are appropriately propagated through the projection.
1059    ///
1060    /// # Parameters
1061    ///
1062    /// - `mapping`: A reference to a `ProjectionMapping` representing the
1063    ///   mapping of source expressions to target expressions in the projection.
1064    ///
1065    /// # Returns
1066    ///
1067    /// Returns a `Vec<Arc<dyn PhysicalExpr>>` containing the projected constants.
1068    fn projected_constants(&self, mapping: &ProjectionMapping) -> Vec<ConstExpr> {
1069        // First, project existing constants. For example, assume that `a + b`
1070        // is known to be constant. If the projection were `a as a_new`, `b as b_new`,
1071        // then we would project constant `a + b` as `a_new + b_new`.
1072        let mut projected_constants = self
1073            .constants
1074            .iter()
1075            .flat_map(|const_expr| {
1076                const_expr
1077                    .map(|expr| self.eq_group.project_expr(mapping, expr))
1078                    .map(|projected_expr| {
1079                        projected_expr
1080                            .with_across_partitions(const_expr.across_partitions())
1081                    })
1082            })
1083            .collect::<Vec<_>>();
1084
1085        // Add projection expressions that are known to be constant:
1086        for (source, target) in mapping.iter() {
1087            if self.is_expr_constant(source)
1088                && !const_exprs_contains(&projected_constants, target)
1089            {
1090                if self.is_expr_constant_across_partitions(source) {
1091                    projected_constants.push(
1092                        ConstExpr::from(target)
1093                            .with_across_partitions(self.get_expr_constant_value(source)),
1094                    )
1095                } else {
1096                    projected_constants.push(
1097                        ConstExpr::from(target)
1098                            .with_across_partitions(AcrossPartitions::Heterogeneous),
1099                    )
1100                }
1101            }
1102        }
1103        projected_constants
1104    }
1105
1106    /// Projects constraints according to the given projection mapping.
1107    ///
1108    /// This function takes a projection mapping and extracts the column indices of the target columns.
1109    /// It then projects the constraints to only include relationships between
1110    /// columns that exist in the projected output.
1111    ///
1112    /// # Arguments
1113    ///
1114    /// * `mapping` - A reference to `ProjectionMapping` that defines how expressions are mapped
1115    ///   in the projection operation
1116    ///
1117    /// # Returns
1118    ///
1119    /// Returns a new `Constraints` object containing only the constraints
1120    /// that are valid for the projected columns.
1121    fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
1122        let indices = mapping
1123            .iter()
1124            .filter_map(|(_, target)| target.as_any().downcast_ref::<Column>())
1125            .map(|col| col.index())
1126            .collect::<Vec<_>>();
1127        debug_assert_eq!(mapping.map.len(), indices.len());
1128        self.constraints.project(&indices)
1129    }
1130
1131    /// Projects the equivalences within according to `mapping`
1132    /// and `output_schema`.
1133    pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
1134        let eq_group = self.eq_group.project(mapping);
1135        let oeq_class = OrderingEquivalenceClass::new(self.projected_orderings(mapping));
1136        let constants = self.projected_constants(mapping);
1137        let constraints = self
1138            .projected_constraints(mapping)
1139            .unwrap_or_else(Constraints::empty);
1140        Self {
1141            schema: output_schema,
1142            eq_group,
1143            oeq_class,
1144            constants,
1145            constraints,
1146        }
1147    }
1148
1149    /// Returns the longest (potentially partial) permutation satisfying the
1150    /// existing ordering. For example, if we have the equivalent orderings
1151    /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`,
1152    /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`.
1153    /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied
1154    /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0`
1155    /// inside the argument `exprs` (respectively). For the mathematical
1156    /// definition of "partial permutation", see:
1157    ///
1158    /// <https://en.wikipedia.org/wiki/Permutation#k-permutations_of_n>
1159    pub fn find_longest_permutation(
1160        &self,
1161        exprs: &[Arc<dyn PhysicalExpr>],
1162    ) -> (LexOrdering, Vec<usize>) {
1163        let mut eq_properties = self.clone();
1164        let mut result = vec![];
1165        // The algorithm is as follows:
1166        // - Iterate over all the expressions and insert ordered expressions
1167        //   into the result.
1168        // - Treat inserted expressions as constants (i.e. add them as constants
1169        //   to the state).
1170        // - Continue the above procedure until no expression is inserted; i.e.
1171        //   the algorithm reaches a fixed point.
1172        // This algorithm should reach a fixed point in at most `exprs.len()`
1173        // iterations.
1174        let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
1175        for _idx in 0..exprs.len() {
1176            // Get ordered expressions with their indices.
1177            let ordered_exprs = search_indices
1178                .iter()
1179                .flat_map(|&idx| {
1180                    let ExprProperties {
1181                        sort_properties, ..
1182                    } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
1183                    match sort_properties {
1184                        SortProperties::Ordered(options) => Some((
1185                            PhysicalSortExpr {
1186                                expr: Arc::clone(&exprs[idx]),
1187                                options,
1188                            },
1189                            idx,
1190                        )),
1191                        SortProperties::Singleton => {
1192                            // Assign default ordering to constant expressions
1193                            let options = SortOptions::default();
1194                            Some((
1195                                PhysicalSortExpr {
1196                                    expr: Arc::clone(&exprs[idx]),
1197                                    options,
1198                                },
1199                                idx,
1200                            ))
1201                        }
1202                        SortProperties::Unordered => None,
1203                    }
1204                })
1205                .collect::<Vec<_>>();
1206            // We reached a fixed point, exit.
1207            if ordered_exprs.is_empty() {
1208                break;
1209            }
1210            // Remove indices that have an ordering from `search_indices`, and
1211            // treat ordered expressions as constants in subsequent iterations.
1212            // We can do this because the "next" key only matters in a lexicographical
1213            // ordering when the keys to its left have the same values.
1214            //
1215            // Note that these expressions are not properly "constants". This is just
1216            // an implementation strategy confined to this function.
1217            for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
1218                eq_properties =
1219                    eq_properties.with_constants(std::iter::once(ConstExpr::from(expr)));
1220                search_indices.shift_remove(idx);
1221            }
1222            // Add new ordered section to the state.
1223            result.extend(ordered_exprs);
1224        }
1225        let (left, right) = result.into_iter().unzip();
1226        (LexOrdering::new(left), right)
1227    }
1228
1229    /// This function determines whether the provided expression is constant
1230    /// based on the known constants.
1231    ///
1232    /// # Parameters
1233    ///
1234    /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1235    ///   expression to be checked.
1236    ///
1237    /// # Returns
1238    ///
1239    /// Returns `true` if the expression is constant according to equivalence
1240    /// group, `false` otherwise.
1241    pub fn is_expr_constant(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
1242        // As an example, assume that we know columns `a` and `b` are constant.
1243        // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will
1244        // return `false`.
1245        let const_exprs = self
1246            .constants
1247            .iter()
1248            .map(|const_expr| Arc::clone(const_expr.expr()));
1249        let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
1250        let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1251        is_constant_recurse(&normalized_constants, &normalized_expr)
1252    }
1253
1254    /// This function determines whether the provided expression is constant
1255    /// across partitions based on the known constants.
1256    ///
1257    /// # Parameters
1258    ///
1259    /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1260    ///   expression to be checked.
1261    ///
1262    /// # Returns
1263    ///
1264    /// Returns `true` if the expression is constant across all partitions according
1265    /// to equivalence group, `false` otherwise
1266    #[deprecated(
1267        since = "45.0.0",
1268        note = "Use [`is_expr_constant_across_partitions`] instead"
1269    )]
1270    pub fn is_expr_constant_accross_partitions(
1271        &self,
1272        expr: &Arc<dyn PhysicalExpr>,
1273    ) -> bool {
1274        self.is_expr_constant_across_partitions(expr)
1275    }
1276
1277    /// This function determines whether the provided expression is constant
1278    /// across partitions based on the known constants.
1279    ///
1280    /// # Parameters
1281    ///
1282    /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1283    ///   expression to be checked.
1284    ///
1285    /// # Returns
1286    ///
1287    /// Returns `true` if the expression is constant across all partitions according
1288    /// to equivalence group, `false` otherwise.
1289    pub fn is_expr_constant_across_partitions(
1290        &self,
1291        expr: &Arc<dyn PhysicalExpr>,
1292    ) -> bool {
1293        // As an example, assume that we know columns `a` and `b` are constant.
1294        // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will
1295        // return `false`.
1296        let const_exprs = self
1297            .constants
1298            .iter()
1299            .filter_map(|const_expr| {
1300                if matches!(
1301                    const_expr.across_partitions(),
1302                    AcrossPartitions::Uniform { .. }
1303                ) {
1304                    Some(Arc::clone(const_expr.expr()))
1305                } else {
1306                    None
1307                }
1308            })
1309            .collect::<Vec<_>>();
1310        let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
1311        let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1312        is_constant_recurse(&normalized_constants, &normalized_expr)
1313    }
1314
1315    /// Retrieves the constant value of a given physical expression, if it exists.
1316    ///
1317    /// Normalizes the input expression and checks if it matches any known constants
1318    /// in the current context. Returns whether the expression has a uniform value,
1319    /// varies across partitions, or is not constant.
1320    ///
1321    /// # Parameters
1322    /// - `expr`: A reference to the physical expression to evaluate.
1323    ///
1324    /// # Returns
1325    /// - `AcrossPartitions::Uniform(value)`: If the expression has the same value across partitions.
1326    /// - `AcrossPartitions::Heterogeneous`: If the expression varies across partitions.
1327    /// - `None`: If the expression is not recognized as constant.
1328    pub fn get_expr_constant_value(
1329        &self,
1330        expr: &Arc<dyn PhysicalExpr>,
1331    ) -> AcrossPartitions {
1332        let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1333
1334        if let Some(lit) = normalized_expr.as_any().downcast_ref::<Literal>() {
1335            return AcrossPartitions::Uniform(Some(lit.value().clone()));
1336        }
1337
1338        for const_expr in self.constants.iter() {
1339            if normalized_expr.eq(const_expr.expr()) {
1340                return const_expr.across_partitions();
1341            }
1342        }
1343
1344        AcrossPartitions::Heterogeneous
1345    }
1346
1347    /// Retrieves the properties for a given physical expression.
1348    ///
1349    /// This function constructs an [`ExprProperties`] object for the given
1350    /// expression, which encapsulates information about the expression's
1351    /// properties, including its [`SortProperties`] and [`Interval`].
1352    ///
1353    /// # Parameters
1354    ///
1355    /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical expression
1356    ///   for which ordering information is sought.
1357    ///
1358    /// # Returns
1359    ///
1360    /// Returns an [`ExprProperties`] object containing the ordering and range
1361    /// information for the given expression.
1362    pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
1363        ExprPropertiesNode::new_unknown(expr)
1364            .transform_up(|expr| update_properties(expr, self))
1365            .data()
1366            .map(|node| node.data)
1367            .unwrap_or_else(|_| ExprProperties::new_unknown())
1368    }
1369
1370    /// Transforms this `EquivalenceProperties` into a new `EquivalenceProperties`
1371    /// by mapping columns in the original schema to columns in the new schema
1372    /// by index.
1373    pub fn with_new_schema(self, schema: SchemaRef) -> Result<Self> {
1374        // The new schema and the original schema is aligned when they have the
1375        // same number of columns, and fields at the same index have the same
1376        // type in both schemas.
1377        let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
1378            && self
1379                .schema
1380                .fields
1381                .iter()
1382                .zip(schema.fields.iter())
1383                .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
1384        if !schemas_aligned {
1385            // Rewriting equivalence properties in terms of new schema is not
1386            // safe when schemas are not aligned:
1387            return plan_err!(
1388                "Cannot rewrite old_schema:{:?} with new schema: {:?}",
1389                self.schema,
1390                schema
1391            );
1392        }
1393        // Rewrite constants according to new schema:
1394        let new_constants = self
1395            .constants
1396            .into_iter()
1397            .map(|const_expr| {
1398                let across_partitions = const_expr.across_partitions();
1399                let new_const_expr = with_new_schema(const_expr.owned_expr(), &schema)?;
1400                Ok(ConstExpr::new(new_const_expr)
1401                    .with_across_partitions(across_partitions))
1402            })
1403            .collect::<Result<Vec<_>>>()?;
1404
1405        // Rewrite orderings according to new schema:
1406        let mut new_orderings = vec![];
1407        for ordering in self.oeq_class {
1408            let new_ordering = ordering
1409                .into_iter()
1410                .map(|mut sort_expr| {
1411                    sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
1412                    Ok(sort_expr)
1413                })
1414                .collect::<Result<_>>()?;
1415            new_orderings.push(new_ordering);
1416        }
1417
1418        // Rewrite equivalence classes according to the new schema:
1419        let mut eq_classes = vec![];
1420        for eq_class in self.eq_group {
1421            let new_eq_exprs = eq_class
1422                .into_vec()
1423                .into_iter()
1424                .map(|expr| with_new_schema(expr, &schema))
1425                .collect::<Result<_>>()?;
1426            eq_classes.push(EquivalenceClass::new(new_eq_exprs));
1427        }
1428
1429        // Construct the resulting equivalence properties:
1430        let mut result = EquivalenceProperties::new(schema);
1431        result.constants = new_constants;
1432        result.add_new_orderings(new_orderings);
1433        result.add_equivalence_group(EquivalenceGroup::new(eq_classes));
1434
1435        Ok(result)
1436    }
1437}
1438
1439/// More readable display version of the `EquivalenceProperties`.
1440///
1441/// Format:
1442/// ```text
1443/// order: [[a ASC, b ASC], [a ASC, c ASC]], eq: [[a = b], [a = c]], const: [a = 1]
1444/// ```
1445impl Display for EquivalenceProperties {
1446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1447        if self.eq_group.is_empty()
1448            && self.oeq_class.is_empty()
1449            && self.constants.is_empty()
1450        {
1451            return write!(f, "No properties");
1452        }
1453        if !self.oeq_class.is_empty() {
1454            write!(f, "order: {}", self.oeq_class)?;
1455        }
1456        if !self.eq_group.is_empty() {
1457            write!(f, ", eq: {}", self.eq_group)?;
1458        }
1459        if !self.constants.is_empty() {
1460            write!(f, ", const: [{}]", ConstExpr::format_list(&self.constants))?;
1461        }
1462        Ok(())
1463    }
1464}
1465
1466/// Calculates the properties of a given [`ExprPropertiesNode`].
1467///
1468/// Order information can be retrieved as:
1469/// - If it is a leaf node, we directly find the order of the node by looking
1470///   at the given sort expression and equivalence properties if it is a `Column`
1471///   leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
1472///   it as singleton so that it can cooperate with all ordered columns.
1473/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1474///   and operator has its own rules on how to propagate the children orderings.
1475///   However, before we engage in recursion, we check whether this intermediate
1476///   node directly matches with the sort expression. If there is a match, the
1477///   sort expression emerges at that node immediately, discarding the recursive
1478///   result coming from its children.
1479///
1480/// Range information is calculated as:
1481/// - If it is a `Literal` node, we set the range as a point value. If it is a
1482///   `Column` node, we set the datatype of the range, but cannot give an interval
1483///   for the range, yet.
1484/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1485///   and operator has its own rules on how to propagate the children range.
1486fn update_properties(
1487    mut node: ExprPropertiesNode,
1488    eq_properties: &EquivalenceProperties,
1489) -> Result<Transformed<ExprPropertiesNode>> {
1490    // First, try to gather the information from the children:
1491    if !node.expr.children().is_empty() {
1492        // We have an intermediate (non-leaf) node, account for its children:
1493        let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
1494        node.data = node.expr.get_properties(&children_props)?;
1495    } else if node.expr.as_any().is::<Literal>() {
1496        // We have a Literal, which is one of the two possible leaf node types:
1497        node.data = node.expr.get_properties(&[])?;
1498    } else if node.expr.as_any().is::<Column>() {
1499        // We have a Column, which is the other possible leaf node type:
1500        node.data.range =
1501            Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
1502    }
1503    // Now, check what we know about orderings:
1504    let normalized_expr = eq_properties
1505        .eq_group
1506        .normalize_expr(Arc::clone(&node.expr));
1507    let oeq_class = eq_properties.normalized_oeq_class();
1508    if eq_properties.is_expr_constant(&normalized_expr)
1509        || oeq_class.is_expr_partial_const(&normalized_expr)
1510    {
1511        node.data.sort_properties = SortProperties::Singleton;
1512    } else if let Some(options) = oeq_class.get_options(&normalized_expr) {
1513        node.data.sort_properties = SortProperties::Ordered(options);
1514    }
1515    Ok(Transformed::yes(node))
1516}
1517
1518/// This function determines whether the provided expression is constant
1519/// based on the known constants.
1520///
1521/// # Parameters
1522///
1523/// - `constants`: A `&[Arc<dyn PhysicalExpr>]` containing expressions known to
1524///   be a constant.
1525/// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the expression
1526///   to check.
1527///
1528/// # Returns
1529///
1530/// Returns `true` if the expression is constant according to equivalence
1531/// group, `false` otherwise.
1532fn is_constant_recurse(
1533    constants: &[Arc<dyn PhysicalExpr>],
1534    expr: &Arc<dyn PhysicalExpr>,
1535) -> bool {
1536    if physical_exprs_contains(constants, expr) || expr.as_any().is::<Literal>() {
1537        return true;
1538    }
1539    let children = expr.children();
1540    !children.is_empty() && children.iter().all(|c| is_constant_recurse(constants, c))
1541}
1542
1543/// This function examines whether a referring expression directly refers to a
1544/// given referred expression or if any of its children in the expression tree
1545/// refer to the specified expression.
1546///
1547/// # Parameters
1548///
1549/// - `referring_expr`: A reference to the referring expression (`Arc<dyn PhysicalExpr>`).
1550/// - `referred_expr`: A reference to the referred expression (`Arc<dyn PhysicalExpr>`)
1551///
1552/// # Returns
1553///
1554/// A boolean value indicating whether `referring_expr` refers (needs it to evaluate its result)
1555/// `referred_expr` or not.
1556fn expr_refers(
1557    referring_expr: &Arc<dyn PhysicalExpr>,
1558    referred_expr: &Arc<dyn PhysicalExpr>,
1559) -> bool {
1560    referring_expr.eq(referred_expr)
1561        || referring_expr
1562            .children()
1563            .iter()
1564            .any(|child| expr_refers(child, referred_expr))
1565}
1566
1567/// This function examines the given expression and its properties to determine
1568/// the ordering properties of the expression. The range knowledge is not utilized
1569/// yet in the scope of this function.
1570///
1571/// # Parameters
1572///
1573/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) for
1574///   which ordering properties need to be determined.
1575/// - `dependencies`: A reference to `Dependencies`, containing sort expressions
1576///   referred to by `expr`.
1577/// - `schema``: A reference to the schema which the `expr` columns refer.
1578///
1579/// # Returns
1580///
1581/// A `SortProperties` indicating the ordering information of the given expression.
1582fn get_expr_properties(
1583    expr: &Arc<dyn PhysicalExpr>,
1584    dependencies: &Dependencies,
1585    schema: &SchemaRef,
1586) -> Result<ExprProperties> {
1587    if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
1588        // If exact match is found, return its ordering.
1589        Ok(ExprProperties {
1590            sort_properties: SortProperties::Ordered(column_order.options),
1591            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1592            preserves_lex_ordering: false,
1593        })
1594    } else if expr.as_any().downcast_ref::<Column>().is_some() {
1595        Ok(ExprProperties {
1596            sort_properties: SortProperties::Unordered,
1597            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1598            preserves_lex_ordering: false,
1599        })
1600    } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
1601        Ok(ExprProperties {
1602            sort_properties: SortProperties::Singleton,
1603            range: literal.value().into(),
1604            preserves_lex_ordering: true,
1605        })
1606    } else {
1607        // Find orderings of its children
1608        let child_states = expr
1609            .children()
1610            .iter()
1611            .map(|child| get_expr_properties(child, dependencies, schema))
1612            .collect::<Result<Vec<_>>>()?;
1613        // Calculate expression ordering using ordering of its children.
1614        expr.get_properties(&child_states)
1615    }
1616}
1617
1618/// Wrapper struct for `Arc<dyn PhysicalExpr>` to use them as keys in a hash map.
1619#[derive(Debug, Clone)]
1620struct ExprWrapper(Arc<dyn PhysicalExpr>);
1621
1622impl PartialEq<Self> for ExprWrapper {
1623    fn eq(&self, other: &Self) -> bool {
1624        self.0.eq(&other.0)
1625    }
1626}
1627
1628impl Eq for ExprWrapper {}
1629
1630impl Hash for ExprWrapper {
1631    fn hash<H: Hasher>(&self, state: &mut H) {
1632        self.0.hash(state);
1633    }
1634}
1635
1636#[cfg(test)]
1637mod tests {
1638
1639    use super::*;
1640    use crate::expressions::{col, BinaryExpr};
1641
1642    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
1643    use datafusion_expr::Operator;
1644
1645    #[test]
1646    fn test_expr_consists_of_constants() -> Result<()> {
1647        let schema = Arc::new(Schema::new(vec![
1648            Field::new("a", DataType::Int32, true),
1649            Field::new("b", DataType::Int32, true),
1650            Field::new("c", DataType::Int32, true),
1651            Field::new("d", DataType::Int32, true),
1652            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1653        ]));
1654        let col_a = col("a", &schema)?;
1655        let col_b = col("b", &schema)?;
1656        let col_d = col("d", &schema)?;
1657        let b_plus_d = Arc::new(BinaryExpr::new(
1658            Arc::clone(&col_b),
1659            Operator::Plus,
1660            Arc::clone(&col_d),
1661        )) as Arc<dyn PhysicalExpr>;
1662
1663        let constants = vec![Arc::clone(&col_a), Arc::clone(&col_b)];
1664        let expr = Arc::clone(&b_plus_d);
1665        assert!(!is_constant_recurse(&constants, &expr));
1666
1667        let constants = vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_d)];
1668        let expr = Arc::clone(&b_plus_d);
1669        assert!(is_constant_recurse(&constants, &expr));
1670        Ok(())
1671    }
1672}