datafusion_physical_expr/equivalence/properties/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod dependency; // Submodule containing DependencyMap and Dependencies
19mod joins; // Submodule containing join_equivalence_properties
20mod union; // Submodule containing calculate_union
21
22pub use joins::*;
23pub use union::*;
24
25use std::fmt::{self, Display};
26use std::mem;
27use std::sync::Arc;
28
29use self::dependency::{
30    construct_prefix_orderings, generate_dependency_orderings, referred_dependencies,
31    Dependencies, DependencyMap,
32};
33use crate::equivalence::{
34    AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
35};
36use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
37use crate::{
38    ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
39    PhysicalSortRequirement,
40};
41
42use arrow::datatypes::SchemaRef;
43use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
44use datafusion_common::{plan_err, Constraint, Constraints, HashMap, Result};
45use datafusion_expr::interval_arithmetic::Interval;
46use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
47use datafusion_physical_expr_common::sort_expr::options_compatible;
48use datafusion_physical_expr_common::utils::ExprPropertiesNode;
49
50use indexmap::IndexSet;
51use itertools::Itertools;
52
53/// `EquivalenceProperties` stores information about the output of a plan node
54/// that can be used to optimize the plan. Currently, it keeps track of:
55/// - Sort expressions (orderings),
56/// - Equivalent expressions; i.e. expressions known to have the same value.
/// - Constant expressions; i.e. expressions known to contain a single constant
///   value.
59///
60/// Please see the [Using Ordering for Better Plans] blog for more details.
61///
62/// [Using Ordering for Better Plans]: https://datafusion.apache.org/blog/2025/03/11/ordering-analysis/
63///
64/// # Example equivalent sort expressions
65///
66/// Consider table below:
67///
68/// ```text
69/// ┌-------┐
70/// | a | b |
71/// |---|---|
72/// | 1 | 9 |
73/// | 2 | 8 |
74/// | 3 | 7 |
75/// | 5 | 5 |
76/// └---┴---┘
77/// ```
78///
79/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
/// `EquivalenceProperties` tracks these different valid sort expressions and
/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query
82/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
83/// avoided.
84///
85/// # Example equivalent expressions
86///
87/// Similarly, consider the table below:
88///
89/// ```text
90/// ┌-------┐
91/// | a | b |
92/// |---|---|
93/// | 1 | 1 |
94/// | 2 | 2 |
95/// | 3 | 3 |
96/// | 5 | 5 |
97/// └---┴---┘
98/// ```
99///
/// In this case, columns `a` and `b` always have the same value. With this
/// information, DataFusion can optimize various operations. For example, if
102/// the partition requirement is `Hash(a)` and output partitioning is
103/// `Hash(b)`, then DataFusion avoids repartitioning the data as the existing
104/// partitioning satisfies the requirement.
105///
106/// # Code Example
107/// ```
108/// # use std::sync::Arc;
109/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
110/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
111/// # use datafusion_physical_expr::expressions::col;
112/// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
113/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
114/// #   Field::new("a", DataType::Int32, false),
115/// #   Field::new("b", DataType::Int32, false),
116/// #   Field::new("c", DataType::Int32, false),
117/// # ]));
118/// # let col_a = col("a", &schema).unwrap();
119/// # let col_b = col("b", &schema).unwrap();
120/// # let col_c = col("c", &schema).unwrap();
121/// // This object represents data that is sorted by a ASC, c DESC
122/// // with a single constant value of b
123/// let mut eq_properties = EquivalenceProperties::new(schema);
124/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
125/// eq_properties.add_ordering([
126///   PhysicalSortExpr::new_default(col_a).asc(),
127///   PhysicalSortExpr::new_default(col_c).desc(),
128/// ]);
129///
130/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant: (heterogeneous)}]");
131/// ```
#[derive(Clone, Debug)]
pub struct EquivalenceProperties {
    /// Distinct equivalence classes (i.e. expressions with the same value).
    /// Constant expressions are also tracked through this group.
    eq_group: EquivalenceGroup,
    /// Equivalent sort expressions (i.e. those that define the same ordering),
    /// stored as they were given (not normalized).
    oeq_class: OrderingEquivalenceClass,
    /// Cache storing equivalent sort expressions in normal form (i.e. without
    /// constants/duplicates and in standard form) and a map associating leading
    /// terms with full sort expressions. It is rebuilt or updated whenever
    /// `eq_group` or `oeq_class` changes.
    oeq_cache: OrderingEquivalenceCache,
    /// Table constraints that factor in equivalence calculations.
    constraints: Constraints,
    /// Schema associated with this object.
    schema: SchemaRef,
}
147
/// This object serves as a cache for storing equivalent sort expressions
/// in normal form, and a map associating leading sort expressions with
/// full lexicographical orderings. With this information, DataFusion can
/// efficiently determine whether a given ordering is satisfied by the
/// existing orderings, and discover new orderings based on the existing
/// equivalence properties.
#[derive(Clone, Debug, Default)]
struct OrderingEquivalenceCache {
    /// Equivalent sort expressions in normal form.
    normal_cls: OrderingEquivalenceClass,
    /// Map associating leading sort expressions with full lexicographical
    /// orderings. Keys are the first expression of each ordering in
    /// `normal_cls`; values are indices into `normal_cls` of the orderings
    /// that start with that expression.
    leading_map: HashMap<Arc<dyn PhysicalExpr>, Vec<usize>>,
}
162
163impl OrderingEquivalenceCache {
164    /// Creates a new `OrderingEquivalenceCache` object with the given
165    /// equivalent orderings, which should be in normal form.
166    pub fn new(
167        orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
168    ) -> Self {
169        let mut cache = Self {
170            normal_cls: OrderingEquivalenceClass::new(orderings),
171            leading_map: HashMap::new(),
172        };
173        cache.update_map();
174        cache
175    }
176
177    /// Updates/reconstructs the leading expression map according to the normal
178    /// ordering equivalence class within.
179    pub fn update_map(&mut self) {
180        self.leading_map.clear();
181        for (idx, ordering) in self.normal_cls.iter().enumerate() {
182            let expr = Arc::clone(&ordering.first().expr);
183            self.leading_map.entry(expr).or_default().push(idx);
184        }
185    }
186
187    /// Clears the cache, removing all orderings and leading expressions.
188    pub fn clear(&mut self) {
189        self.normal_cls.clear();
190        self.leading_map.clear();
191    }
192}
193
194impl EquivalenceProperties {
195    /// Creates an empty `EquivalenceProperties` object.
196    pub fn new(schema: SchemaRef) -> Self {
197        Self {
198            eq_group: EquivalenceGroup::default(),
199            oeq_class: OrderingEquivalenceClass::default(),
200            oeq_cache: OrderingEquivalenceCache::default(),
201            constraints: Constraints::default(),
202            schema,
203        }
204    }
205
206    /// Adds constraints to the properties.
207    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
208        self.constraints = constraints;
209        self
210    }
211
212    /// Creates a new `EquivalenceProperties` object with the given orderings.
213    pub fn new_with_orderings(
214        schema: SchemaRef,
215        orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
216    ) -> Self {
217        let eq_group = EquivalenceGroup::default();
218        let oeq_class = OrderingEquivalenceClass::new(orderings);
219        // Here, we can avoid performing a full normalization, and get by with
220        // only removing constants because the equivalence group is empty.
221        let normal_orderings = oeq_class.iter().cloned().map(|o| {
222            o.into_iter()
223                .filter(|sort_expr| eq_group.is_expr_constant(&sort_expr.expr).is_none())
224        });
225        Self {
226            oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
227            oeq_class,
228            eq_group,
229            constraints: Constraints::default(),
230            schema,
231        }
232    }
233
234    /// Returns the associated schema.
235    pub fn schema(&self) -> &SchemaRef {
236        &self.schema
237    }
238
239    /// Returns a reference to the ordering equivalence class within.
240    pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
241        &self.oeq_class
242    }
243
244    /// Returns a reference to the equivalence group within.
245    pub fn eq_group(&self) -> &EquivalenceGroup {
246        &self.eq_group
247    }
248
249    /// Returns a reference to the constraints within.
250    pub fn constraints(&self) -> &Constraints {
251        &self.constraints
252    }
253
254    /// Returns all the known constants expressions.
255    pub fn constants(&self) -> Vec<ConstExpr> {
256        self.eq_group
257            .iter()
258            .filter_map(|c| {
259                c.constant.as_ref().and_then(|across| {
260                    c.canonical_expr()
261                        .map(|expr| ConstExpr::new(Arc::clone(expr), across.clone()))
262                })
263            })
264            .collect()
265    }
266
267    /// Returns the output ordering of the properties.
268    pub fn output_ordering(&self) -> Option<LexOrdering> {
269        let concat = self.oeq_class.iter().flat_map(|o| o.iter().cloned());
270        self.normalize_sort_exprs(concat)
271    }
272
273    /// Extends this `EquivalenceProperties` with the `other` object.
274    pub fn extend(mut self, other: Self) -> Result<Self> {
275        self.constraints.extend(other.constraints);
276        self.add_equivalence_group(other.eq_group)?;
277        self.add_orderings(other.oeq_class);
278        Ok(self)
279    }
280
281    /// Clears (empties) the ordering equivalence class within this object.
282    /// Call this method when existing orderings are invalidated.
283    pub fn clear_orderings(&mut self) {
284        self.oeq_class.clear();
285        self.oeq_cache.clear();
286    }
287
288    /// Removes constant expressions that may change across partitions.
289    /// This method should be used when merging data from different partitions.
290    pub fn clear_per_partition_constants(&mut self) {
291        if self.eq_group.clear_per_partition_constants() {
292            // Renormalize orderings if the equivalence group changes:
293            let normal_orderings = self
294                .oeq_class
295                .iter()
296                .cloned()
297                .map(|o| self.eq_group.normalize_sort_exprs(o));
298            self.oeq_cache = OrderingEquivalenceCache::new(normal_orderings);
299        }
300    }
301
302    /// Adds new orderings into the existing ordering equivalence class.
303    pub fn add_orderings(
304        &mut self,
305        orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
306    ) {
307        let orderings: Vec<_> =
308            orderings.into_iter().filter_map(LexOrdering::new).collect();
309        let normal_orderings: Vec<_> = orderings
310            .iter()
311            .cloned()
312            .filter_map(|o| self.normalize_sort_exprs(o))
313            .collect();
314        if !normal_orderings.is_empty() {
315            self.oeq_class.extend(orderings);
316            // Normalize given orderings to update the cache:
317            self.oeq_cache.normal_cls.extend(normal_orderings);
318            // TODO: If no ordering is found to be redunant during extension, we
319            //       can use a shortcut algorithm to update the leading map.
320            self.oeq_cache.update_map();
321        }
322    }
323
324    /// Adds a single ordering to the existing ordering equivalence class.
325    pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
326        self.add_orderings(std::iter::once(ordering));
327    }
328
329    /// Incorporates the given equivalence group to into the existing
330    /// equivalence group within.
331    pub fn add_equivalence_group(
332        &mut self,
333        other_eq_group: EquivalenceGroup,
334    ) -> Result<()> {
335        if !other_eq_group.is_empty() {
336            self.eq_group.extend(other_eq_group);
337            // Renormalize orderings if the equivalence group changes:
338            let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
339            let normal_orderings = normal_cls
340                .into_iter()
341                .map(|o| self.eq_group.normalize_sort_exprs(o));
342            self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
343            self.oeq_cache.update_map();
344            // Discover any new orderings based on the new equivalence classes:
345            let leading_exprs: Vec<_> =
346                self.oeq_cache.leading_map.keys().cloned().collect();
347            for expr in leading_exprs {
348                self.discover_new_orderings(expr)?;
349            }
350        }
351        Ok(())
352    }
353
354    /// Returns the ordering equivalence class within in normal form.
355    /// Normalization standardizes expressions according to the equivalence
356    /// group within, and removes constants/duplicates.
357    pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
358        self.oeq_class
359            .iter()
360            .cloned()
361            .filter_map(|ordering| self.normalize_sort_exprs(ordering))
362            .collect::<Vec<_>>()
363            .into()
364    }
365
366    /// Adds a new equality condition into the existing equivalence group.
367    /// If the given equality defines a new equivalence class, adds this new
368    /// equivalence class to the equivalence group.
369    pub fn add_equal_conditions(
370        &mut self,
371        left: Arc<dyn PhysicalExpr>,
372        right: Arc<dyn PhysicalExpr>,
373    ) -> Result<()> {
374        // Add equal expressions to the state:
375        if self.eq_group.add_equal_conditions(Arc::clone(&left), right) {
376            // Renormalize orderings if the equivalence group changes:
377            let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
378            let normal_orderings = normal_cls
379                .into_iter()
380                .map(|o| self.eq_group.normalize_sort_exprs(o));
381            self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
382            self.oeq_cache.update_map();
383            // Discover any new orderings:
384            self.discover_new_orderings(left)?;
385        }
386        Ok(())
387    }
388
389    /// Track/register physical expressions with constant values.
390    pub fn add_constants(
391        &mut self,
392        constants: impl IntoIterator<Item = ConstExpr>,
393    ) -> Result<()> {
394        // Add the new constant to the equivalence group:
395        for constant in constants {
396            self.eq_group.add_constant(constant);
397        }
398        // Renormalize the orderings after adding new constants by removing
399        // the constants from existing orderings:
400        let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
401        let normal_orderings = normal_cls.into_iter().map(|ordering| {
402            ordering.into_iter().filter(|sort_expr| {
403                self.eq_group.is_expr_constant(&sort_expr.expr).is_none()
404            })
405        });
406        self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
407        self.oeq_cache.update_map();
408        // Discover any new orderings based on the constants:
409        let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
410        for expr in leading_exprs {
411            self.discover_new_orderings(expr)?;
412        }
413        Ok(())
414    }
415
416    /// Discover new valid orderings in light of a new equality. Accepts a single
417    /// argument (`expr`) which is used to determine the orderings to update.
418    /// When constants or equivalence classes change, there may be new orderings
419    /// that can be discovered with the new equivalence properties.
420    /// For a discussion, see: <https://github.com/apache/datafusion/issues/9812>
421    fn discover_new_orderings(
422        &mut self,
423        normal_expr: Arc<dyn PhysicalExpr>,
424    ) -> Result<()> {
425        let Some(ordering_idxs) = self.oeq_cache.leading_map.get(&normal_expr) else {
426            return Ok(());
427        };
428        let eq_class = self
429            .eq_group
430            .get_equivalence_class(&normal_expr)
431            .map_or_else(|| vec![normal_expr], |class| class.clone().into());
432
433        let mut new_orderings = vec![];
434        for idx in ordering_idxs {
435            let ordering = &self.oeq_cache.normal_cls[*idx];
436            let leading_ordering_options = ordering[0].options;
437
438            'exprs: for equivalent_expr in &eq_class {
439                let children = equivalent_expr.children();
440                if children.is_empty() {
441                    continue;
442                }
443                // Check if all children match the next expressions in the ordering:
444                let mut child_properties = vec![];
445                // Build properties for each child based on the next expression:
446                for (i, child) in children.into_iter().enumerate() {
447                    let Some(next) = ordering.get(i + 1) else {
448                        break 'exprs;
449                    };
450                    if !next.expr.eq(child) {
451                        break 'exprs;
452                    }
453                    let data_type = child.data_type(&self.schema)?;
454                    child_properties.push(ExprProperties {
455                        sort_properties: SortProperties::Ordered(next.options),
456                        range: Interval::make_unbounded(&data_type)?,
457                        preserves_lex_ordering: true,
458                    });
459                }
460                // Check if the expression is monotonic in all arguments:
461                let expr_properties =
462                    equivalent_expr.get_properties(&child_properties)?;
463                if expr_properties.preserves_lex_ordering
464                    && expr_properties.sort_properties
465                        == SortProperties::Ordered(leading_ordering_options)
466                {
467                    // Assume that `[c ASC, a ASC, b ASC]` is among existing
468                    // orderings. If equality `c = f(a, b)` is given, ordering
469                    // `[a ASC, b ASC]` implies the ordering `[c ASC]`. Thus,
470                    // ordering `[a ASC, b ASC]` is also a valid ordering.
471                    new_orderings.push(ordering[1..].to_vec());
472                    break;
473                }
474            }
475        }
476
477        if !new_orderings.is_empty() {
478            self.add_orderings(new_orderings);
479        }
480        Ok(())
481    }
482
483    /// Updates the ordering equivalence class within assuming that the table
484    /// is re-sorted according to the argument `ordering`, and returns whether
485    /// this operation resulted in any change. Note that equivalence classes
486    /// (and constants) do not change as they are unaffected by a re-sort. If
487    /// the given ordering is already satisfied, the function does nothing.
488    pub fn reorder(
489        &mut self,
490        ordering: impl IntoIterator<Item = PhysicalSortExpr>,
491    ) -> Result<bool> {
492        let (ordering, ordering_tee) = ordering.into_iter().tee();
493        // First, standardize the given ordering:
494        let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
495            // If the ordering vanishes after normalization, it is satisfied:
496            return Ok(false);
497        };
498        if normal_ordering.len() != self.common_sort_prefix_length(&normal_ordering)? {
499            // If the ordering is unsatisfied, replace existing orderings:
500            self.clear_orderings();
501            self.add_ordering(ordering_tee);
502            return Ok(true);
503        }
504        Ok(false)
505    }
506
507    /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
508    /// equivalence group within. Returns a `LexOrdering` instance if the
509    /// expressions define a proper lexicographical ordering. For more details,
510    /// see [`EquivalenceGroup::normalize_sort_exprs`].
511    pub fn normalize_sort_exprs(
512        &self,
513        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
514    ) -> Option<LexOrdering> {
515        LexOrdering::new(self.eq_group.normalize_sort_exprs(sort_exprs))
516    }
517
518    /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
519    /// equivalence group within. Returns a `LexRequirement` instance if the
520    /// expressions define a proper lexicographical requirement. For more
521    /// details, see [`EquivalenceGroup::normalize_sort_exprs`].
522    pub fn normalize_sort_requirements(
523        &self,
524        sort_reqs: impl IntoIterator<Item = PhysicalSortRequirement>,
525    ) -> Option<LexRequirement> {
526        LexRequirement::new(self.eq_group.normalize_sort_requirements(sort_reqs))
527    }
528
529    /// Iteratively checks whether the given ordering is satisfied by any of
530    /// the existing orderings. See [`Self::ordering_satisfy_requirement`] for
531    /// more details and examples.
532    pub fn ordering_satisfy(
533        &self,
534        given: impl IntoIterator<Item = PhysicalSortExpr>,
535    ) -> Result<bool> {
536        // First, standardize the given ordering:
537        let Some(normal_ordering) = self.normalize_sort_exprs(given) else {
538            // If the ordering vanishes after normalization, it is satisfied:
539            return Ok(true);
540        };
541        Ok(normal_ordering.len() == self.common_sort_prefix_length(&normal_ordering)?)
542    }
543
544    /// Iteratively checks whether the given sort requirement is satisfied by
545    /// any of the existing orderings.
546    ///
547    /// ### Example Scenarios
548    ///
549    /// In these scenarios, assume that all expressions share the same sort
550    /// properties.
551    ///
552    /// #### Case 1: Sort Requirement `[a, c]`
553    ///
554    /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
555    /// 1. The function first checks the leading requirement `a`, which is
556    ///    satisfied by `[a, b, c].first()`.
557    /// 2. `a` is added as a constant for the next iteration.
558    /// 3. Normal orderings become `[[b, c], [d]]`.
559    /// 4. The function fails for `c` in the second iteration, as neither
560    ///    `[b, c]` nor `[d]` satisfies `c`.
561    ///
562    /// #### Case 2: Sort Requirement `[a, d]`
563    ///
564    /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
565    /// 1. The function first checks the leading requirement `a`, which is
566    ///    satisfied by `[a, b, c].first()`.
567    /// 2. `a` is added as a constant for the next iteration.
568    /// 3. Normal orderings become `[[b, c], [d]]`.
569    /// 4. The function returns `true` as `[d]` satisfies `d`.
570    pub fn ordering_satisfy_requirement(
571        &self,
572        given: impl IntoIterator<Item = PhysicalSortRequirement>,
573    ) -> Result<bool> {
574        // First, standardize the given requirement:
575        let Some(normal_reqs) = self.normalize_sort_requirements(given) else {
576            // If the requirement vanishes after normalization, it is satisfied:
577            return Ok(true);
578        };
579        // Then, check whether given requirement is satisfied by constraints:
580        if self.satisfied_by_constraints(&normal_reqs) {
581            return Ok(true);
582        }
583        let schema = self.schema();
584        let mut eq_properties = self.clone();
585        for element in normal_reqs {
586            // Check whether given requirement is satisfied:
587            let ExprProperties {
588                sort_properties, ..
589            } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
590            let satisfy = match sort_properties {
591                SortProperties::Ordered(options) => element.options.is_none_or(|opts| {
592                    let nullable = element.expr.nullable(schema).unwrap_or(true);
593                    options_compatible(&options, &opts, nullable)
594                }),
595                // Singleton expressions satisfy any requirement.
596                SortProperties::Singleton => true,
597                SortProperties::Unordered => false,
598            };
599            if !satisfy {
600                return Ok(false);
601            }
602            // Treat satisfied keys as constants in subsequent iterations. We
603            // can do this because the "next" key only matters in a lexicographical
604            // ordering when the keys to its left have the same values.
605            //
606            // Note that these expressions are not properly "constants". This is just
607            // an implementation strategy confined to this function.
608            //
609            // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
610            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
611            // From the analysis above, we know that `[a ASC]` is satisfied. Then,
612            // we add column `a` as constant to the algorithm state. This enables us
613            // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
614            let const_expr = ConstExpr::from(element.expr);
615            eq_properties.add_constants(std::iter::once(const_expr))?;
616        }
617        Ok(true)
618    }
619
620    /// Returns the number of consecutive sort expressions (starting from the
621    /// left) that are satisfied by the existing ordering.
622    fn common_sort_prefix_length(&self, normal_ordering: &LexOrdering) -> Result<usize> {
623        let full_length = normal_ordering.len();
624        // Check whether the given ordering is satisfied by constraints:
625        if self.satisfied_by_constraints_ordering(normal_ordering) {
626            // If constraints satisfy all sort expressions, return the full
627            // length:
628            return Ok(full_length);
629        }
630        let schema = self.schema();
631        let mut eq_properties = self.clone();
632        for (idx, element) in normal_ordering.into_iter().enumerate() {
633            // Check whether given ordering is satisfied:
634            let ExprProperties {
635                sort_properties, ..
636            } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
637            let satisfy = match sort_properties {
638                SortProperties::Ordered(options) => options_compatible(
639                    &options,
640                    &element.options,
641                    element.expr.nullable(schema).unwrap_or(true),
642                ),
643                // Singleton expressions satisfy any ordering.
644                SortProperties::Singleton => true,
645                SortProperties::Unordered => false,
646            };
647            if !satisfy {
648                // As soon as one sort expression is unsatisfied, return how
649                // many we've satisfied so far:
650                return Ok(idx);
651            }
652            // Treat satisfied keys as constants in subsequent iterations. We
653            // can do this because the "next" key only matters in a lexicographical
654            // ordering when the keys to its left have the same values.
655            //
656            // Note that these expressions are not properly "constants". This is just
657            // an implementation strategy confined to this function.
658            //
659            // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
660            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
661            // From the analysis above, we know that `[a ASC]` is satisfied. Then,
662            // we add column `a` as constant to the algorithm state. This enables us
663            // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
664            let const_expr = ConstExpr::from(Arc::clone(&element.expr));
665            eq_properties.add_constants(std::iter::once(const_expr))?
666        }
667        // All sort expressions are satisfied, return full length:
668        Ok(full_length)
669    }
670
671    /// Determines the longest normal prefix of `ordering` satisfied by the
672    /// existing ordering. Returns that prefix as a new `LexOrdering`, and a
673    /// boolean indicating whether all the sort expressions are satisfied.
674    pub fn extract_common_sort_prefix(
675        &self,
676        ordering: LexOrdering,
677    ) -> Result<(Vec<PhysicalSortExpr>, bool)> {
678        // First, standardize the given ordering:
679        let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
680            // If the ordering vanishes after normalization, it is satisfied:
681            return Ok((vec![], true));
682        };
683        let prefix_len = self.common_sort_prefix_length(&normal_ordering)?;
684        let flag = prefix_len == normal_ordering.len();
685        let mut sort_exprs: Vec<_> = normal_ordering.into();
686        if !flag {
687            sort_exprs.truncate(prefix_len);
688        }
689        Ok((sort_exprs, flag))
690    }
691
    /// Checks if the sort expressions are satisfied by any of the table
    /// constraints (primary key or unique). Returns true if, for some
    /// constraint, there is a stored ordering such that:
    /// - the ordering is at least as long as the constraint and no longer
    ///   than `normal_exprs`,
    /// - every constraint index appears as a `Column` in the ordering (for
    ///   unique constraints, a nullable column is only accepted when it is
    ///   neither the first nor the last key of the ordering), and
    /// - each given expression is satisfied by the ordering's sort expression
    ///   at the same position, for as many positions as the ordering has.
    ///
    /// Note that the stored (non-normalized) orderings in `oeq_class` are the
    /// ones consulted here.
    fn satisfied_by_constraints_ordering(
        &self,
        normal_exprs: &[PhysicalSortExpr],
    ) -> bool {
        self.constraints.iter().any(|constraint| match constraint {
            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
                // Nullability only matters for unique constraints:
                let check_null = matches!(constraint, Constraint::Unique(_));
                let normalized_size = normal_exprs.len();
                // The given expressions must be at least as many as the
                // constraint columns:
                indices.len() <= normalized_size
                    && self.oeq_class.iter().any(|ordering| {
                        let length = ordering.len();
                        // Skip orderings that are shorter than the constraint
                        // or longer than the given expressions:
                        if indices.len() > length || normalized_size < length {
                            return false;
                        }
                        // Build a map of column positions in the ordering
                        // (column index -> (position, nullability)); keys that
                        // are not plain columns are ignored:
                        let mut col_positions = HashMap::with_capacity(length);
                        for (pos, req) in ordering.iter().enumerate() {
                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
                            {
                                // Treat the column as nullable if nullability
                                // cannot be determined:
                                let nullable = col.nullable(&self.schema).unwrap_or(true);
                                col_positions.insert(col.index(), (pos, nullable));
                            }
                        }
                        // Check if all constraint indices appear in valid positions:
                        if !indices.iter().all(|idx| {
                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
                                // For unique constraints, verify column is not nullable if it's first/last:
                                !check_null
                                    || !nullable
                                    || (pos != 0 && pos != length - 1)
                            })
                        }) {
                            return false;
                        }
                        // Check if this ordering matches the prefix (the zip
                        // stops at the end of the ordering, which is never
                        // longer than `normal_exprs` at this point):
                        normal_exprs.iter().zip(ordering).all(|(given, existing)| {
                            existing.satisfy_expr(given, &self.schema)
                        })
                    })
            }
        })
    }
739
740    /// Checks if the sort requirements are satisfied by any of the table
741    /// constraints (primary key or unique). Returns true if any constraint
742    /// fully satisfies the requirements (i.e. constraint indices form a valid
743    /// prefix of an existing ordering that matches the requirements). For
744    /// unique constraints, also verifies nullable columns.
745    fn satisfied_by_constraints(&self, normal_reqs: &[PhysicalSortRequirement]) -> bool {
746        self.constraints.iter().any(|constraint| match constraint {
747            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
748                let check_null = matches!(constraint, Constraint::Unique(_));
749                let normalized_size = normal_reqs.len();
750                indices.len() <= normalized_size
751                    && self.oeq_class.iter().any(|ordering| {
752                        let length = ordering.len();
753                        if indices.len() > length || normalized_size < length {
754                            return false;
755                        }
756                        // Build a map of column positions in the ordering:
757                        let mut col_positions = HashMap::with_capacity(length);
758                        for (pos, req) in ordering.iter().enumerate() {
759                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
760                            {
761                                let nullable = col.nullable(&self.schema).unwrap_or(true);
762                                col_positions.insert(col.index(), (pos, nullable));
763                            }
764                        }
765                        // Check if all constraint indices appear in valid positions:
766                        if !indices.iter().all(|idx| {
767                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
768                                // For unique constraints, verify column is not nullable if it's first/last:
769                                !check_null
770                                    || !nullable
771                                    || (pos != 0 && pos != length - 1)
772                            })
773                        }) {
774                            return false;
775                        }
776                        // Check if this ordering matches the prefix:
777                        normal_reqs.iter().zip(ordering).all(|(given, existing)| {
778                            existing.satisfy(given, &self.schema)
779                        })
780                    })
781            }
782        })
783    }
784
785    /// Checks whether the `given` sort requirements are equal or more specific
786    /// than the `reference` sort requirements.
787    pub fn requirements_compatible(
788        &self,
789        given: LexRequirement,
790        reference: LexRequirement,
791    ) -> bool {
792        let Some(normal_given) = self.normalize_sort_requirements(given) else {
793            return true;
794        };
795        let Some(normal_reference) = self.normalize_sort_requirements(reference) else {
796            return true;
797        };
798
799        (normal_reference.len() <= normal_given.len())
800            && normal_reference
801                .into_iter()
802                .zip(normal_given)
803                .all(|(reference, given)| given.compatible(&reference))
804    }
805
806    /// Modify existing orderings by substituting sort expressions with appropriate
807    /// targets from the projection mapping. We substitute a sort expression when
808    /// its physical expression has a one-to-one functional relationship with a
809    /// target expression in the mapping.
810    ///
811    /// After substitution, we may generate more than one `LexOrdering` for each
812    /// existing equivalent ordering. For example, `[a ASC, b ASC]` will turn
813    /// into `[CAST(a) ASC, b ASC]` and `[a ASC, b ASC]` when applying projection
814    /// expressions `a, b, CAST(a)`.
815    ///
816    /// TODO: Handle all scenarios that allow substitution; e.g. when `x` is
817    ///       sorted, `atan(x + 1000)` should also be substituted. For now, we
818    ///       only consider single-column `CAST` expressions.
819    fn substitute_oeq_class(
820        schema: &SchemaRef,
821        mapping: &ProjectionMapping,
822        oeq_class: OrderingEquivalenceClass,
823    ) -> OrderingEquivalenceClass {
824        let new_orderings = oeq_class.into_iter().flat_map(|order| {
825            // Modify/expand existing orderings by substituting sort
826            // expressions with appropriate targets from the mapping:
827            order
828                .into_iter()
829                .map(|sort_expr| {
830                    let referring_exprs = mapping
831                        .iter()
832                        .map(|(source, _target)| source)
833                        .filter(|source| expr_refers(source, &sort_expr.expr))
834                        .cloned();
835                    let mut result = vec![];
836                    // The sort expression comes from this schema, so the
837                    // following call to `unwrap` is safe.
838                    let expr_type = sort_expr.expr.data_type(schema).unwrap();
839                    // TODO: Add one-to-one analysis for ScalarFunctions.
840                    for r_expr in referring_exprs {
841                        // We check whether this expression is substitutable.
842                        if let Some(cast_expr) =
843                            r_expr.as_any().downcast_ref::<CastExpr>()
844                        {
845                            // For casts, we need to know whether the cast
846                            // expression matches:
847                            if cast_expr.expr.eq(&sort_expr.expr)
848                                && cast_expr.is_bigger_cast(&expr_type)
849                            {
850                                result.push(PhysicalSortExpr::new(
851                                    r_expr,
852                                    sort_expr.options,
853                                ));
854                            }
855                        }
856                    }
857                    result.push(sort_expr);
858                    result
859                })
860                // Generate all valid orderings given substituted expressions:
861                .multi_cartesian_product()
862        });
863        OrderingEquivalenceClass::new(new_orderings)
864    }
865
866    /// Projects argument `expr` according to the projection described by
867    /// `mapping`, taking equivalences into account.
868    ///
869    /// For example, assume that columns `a` and `c` are always equal, and that
870    /// the projection described by `mapping` encodes the following:
871    ///
872    /// ```text
873    /// a -> a1
874    /// b -> b1
875    /// ```
876    ///
877    /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to
878    /// `Some(a1 + b1)` and `d` to `None`, meaning that it is not projectable.
879    pub fn project_expr(
880        &self,
881        expr: &Arc<dyn PhysicalExpr>,
882        mapping: &ProjectionMapping,
883    ) -> Option<Arc<dyn PhysicalExpr>> {
884        self.eq_group.project_expr(mapping, expr)
885    }
886
887    /// Projects the given `expressions` according to the projection described
888    /// by `mapping`, taking equivalences into account. This function is similar
889    /// to [`Self::project_expr`], but projects multiple expressions at once
890    /// more efficiently than calling `project_expr` for each expression.
891    pub fn project_expressions<'a>(
892        &'a self,
893        expressions: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>> + 'a,
894        mapping: &'a ProjectionMapping,
895    ) -> impl Iterator<Item = Option<Arc<dyn PhysicalExpr>>> + 'a {
896        self.eq_group.project_expressions(mapping, expressions)
897    }
898
899    /// Constructs a dependency map based on existing orderings referred to in
900    /// the projection.
901    ///
902    /// This function analyzes the orderings in the normalized order-equivalence
903    /// class and builds a dependency map. The dependency map captures relationships
904    /// between expressions within the orderings, helping to identify dependencies
905    /// and construct valid projected orderings during projection operations.
906    ///
907    /// # Parameters
908    ///
909    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
910    ///   relationship between source and target expressions.
911    ///
912    /// # Returns
913    ///
914    /// A [`DependencyMap`] representing the dependency map, where each
915    /// \[`DependencyNode`\] contains dependencies for the key [`PhysicalSortExpr`].
916    ///
917    /// # Example
918    ///
919    /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`,
920    /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`.
921    /// Then, the dependency map will be:
922    ///
923    /// ```text
924    /// a ASC: Node {Some(a_new ASC), HashSet{}}
925    /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
926    /// c ASC: Node {None, HashSet{a ASC}}
927    /// ```
928    fn construct_dependency_map(
929        &self,
930        oeq_class: OrderingEquivalenceClass,
931        mapping: &ProjectionMapping,
932    ) -> DependencyMap {
933        let mut map = DependencyMap::default();
934        for ordering in oeq_class.into_iter() {
935            // Previous expression is a dependency. Note that there is no
936            // dependency for the leading expression.
937            if !self.insert_to_dependency_map(
938                mapping,
939                ordering[0].clone(),
940                None,
941                &mut map,
942            ) {
943                continue;
944            }
945            for (dependency, sort_expr) in ordering.into_iter().tuple_windows() {
946                if !self.insert_to_dependency_map(
947                    mapping,
948                    sort_expr,
949                    Some(dependency),
950                    &mut map,
951                ) {
952                    // If we can't project, stop constructing the dependency map
953                    // as remaining dependencies will be invalid post projection.
954                    break;
955                }
956            }
957        }
958        map
959    }
960
961    /// Projects the sort expression according to the projection mapping and
962    /// inserts it into the dependency map with the given dependency. Returns
963    /// a boolean flag indicating whether the given expression is projectable.
964    fn insert_to_dependency_map(
965        &self,
966        mapping: &ProjectionMapping,
967        sort_expr: PhysicalSortExpr,
968        dependency: Option<PhysicalSortExpr>,
969        map: &mut DependencyMap,
970    ) -> bool {
971        let target_sort_expr = self
972            .project_expr(&sort_expr.expr, mapping)
973            .map(|expr| PhysicalSortExpr::new(expr, sort_expr.options));
974        let projectable = target_sort_expr.is_some();
975        if projectable
976            || mapping
977                .iter()
978                .any(|(source, _)| expr_refers(source, &sort_expr.expr))
979        {
980            // Add sort expressions that can be projected or referred to
981            // by any of the projection expressions to the dependency map:
982            map.insert(sort_expr, target_sort_expr, dependency);
983        }
984        projectable
985    }
986
987    /// Returns a new `ProjectionMapping` where source expressions are in normal
988    /// form. Normalization ensures that source expressions are transformed into
989    /// a consistent representation, which is beneficial for algorithms that rely
990    /// on exact equalities, as it allows for more precise and reliable comparisons.
991    ///
992    /// # Parameters
993    ///
994    /// - `mapping`: A reference to the original `ProjectionMapping` to normalize.
995    ///
996    /// # Returns
997    ///
998    /// A new `ProjectionMapping` with source expressions in normal form.
999    fn normalize_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
1000        mapping
1001            .iter()
1002            .map(|(source, target)| {
1003                let normal_source = self.eq_group.normalize_expr(Arc::clone(source));
1004                (normal_source, target.clone())
1005            })
1006            .collect()
1007    }
1008
1009    /// Computes projected orderings based on a given projection mapping.
1010    ///
1011    /// This function takes a `ProjectionMapping` and computes the possible
1012    /// orderings for the projected expressions. It considers dependencies
1013    /// between expressions and generates valid orderings according to the
1014    /// specified sort properties.
1015    ///
1016    /// # Parameters
1017    ///
1018    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
1019    ///   relationship between source and target expressions.
1020    /// - `oeq_class`: The `OrderingEquivalenceClass` containing the orderings
1021    ///   to project.
1022    ///
1023    /// # Returns
1024    ///
1025    /// A vector of all valid (but not in normal form) orderings after projection.
1026    fn projected_orderings(
1027        &self,
1028        mapping: &ProjectionMapping,
1029        mut oeq_class: OrderingEquivalenceClass,
1030    ) -> Vec<LexOrdering> {
1031        // Normalize source expressions in the mapping:
1032        let mapping = self.normalize_mapping(mapping);
1033        // Get dependency map for existing orderings:
1034        oeq_class = Self::substitute_oeq_class(&self.schema, &mapping, oeq_class);
1035        let dependency_map = self.construct_dependency_map(oeq_class, &mapping);
1036        let orderings = mapping.iter().flat_map(|(source, targets)| {
1037            referred_dependencies(&dependency_map, source)
1038                .into_iter()
1039                .filter_map(|deps| {
1040                    let ep = get_expr_properties(source, &deps, &self.schema);
1041                    let sort_properties = ep.map(|prop| prop.sort_properties);
1042                    if let Ok(SortProperties::Ordered(options)) = sort_properties {
1043                        Some((options, deps))
1044                    } else {
1045                        // Do not consider unordered cases.
1046                        None
1047                    }
1048                })
1049                .flat_map(|(options, relevant_deps)| {
1050                    // Generate dependent orderings (i.e. prefixes for targets):
1051                    let dependency_orderings =
1052                        generate_dependency_orderings(&relevant_deps, &dependency_map);
1053                    let sort_exprs = targets.iter().map(|(target, _)| {
1054                        PhysicalSortExpr::new(Arc::clone(target), options)
1055                    });
1056                    if dependency_orderings.is_empty() {
1057                        sort_exprs.map(|sort_expr| [sort_expr].into()).collect()
1058                    } else {
1059                        sort_exprs
1060                            .flat_map(|sort_expr| {
1061                                let mut result = dependency_orderings.clone();
1062                                for ordering in result.iter_mut() {
1063                                    ordering.push(sort_expr.clone());
1064                                }
1065                                result
1066                            })
1067                            .collect::<Vec<_>>()
1068                    }
1069                })
1070        });
1071
1072        // Add valid projected orderings. For example, if existing ordering is
1073        // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
1074        // preserve `a_new + b_new` as ordered. Please note that `a_new` and
1075        // `b_new` themselves need not be ordered. Such dependencies cannot be
1076        // deduced via the pass above.
1077        let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
1078            let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
1079            if prefixes.is_empty() {
1080                // If prefix is empty, there is no dependency. Insert
1081                // empty ordering:
1082                if let Some(target) = &node.target {
1083                    prefixes.push([target.clone()].into());
1084                }
1085            } else {
1086                // Append current ordering on top its dependencies:
1087                for ordering in prefixes.iter_mut() {
1088                    if let Some(target) = &node.target {
1089                        ordering.push(target.clone());
1090                    }
1091                }
1092            }
1093            prefixes
1094        });
1095
1096        // Simplify each ordering by removing redundant sections:
1097        orderings.chain(projected_orderings).collect()
1098    }
1099
1100    /// Projects constraints according to the given projection mapping.
1101    ///
1102    /// This function takes a projection mapping and extracts column indices of
1103    /// target columns. It then projects the constraints to only include
1104    /// relationships between columns that exist in the projected output.
1105    ///
1106    /// # Parameters
1107    ///
1108    /// * `mapping` - A reference to the `ProjectionMapping` that defines the
1109    ///   projection operation.
1110    ///
1111    /// # Returns
1112    ///
1113    /// Returns an optional `Constraints` object containing only the constraints
1114    /// that are valid for the projected columns (if any exists).
1115    fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
1116        let indices = mapping
1117            .iter()
1118            .flat_map(|(_, targets)| {
1119                targets.iter().flat_map(|(target, _)| {
1120                    target.as_any().downcast_ref::<Column>().map(|c| c.index())
1121                })
1122            })
1123            .collect::<Vec<_>>();
1124        self.constraints.project(&indices)
1125    }
1126
1127    /// Projects the equivalences within according to `mapping` and
1128    /// `output_schema`.
1129    pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
1130        let eq_group = self.eq_group.project(mapping);
1131        let orderings =
1132            self.projected_orderings(mapping, self.oeq_cache.normal_cls.clone());
1133        let normal_orderings = orderings
1134            .iter()
1135            .cloned()
1136            .map(|o| eq_group.normalize_sort_exprs(o));
1137        Self {
1138            oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
1139            oeq_class: OrderingEquivalenceClass::new(orderings),
1140            constraints: self.projected_constraints(mapping).unwrap_or_default(),
1141            schema: output_schema,
1142            eq_group,
1143        }
1144    }
1145
1146    /// Returns the longest (potentially partial) permutation satisfying the
1147    /// existing ordering. For example, if we have the equivalent orderings
1148    /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`,
1149    /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`.
1150    /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied
1151    /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0`
1152    /// inside the argument `exprs` (respectively). For the mathematical
1153    /// definition of "partial permutation", see:
1154    ///
1155    /// <https://en.wikipedia.org/wiki/Permutation#k-permutations_of_n>
1156    pub fn find_longest_permutation(
1157        &self,
1158        exprs: &[Arc<dyn PhysicalExpr>],
1159    ) -> Result<(Vec<PhysicalSortExpr>, Vec<usize>)> {
1160        let mut eq_properties = self.clone();
1161        let mut result = vec![];
1162        // The algorithm is as follows:
1163        // - Iterate over all the expressions and insert ordered expressions
1164        //   into the result.
1165        // - Treat inserted expressions as constants (i.e. add them as constants
1166        //   to the state).
1167        // - Continue the above procedure until no expression is inserted; i.e.
1168        //   the algorithm reaches a fixed point.
1169        // This algorithm should reach a fixed point in at most `exprs.len()`
1170        // iterations.
1171        let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
1172        for _ in 0..exprs.len() {
1173            // Get ordered expressions with their indices.
1174            let ordered_exprs = search_indices
1175                .iter()
1176                .filter_map(|&idx| {
1177                    let ExprProperties {
1178                        sort_properties, ..
1179                    } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
1180                    match sort_properties {
1181                        SortProperties::Ordered(options) => {
1182                            let expr = Arc::clone(&exprs[idx]);
1183                            Some((PhysicalSortExpr::new(expr, options), idx))
1184                        }
1185                        SortProperties::Singleton => {
1186                            // Assign default ordering to constant expressions:
1187                            let expr = Arc::clone(&exprs[idx]);
1188                            Some((PhysicalSortExpr::new_default(expr), idx))
1189                        }
1190                        SortProperties::Unordered => None,
1191                    }
1192                })
1193                .collect::<Vec<_>>();
1194            // We reached a fixed point, exit.
1195            if ordered_exprs.is_empty() {
1196                break;
1197            }
1198            // Remove indices that have an ordering from `search_indices`, and
1199            // treat ordered expressions as constants in subsequent iterations.
1200            // We can do this because the "next" key only matters in a lexicographical
1201            // ordering when the keys to its left have the same values.
1202            //
1203            // Note that these expressions are not properly "constants". This is just
1204            // an implementation strategy confined to this function.
1205            for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
1206                let const_expr = ConstExpr::from(Arc::clone(expr));
1207                eq_properties.add_constants(std::iter::once(const_expr))?;
1208                search_indices.shift_remove(idx);
1209            }
1210            // Add new ordered section to the state.
1211            result.extend(ordered_exprs);
1212        }
1213        Ok(result.into_iter().unzip())
1214    }
1215
1216    /// This function determines whether the provided expression is constant
1217    /// based on the known constants. For example, if columns `a` and `b` are
1218    /// constant, then expressions `a`, `b` and `a + b` will all return `true`
1219    /// whereas expression `c` will return `false`.
1220    ///
1221    /// # Parameters
1222    ///
1223    /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1224    ///   expression to be checked.
1225    ///
1226    /// # Returns
1227    ///
1228    /// Returns a `Some` value if the expression is constant according to
1229    /// equivalence group, and `None` otherwise. The `Some` variant contains
1230    /// an `AcrossPartitions` value indicating whether the expression is
1231    /// constant across partitions, and its actual value (if available).
1232    pub fn is_expr_constant(
1233        &self,
1234        expr: &Arc<dyn PhysicalExpr>,
1235    ) -> Option<AcrossPartitions> {
1236        self.eq_group.is_expr_constant(expr)
1237    }
1238
1239    /// Retrieves the properties for a given physical expression.
1240    ///
1241    /// This function constructs an [`ExprProperties`] object for the given
1242    /// expression, which encapsulates information about the expression's
1243    /// properties, including its [`SortProperties`] and [`Interval`].
1244    ///
1245    /// # Parameters
1246    ///
1247    /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical expression
1248    ///   for which ordering information is sought.
1249    ///
1250    /// # Returns
1251    ///
1252    /// Returns an [`ExprProperties`] object containing the ordering and range
1253    /// information for the given expression.
1254    pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
1255        ExprPropertiesNode::new_unknown(expr)
1256            .transform_up(|expr| update_properties(expr, self))
1257            .data()
1258            .map(|node| node.data)
1259            .unwrap_or_else(|_| ExprProperties::new_unknown())
1260    }
1261
1262    /// Transforms this `EquivalenceProperties` by mapping columns in the
1263    /// original schema to columns in the new schema by index.
1264    pub fn with_new_schema(mut self, schema: SchemaRef) -> Result<Self> {
1265        // The new schema and the original schema is aligned when they have the
1266        // same number of columns, and fields at the same index have the same
1267        // type in both schemas.
1268        let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
1269            && self
1270                .schema
1271                .fields
1272                .iter()
1273                .zip(schema.fields.iter())
1274                .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
1275        if !schemas_aligned {
1276            // Rewriting equivalence properties in terms of new schema is not
1277            // safe when schemas are not aligned:
1278            return plan_err!(
1279                "Schemas have to be aligned to rewrite equivalences:\n Old schema: {:?}\n New schema: {:?}",
1280                self.schema,
1281                schema
1282            );
1283        }
1284
1285        // Rewrite equivalence classes according to the new schema:
1286        let mut eq_classes = vec![];
1287        for mut eq_class in self.eq_group {
1288            // Rewrite the expressions in the equivalence class:
1289            eq_class.exprs = eq_class
1290                .exprs
1291                .into_iter()
1292                .map(|expr| with_new_schema(expr, &schema))
1293                .collect::<Result<_>>()?;
1294            // Rewrite the constant value (if available and known):
1295            let data_type = eq_class
1296                .canonical_expr()
1297                .map(|e| e.data_type(&schema))
1298                .transpose()?;
1299            if let (Some(data_type), Some(AcrossPartitions::Uniform(Some(value)))) =
1300                (data_type, &mut eq_class.constant)
1301            {
1302                *value = value.cast_to(&data_type)?;
1303            }
1304            eq_classes.push(eq_class);
1305        }
1306        self.eq_group = eq_classes.into();
1307
1308        // Rewrite orderings according to new schema:
1309        self.oeq_class = self.oeq_class.with_new_schema(&schema)?;
1310        self.oeq_cache.normal_cls = self.oeq_cache.normal_cls.with_new_schema(&schema)?;
1311
1312        // Update the schema:
1313        self.schema = schema;
1314
1315        Ok(self)
1316    }
1317}
1318
1319impl From<EquivalenceProperties> for OrderingEquivalenceClass {
1320    fn from(eq_properties: EquivalenceProperties) -> Self {
1321        eq_properties.oeq_class
1322    }
1323}
1324
1325/// More readable display version of the `EquivalenceProperties`.
1326///
1327/// Format:
1328/// ```text
1329/// order: [[b@1 ASC NULLS LAST]], eq: [{members: [a@0], constant: (heterogeneous)}]
1330/// ```
1331impl Display for EquivalenceProperties {
1332    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1333        let empty_eq_group = self.eq_group.is_empty();
1334        let empty_oeq_class = self.oeq_class.is_empty();
1335        if empty_oeq_class && empty_eq_group {
1336            write!(f, "No properties")?;
1337        } else if !empty_oeq_class {
1338            write!(f, "order: {}", self.oeq_class)?;
1339            if !empty_eq_group {
1340                write!(f, ", eq: {}", self.eq_group)?;
1341            }
1342        } else {
1343            write!(f, "eq: {}", self.eq_group)?;
1344        }
1345        Ok(())
1346    }
1347}
1348
1349/// Calculates the properties of a given [`ExprPropertiesNode`].
1350///
1351/// Order information can be retrieved as:
1352/// - If it is a leaf node, we directly find the order of the node by looking
1353///   at the given sort expression and equivalence properties if it is a `Column`
1354///   leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
1355///   it as singleton so that it can cooperate with all ordered columns.
1356/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1357///   and operator has its own rules on how to propagate the children orderings.
1358///   However, before we engage in recursion, we check whether this intermediate
1359///   node directly matches with the sort expression. If there is a match, the
1360///   sort expression emerges at that node immediately, discarding the recursive
1361///   result coming from its children.
1362///
1363/// Range information is calculated as:
1364/// - If it is a `Literal` node, we set the range as a point value. If it is a
1365///   `Column` node, we set the datatype of the range, but cannot give an interval
1366///   for the range, yet.
1367/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1368///   and operator has its own rules on how to propagate the children range.
1369fn update_properties(
1370    mut node: ExprPropertiesNode,
1371    eq_properties: &EquivalenceProperties,
1372) -> Result<Transformed<ExprPropertiesNode>> {
1373    // First, try to gather the information from the children:
1374    if !node.expr.children().is_empty() {
1375        // We have an intermediate (non-leaf) node, account for its children:
1376        let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
1377        node.data = node.expr.get_properties(&children_props)?;
1378    } else if node.expr.as_any().is::<Literal>() {
1379        // We have a Literal, which is one of the two possible leaf node types:
1380        node.data = node.expr.get_properties(&[])?;
1381    } else if node.expr.as_any().is::<Column>() {
1382        // We have a Column, which is the other possible leaf node type:
1383        node.data.range =
1384            Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
1385    }
1386    // Now, check what we know about orderings:
1387    let normal_expr = eq_properties
1388        .eq_group
1389        .normalize_expr(Arc::clone(&node.expr));
1390    let oeq_class = &eq_properties.oeq_cache.normal_cls;
1391    if eq_properties.is_expr_constant(&normal_expr).is_some()
1392        || oeq_class.is_expr_partial_const(&normal_expr)
1393    {
1394        node.data.sort_properties = SortProperties::Singleton;
1395    } else if let Some(options) = oeq_class.get_options(&normal_expr) {
1396        node.data.sort_properties = SortProperties::Ordered(options);
1397    }
1398    Ok(Transformed::yes(node))
1399}
1400
1401/// This function examines whether a referring expression directly refers to a
1402/// given referred expression or if any of its children in the expression tree
1403/// refer to the specified expression.
1404///
1405/// # Parameters
1406///
1407/// - `referring_expr`: A reference to the referring expression (`Arc<dyn PhysicalExpr>`).
1408/// - `referred_expr`: A reference to the referred expression (`Arc<dyn PhysicalExpr>`)
1409///
1410/// # Returns
1411///
1412/// A boolean value indicating whether `referring_expr` refers (needs it to evaluate its result)
1413/// `referred_expr` or not.
1414fn expr_refers(
1415    referring_expr: &Arc<dyn PhysicalExpr>,
1416    referred_expr: &Arc<dyn PhysicalExpr>,
1417) -> bool {
1418    referring_expr.eq(referred_expr)
1419        || referring_expr
1420            .children()
1421            .iter()
1422            .any(|child| expr_refers(child, referred_expr))
1423}
1424
1425/// This function examines the given expression and its properties to determine
1426/// the ordering properties of the expression. The range knowledge is not utilized
1427/// yet in the scope of this function.
1428///
1429/// # Parameters
1430///
1431/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) for
1432///   which ordering properties need to be determined.
1433/// - `dependencies`: A reference to `Dependencies`, containing sort expressions
1434///   referred to by `expr`.
1435/// - `schema``: A reference to the schema which the `expr` columns refer.
1436///
1437/// # Returns
1438///
1439/// A `SortProperties` indicating the ordering information of the given expression.
1440fn get_expr_properties(
1441    expr: &Arc<dyn PhysicalExpr>,
1442    dependencies: &Dependencies,
1443    schema: &SchemaRef,
1444) -> Result<ExprProperties> {
1445    if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
1446        // If exact match is found, return its ordering.
1447        Ok(ExprProperties {
1448            sort_properties: SortProperties::Ordered(column_order.options),
1449            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1450            preserves_lex_ordering: false,
1451        })
1452    } else if expr.as_any().downcast_ref::<Column>().is_some() {
1453        Ok(ExprProperties {
1454            sort_properties: SortProperties::Unordered,
1455            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1456            preserves_lex_ordering: false,
1457        })
1458    } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
1459        Ok(ExprProperties {
1460            sort_properties: SortProperties::Singleton,
1461            range: literal.value().into(),
1462            preserves_lex_ordering: true,
1463        })
1464    } else {
1465        // Find orderings of its children
1466        let child_states = expr
1467            .children()
1468            .iter()
1469            .map(|child| get_expr_properties(child, dependencies, schema))
1470            .collect::<Result<Vec<_>>>()?;
1471        // Calculate expression ordering using ordering of its children.
1472        expr.get_properties(&child_states)
1473    }
1474}