Skip to main content

datafusion_physical_expr/equivalence/properties/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod dependency; // Submodule containing DependencyMap and Dependencies
19mod joins; // Submodule containing join_equivalence_properties
20mod union; // Submodule containing calculate_union
21
22pub use joins::*;
23pub use union::*;
24
25use std::fmt::{self, Display};
26use std::mem;
27use std::sync::Arc;
28
29use self::dependency::{
30    Dependencies, DependencyMap, construct_prefix_orderings,
31    generate_dependency_orderings, referred_dependencies,
32};
33use crate::equivalence::{
34    AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
35};
36use crate::expressions::{CastExpr, Column, Literal, with_new_schema};
37use crate::{
38    ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
39    PhysicalSortRequirement,
40};
41
42use arrow::datatypes::SchemaRef;
43use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
44use datafusion_common::{Constraint, Constraints, HashMap, Result, plan_err};
45use datafusion_expr::interval_arithmetic::Interval;
46use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
47use datafusion_physical_expr_common::sort_expr::options_compatible;
48use datafusion_physical_expr_common::utils::ExprPropertiesNode;
49
50use indexmap::IndexSet;
51use itertools::Itertools;
52
/// `EquivalenceProperties` stores information about the output of a plan node
/// that can be used to optimize the plan. Currently, it keeps track of:
/// - Sort expressions (orderings).
/// - Equivalent expressions; i.e. expressions known to have the same value.
/// - Constant expressions; i.e. expressions known to contain a single constant
///   value.
/// - Table constraints (primary key / unique) that can prove an ordering is
///   satisfied.
///
/// Please see the [Using Ordering for Better Plans] blog for more details.
///
/// [Using Ordering for Better Plans]: https://datafusion.apache.org/blog/2025/03/11/ordering-analysis/
///
/// # Example equivalent sort expressions
///
/// Consider the table below:
///
/// ```text
/// ┌-------┐
/// | a | b |
/// |---|---|
/// | 1 | 9 |
/// | 2 | 8 |
/// | 3 | 7 |
/// | 5 | 5 |
/// └---┴---┘
/// ```
///
/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
/// `EquivalenceProperties` tracks these different valid sort expressions and
/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query
/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
/// avoided.
///
/// # Example equivalent expressions
///
/// Similarly, consider the table below:
///
/// ```text
/// ┌-------┐
/// | a | b |
/// |---|---|
/// | 1 | 1 |
/// | 2 | 2 |
/// | 3 | 3 |
/// | 5 | 5 |
/// └---┴---┘
/// ```
///
/// In this case, columns `a` and `b` always have the same value. With this
/// information, DataFusion can optimize various operations. For example, if
/// the partition requirement is `Hash(a)` and output partitioning is
/// `Hash(b)`, then DataFusion avoids repartitioning the data as the existing
/// partitioning satisfies the requirement.
///
/// # Code Example
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
/// # use datafusion_physical_expr::expressions::col;
/// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
/// #   Field::new("a", DataType::Int32, false),
/// #   Field::new("b", DataType::Int32, false),
/// #   Field::new("c", DataType::Int32, false),
/// # ]));
/// # let col_a = col("a", &schema).unwrap();
/// # let col_b = col("b", &schema).unwrap();
/// # let col_c = col("c", &schema).unwrap();
/// // This object represents data that is sorted by a ASC, c DESC
/// // with a single constant value of b
/// let mut eq_properties = EquivalenceProperties::new(schema);
/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
/// eq_properties.add_ordering([
///     PhysicalSortExpr::new_default(col_a).asc(),
///     PhysicalSortExpr::new_default(col_c).desc(),
/// ]);
///
/// assert_eq!(
///     eq_properties.to_string(),
///     "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant: (heterogeneous)}]"
/// );
/// ```
#[derive(Clone, Debug)]
pub struct EquivalenceProperties {
    /// Distinct equivalence classes (i.e. expressions with the same value).
    /// Known constants are recorded here too, on the class they belong to.
    eq_group: EquivalenceGroup,
    /// Equivalent sort expressions (i.e. those that define the same ordering),
    /// stored as they were added, before normalization.
    oeq_class: OrderingEquivalenceClass,
    /// Cache storing equivalent sort expressions in normal form (i.e. without
    /// constants/duplicates and in standard form) and a map associating leading
    /// terms with full sort expressions. The mutating methods keep it in sync
    /// with `eq_group` and `oeq_class`.
    oeq_cache: OrderingEquivalenceCache,
    /// Table constraints that factor in equivalence calculations.
    constraints: Constraints,
    /// Schema associated with this object.
    schema: SchemaRef,
}
150
/// This object serves as a cache for storing equivalent sort expressions
/// in normal form, and a map associating leading sort expressions with
/// full lexicographical orderings. With this information, DataFusion can
/// efficiently determine whether a given ordering is satisfied by the
/// existing orderings, and discover new orderings based on the existing
/// equivalence properties.
#[derive(Clone, Debug, Default)]
struct OrderingEquivalenceCache {
    /// Equivalent sort expressions in normal form.
    normal_cls: OrderingEquivalenceClass,
    /// Map associating leading sort expressions with full lexicographical
    /// orderings. Values are indices into `normal_cls`, so this map has to be
    /// rebuilt (see `update_map`) whenever `normal_cls` changes.
    leading_map: HashMap<Arc<dyn PhysicalExpr>, Vec<usize>>,
}
165
166impl OrderingEquivalenceCache {
167    /// Creates a new `OrderingEquivalenceCache` object with the given
168    /// equivalent orderings, which should be in normal form.
169    pub fn new(
170        orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
171    ) -> Self {
172        let mut cache = Self {
173            normal_cls: OrderingEquivalenceClass::new(orderings),
174            leading_map: HashMap::new(),
175        };
176        cache.update_map();
177        cache
178    }
179
180    /// Updates/reconstructs the leading expression map according to the normal
181    /// ordering equivalence class within.
182    pub fn update_map(&mut self) {
183        self.leading_map.clear();
184        for (idx, ordering) in self.normal_cls.iter().enumerate() {
185            let expr = Arc::clone(&ordering.first().expr);
186            self.leading_map.entry(expr).or_default().push(idx);
187        }
188    }
189
190    /// Clears the cache, removing all orderings and leading expressions.
191    pub fn clear(&mut self) {
192        self.normal_cls.clear();
193        self.leading_map.clear();
194    }
195}
196
197impl EquivalenceProperties {
198    /// Creates an empty `EquivalenceProperties` object.
199    pub fn new(schema: SchemaRef) -> Self {
200        Self {
201            eq_group: EquivalenceGroup::default(),
202            oeq_class: OrderingEquivalenceClass::default(),
203            oeq_cache: OrderingEquivalenceCache::default(),
204            constraints: Constraints::default(),
205            schema,
206        }
207    }
208
209    /// Adds constraints to the properties.
210    pub fn set_constraints(&mut self, constraints: Constraints) {
211        self.constraints = constraints;
212    }
213
214    /// Adds constraints to the properties.
215    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
216        self.set_constraints(constraints);
217        self
218    }
219
220    /// Creates a new `EquivalenceProperties` object with the given orderings.
221    pub fn new_with_orderings(
222        schema: SchemaRef,
223        orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
224    ) -> Self {
225        let eq_group = EquivalenceGroup::default();
226        let oeq_class = OrderingEquivalenceClass::new(orderings);
227        // Here, we can avoid performing a full normalization, and get by with
228        // only removing constants because the equivalence group is empty.
229        let normal_orderings = oeq_class.iter().cloned().map(|o| {
230            o.into_iter()
231                .filter(|sort_expr| eq_group.is_expr_constant(&sort_expr.expr).is_none())
232        });
233        Self {
234            oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
235            oeq_class,
236            eq_group,
237            constraints: Constraints::default(),
238            schema,
239        }
240    }
241
242    /// Returns the associated schema.
243    pub fn schema(&self) -> &SchemaRef {
244        &self.schema
245    }
246
247    /// Returns a reference to the ordering equivalence class within.
248    pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
249        &self.oeq_class
250    }
251
252    /// Returns a reference to the equivalence group within.
253    pub fn eq_group(&self) -> &EquivalenceGroup {
254        &self.eq_group
255    }
256
257    /// Returns a reference to the constraints within.
258    pub fn constraints(&self) -> &Constraints {
259        &self.constraints
260    }
261
262    /// Returns all the known constants expressions.
263    pub fn constants(&self) -> Vec<ConstExpr> {
264        self.eq_group
265            .iter()
266            .flat_map(|c| {
267                c.iter().filter_map(|expr| {
268                    c.constant
269                        .as_ref()
270                        .map(|across| ConstExpr::new(Arc::clone(expr), across.clone()))
271                })
272            })
273            .collect()
274    }
275
276    /// Returns the output ordering of the properties.
277    pub fn output_ordering(&self) -> Option<LexOrdering> {
278        let concat = self.oeq_class.iter().flat_map(|o| o.iter().cloned());
279        self.normalize_sort_exprs(concat)
280    }
281
282    /// Extends this `EquivalenceProperties` with the `other` object.
283    pub fn extend(mut self, other: Self) -> Result<Self> {
284        self.constraints.extend(other.constraints);
285        self.add_equivalence_group(other.eq_group)?;
286        self.add_orderings(other.oeq_class);
287        Ok(self)
288    }
289
290    /// Clears (empties) the ordering equivalence class within this object.
291    /// Call this method when existing orderings are invalidated.
292    pub fn clear_orderings(&mut self) {
293        self.oeq_class.clear();
294        self.oeq_cache.clear();
295    }
296
297    /// Removes constant expressions that may change across partitions.
298    /// This method should be used when merging data from different partitions.
299    pub fn clear_per_partition_constants(&mut self) {
300        if self.eq_group.clear_per_partition_constants() {
301            // Renormalize orderings if the equivalence group changes:
302            let normal_orderings = self
303                .oeq_class
304                .iter()
305                .cloned()
306                .map(|o| self.eq_group.normalize_sort_exprs(o));
307            self.oeq_cache = OrderingEquivalenceCache::new(normal_orderings);
308        }
309    }
310
311    /// Adds new orderings into the existing ordering equivalence class.
312    pub fn add_orderings(
313        &mut self,
314        orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
315    ) {
316        let orderings: Vec<_> =
317            orderings.into_iter().filter_map(LexOrdering::new).collect();
318        let normal_orderings: Vec<_> = orderings
319            .iter()
320            .cloned()
321            .filter_map(|o| self.normalize_sort_exprs(o))
322            .collect();
323        if !normal_orderings.is_empty() {
324            self.oeq_class.extend(orderings);
325            // Normalize given orderings to update the cache:
326            self.oeq_cache.normal_cls.extend(normal_orderings);
327            // TODO: If no ordering is found to be redundant during extension, we
328            //       can use a shortcut algorithm to update the leading map.
329            self.oeq_cache.update_map();
330        }
331    }
332
333    /// Adds a single ordering to the existing ordering equivalence class.
334    pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
335        self.add_orderings(std::iter::once(ordering));
336    }
337
338    fn update_oeq_cache(&mut self) -> Result<()> {
339        // Renormalize orderings if the equivalence group changes:
340        let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
341        let normal_orderings = normal_cls
342            .into_iter()
343            .map(|o| self.eq_group.normalize_sort_exprs(o));
344        self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
345        self.oeq_cache.update_map();
346        // Discover any new orderings based on the new equivalence classes:
347        let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
348        for expr in leading_exprs {
349            self.discover_new_orderings(expr)?;
350        }
351        Ok(())
352    }
353
354    /// Incorporates the given equivalence group to into the existing
355    /// equivalence group within.
356    pub fn add_equivalence_group(
357        &mut self,
358        other_eq_group: EquivalenceGroup,
359    ) -> Result<()> {
360        if !other_eq_group.is_empty() {
361            self.eq_group.extend(other_eq_group);
362            self.update_oeq_cache()?;
363        }
364        Ok(())
365    }
366
367    /// Returns the ordering equivalence class within in normal form.
368    /// Normalization standardizes expressions according to the equivalence
369    /// group within, and removes constants/duplicates.
370    pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
371        self.oeq_class
372            .iter()
373            .cloned()
374            .filter_map(|ordering| self.normalize_sort_exprs(ordering))
375            .collect::<Vec<_>>()
376            .into()
377    }
378
379    /// Adds a new equality condition into the existing equivalence group.
380    /// If the given equality defines a new equivalence class, adds this new
381    /// equivalence class to the equivalence group.
382    pub fn add_equal_conditions(
383        &mut self,
384        left: Arc<dyn PhysicalExpr>,
385        right: Arc<dyn PhysicalExpr>,
386    ) -> Result<()> {
387        // Add equal expressions to the state:
388        if self.eq_group.add_equal_conditions(left, right) {
389            self.update_oeq_cache()?;
390        }
391        self.update_oeq_cache()?;
392        Ok(())
393    }
394
395    /// Track/register physical expressions with constant values.
396    pub fn add_constants(
397        &mut self,
398        constants: impl IntoIterator<Item = ConstExpr>,
399    ) -> Result<()> {
400        // Add the new constant to the equivalence group:
401        for constant in constants {
402            self.eq_group.add_constant(constant);
403        }
404        // Renormalize the orderings after adding new constants by removing
405        // the constants from existing orderings:
406        let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
407        let normal_orderings = normal_cls.into_iter().map(|ordering| {
408            ordering.into_iter().filter(|sort_expr| {
409                self.eq_group.is_expr_constant(&sort_expr.expr).is_none()
410            })
411        });
412        self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
413        self.oeq_cache.update_map();
414        // Discover any new orderings based on the constants:
415        let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
416        for expr in leading_exprs {
417            self.discover_new_orderings(expr)?;
418        }
419        Ok(())
420    }
421
422    /// Discover new valid orderings in light of a new equality. Accepts a single
423    /// argument (`expr`) which is used to determine the orderings to update.
424    /// When constants or equivalence classes change, there may be new orderings
425    /// that can be discovered with the new equivalence properties.
426    /// For a discussion, see: <https://github.com/apache/datafusion/issues/9812>
427    fn discover_new_orderings(
428        &mut self,
429        normal_expr: Arc<dyn PhysicalExpr>,
430    ) -> Result<()> {
431        let Some(ordering_idxs) = self.oeq_cache.leading_map.get(&normal_expr) else {
432            return Ok(());
433        };
434        let eq_class = self
435            .eq_group
436            .get_equivalence_class(&normal_expr)
437            .map_or_else(|| vec![normal_expr], |class| class.clone().into());
438
439        let mut new_orderings = vec![];
440        for idx in ordering_idxs {
441            let ordering = &self.oeq_cache.normal_cls[*idx];
442            let leading_ordering_options = ordering[0].options;
443
444            'exprs: for equivalent_expr in &eq_class {
445                let children = equivalent_expr.children();
446                if children.is_empty() {
447                    continue;
448                }
449                // Check if all children match the next expressions in the ordering:
450                let mut child_properties = vec![];
451                // Build properties for each child based on the next expression:
452                for (i, child) in children.into_iter().enumerate() {
453                    let Some(next) = ordering.get(i + 1) else {
454                        break 'exprs;
455                    };
456                    if !next.expr.eq(child) {
457                        break 'exprs;
458                    }
459                    let data_type = child.data_type(&self.schema)?;
460                    child_properties.push(ExprProperties {
461                        sort_properties: SortProperties::Ordered(next.options),
462                        range: Interval::make_unbounded(&data_type)?,
463                        preserves_lex_ordering: true,
464                    });
465                }
466                // Check if the expression is monotonic in all arguments:
467                let expr_properties =
468                    equivalent_expr.get_properties(&child_properties)?;
469                if expr_properties.preserves_lex_ordering
470                    && expr_properties.sort_properties
471                        == SortProperties::Ordered(leading_ordering_options)
472                {
473                    // Assume that `[c ASC, a ASC, b ASC]` is among existing
474                    // orderings. If equality `c = f(a, b)` is given, ordering
475                    // `[a ASC, b ASC]` implies the ordering `[c ASC]`. Thus,
476                    // ordering `[a ASC, b ASC]` is also a valid ordering.
477                    new_orderings.push(ordering[1..].to_vec());
478                    break;
479                }
480            }
481        }
482
483        if !new_orderings.is_empty() {
484            self.add_orderings(new_orderings);
485        }
486        Ok(())
487    }
488
489    /// Updates the ordering equivalence class within assuming that the table
490    /// is re-sorted according to the argument `ordering`, and returns whether
491    /// this operation resulted in any change. Note that equivalence classes
492    /// (and constants) do not change as they are unaffected by a re-sort. If
493    /// the given ordering is already satisfied, the function does nothing.
494    pub fn reorder(
495        &mut self,
496        ordering: impl IntoIterator<Item = PhysicalSortExpr>,
497    ) -> Result<bool> {
498        let (ordering, ordering_tee) = ordering.into_iter().tee();
499        // First, standardize the given ordering:
500        let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
501            // If the ordering vanishes after normalization, it is satisfied:
502            return Ok(false);
503        };
504        if normal_ordering.len() != self.common_sort_prefix_length(&normal_ordering)? {
505            // If the ordering is unsatisfied, replace existing orderings:
506            self.clear_orderings();
507            self.add_ordering(ordering_tee);
508            return Ok(true);
509        }
510        Ok(false)
511    }
512
513    /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
514    /// equivalence group within. Returns a `LexOrdering` instance if the
515    /// expressions define a proper lexicographical ordering. For more details,
516    /// see [`EquivalenceGroup::normalize_sort_exprs`].
517    pub fn normalize_sort_exprs(
518        &self,
519        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
520    ) -> Option<LexOrdering> {
521        LexOrdering::new(self.eq_group.normalize_sort_exprs(sort_exprs))
522    }
523
524    /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
525    /// equivalence group within. Returns a `LexRequirement` instance if the
526    /// expressions define a proper lexicographical requirement. For more
527    /// details, see [`EquivalenceGroup::normalize_sort_exprs`].
528    pub fn normalize_sort_requirements(
529        &self,
530        sort_reqs: impl IntoIterator<Item = PhysicalSortRequirement>,
531    ) -> Option<LexRequirement> {
532        LexRequirement::new(self.eq_group.normalize_sort_requirements(sort_reqs))
533    }
534
535    /// Iteratively checks whether the given ordering is satisfied by any of
536    /// the existing orderings. See [`Self::ordering_satisfy_requirement`] for
537    /// more details and examples.
538    pub fn ordering_satisfy(
539        &self,
540        given: impl IntoIterator<Item = PhysicalSortExpr>,
541    ) -> Result<bool> {
542        // First, standardize the given ordering:
543        let Some(normal_ordering) = self.normalize_sort_exprs(given) else {
544            // If the ordering vanishes after normalization, it is satisfied:
545            return Ok(true);
546        };
547        Ok(normal_ordering.len() == self.common_sort_prefix_length(&normal_ordering)?)
548    }
549
550    /// Iteratively checks whether the given sort requirement is satisfied by
551    /// any of the existing orderings.
552    ///
553    /// ### Example Scenarios
554    ///
555    /// In these scenarios, assume that all expressions share the same sort
556    /// properties.
557    ///
558    /// #### Case 1: Sort Requirement `[a, c]`
559    ///
560    /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
561    /// 1. The function first checks the leading requirement `a`, which is
562    ///    satisfied by `[a, b, c].first()`.
563    /// 2. `a` is added as a constant for the next iteration.
564    /// 3. Normal orderings become `[[b, c], [d]]`.
565    /// 4. The function fails for `c` in the second iteration, as neither
566    ///    `[b, c]` nor `[d]` satisfies `c`.
567    ///
568    /// #### Case 2: Sort Requirement `[a, d]`
569    ///
570    /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
571    /// 1. The function first checks the leading requirement `a`, which is
572    ///    satisfied by `[a, b, c].first()`.
573    /// 2. `a` is added as a constant for the next iteration.
574    /// 3. Normal orderings become `[[b, c], [d]]`.
575    /// 4. The function returns `true` as `[d]` satisfies `d`.
576    pub fn ordering_satisfy_requirement(
577        &self,
578        given: impl IntoIterator<Item = PhysicalSortRequirement>,
579    ) -> Result<bool> {
580        // First, standardize the given requirement:
581        let Some(normal_reqs) = self.normalize_sort_requirements(given) else {
582            // If the requirement vanishes after normalization, it is satisfied:
583            return Ok(true);
584        };
585        // Then, check whether given requirement is satisfied by constraints:
586        if self.satisfied_by_constraints(&normal_reqs) {
587            return Ok(true);
588        }
589        let schema = self.schema();
590        let mut eq_properties = self.clone();
591        for element in normal_reqs {
592            // Check whether given requirement is satisfied:
593            let ExprProperties {
594                sort_properties, ..
595            } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
596            let satisfy = match sort_properties {
597                SortProperties::Ordered(options) => element.options.is_none_or(|opts| {
598                    let nullable = element.expr.nullable(schema).unwrap_or(true);
599                    options_compatible(&options, &opts, nullable)
600                }),
601                // Singleton expressions satisfy any requirement.
602                SortProperties::Singleton => true,
603                SortProperties::Unordered => false,
604            };
605            if !satisfy {
606                return Ok(false);
607            }
608            // Treat satisfied keys as constants in subsequent iterations. We
609            // can do this because the "next" key only matters in a lexicographical
610            // ordering when the keys to its left have the same values.
611            //
612            // Note that these expressions are not properly "constants". This is just
613            // an implementation strategy confined to this function.
614            //
615            // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
616            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
617            // From the analysis above, we know that `[a ASC]` is satisfied. Then,
618            // we add column `a` as constant to the algorithm state. This enables us
619            // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
620            let const_expr = ConstExpr::from(element.expr);
621            eq_properties.add_constants(std::iter::once(const_expr))?;
622        }
623        Ok(true)
624    }
625
626    /// Returns the number of consecutive sort expressions (starting from the
627    /// left) that are satisfied by the existing ordering.
628    fn common_sort_prefix_length(&self, normal_ordering: &LexOrdering) -> Result<usize> {
629        let full_length = normal_ordering.len();
630        // Check whether the given ordering is satisfied by constraints:
631        if self.satisfied_by_constraints_ordering(normal_ordering) {
632            // If constraints satisfy all sort expressions, return the full
633            // length:
634            return Ok(full_length);
635        }
636        let schema = self.schema();
637        let mut eq_properties = self.clone();
638        for (idx, element) in normal_ordering.into_iter().enumerate() {
639            // Check whether given ordering is satisfied:
640            let ExprProperties {
641                sort_properties, ..
642            } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
643            let satisfy = match sort_properties {
644                SortProperties::Ordered(options) => options_compatible(
645                    &options,
646                    &element.options,
647                    element.expr.nullable(schema).unwrap_or(true),
648                ),
649                // Singleton expressions satisfy any ordering.
650                SortProperties::Singleton => true,
651                SortProperties::Unordered => false,
652            };
653            if !satisfy {
654                // As soon as one sort expression is unsatisfied, return how
655                // many we've satisfied so far:
656                return Ok(idx);
657            }
658            // Treat satisfied keys as constants in subsequent iterations. We
659            // can do this because the "next" key only matters in a lexicographical
660            // ordering when the keys to its left have the same values.
661            //
662            // Note that these expressions are not properly "constants". This is just
663            // an implementation strategy confined to this function.
664            //
665            // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
666            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
667            // From the analysis above, we know that `[a ASC]` is satisfied. Then,
668            // we add column `a` as constant to the algorithm state. This enables us
669            // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
670            let const_expr = ConstExpr::from(Arc::clone(&element.expr));
671            eq_properties.add_constants(std::iter::once(const_expr))?
672        }
673        // All sort expressions are satisfied, return full length:
674        Ok(full_length)
675    }
676
677    /// Determines the longest normal prefix of `ordering` satisfied by the
678    /// existing ordering. Returns that prefix as a new `LexOrdering`, and a
679    /// boolean indicating whether all the sort expressions are satisfied.
680    pub fn extract_common_sort_prefix(
681        &self,
682        ordering: LexOrdering,
683    ) -> Result<(Vec<PhysicalSortExpr>, bool)> {
684        // First, standardize the given ordering:
685        let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
686            // If the ordering vanishes after normalization, it is satisfied:
687            return Ok((vec![], true));
688        };
689        let prefix_len = self.common_sort_prefix_length(&normal_ordering)?;
690        let flag = prefix_len == normal_ordering.len();
691        let mut sort_exprs: Vec<_> = normal_ordering.into();
692        if !flag {
693            sort_exprs.truncate(prefix_len);
694        }
695        Ok((sort_exprs, flag))
696    }
697
698    /// Checks if the sort expressions are satisfied by any of the table
699    /// constraints (primary key or unique). Returns true if any constraint
700    /// fully satisfies the expressions (i.e. constraint indices form a valid
701    /// prefix of an existing ordering that matches the expressions). For
702    /// unique constraints, also verifies nullable columns.
703    fn satisfied_by_constraints_ordering(
704        &self,
705        normal_exprs: &[PhysicalSortExpr],
706    ) -> bool {
707        self.constraints.iter().any(|constraint| match constraint {
708            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
709                let check_null = matches!(constraint, Constraint::Unique(_));
710                let normalized_size = normal_exprs.len();
711                indices.len() <= normalized_size
712                    && self.oeq_class.iter().any(|ordering| {
713                        let length = ordering.len();
714                        if indices.len() > length || normalized_size < length {
715                            return false;
716                        }
717                        // Build a map of column positions in the ordering:
718                        let mut col_positions = HashMap::with_capacity(length);
719                        for (pos, req) in ordering.iter().enumerate() {
720                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
721                            {
722                                let nullable = col.nullable(&self.schema).unwrap_or(true);
723                                col_positions.insert(col.index(), (pos, nullable));
724                            }
725                        }
726                        // Check if all constraint indices appear in valid positions:
727                        if !indices.iter().all(|idx| {
728                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
729                                // For unique constraints, verify column is not nullable if it's first/last:
730                                !check_null
731                                    || !nullable
732                                    || (pos != 0 && pos != length - 1)
733                            })
734                        }) {
735                            return false;
736                        }
737                        // Check if this ordering matches the prefix:
738                        normal_exprs.iter().zip(ordering).all(|(given, existing)| {
739                            existing.satisfy_expr(given, &self.schema)
740                        })
741                    })
742            }
743        })
744    }
745
746    /// Checks if the sort requirements are satisfied by any of the table
747    /// constraints (primary key or unique). Returns true if any constraint
748    /// fully satisfies the requirements (i.e. constraint indices form a valid
749    /// prefix of an existing ordering that matches the requirements). For
750    /// unique constraints, also verifies nullable columns.
751    fn satisfied_by_constraints(&self, normal_reqs: &[PhysicalSortRequirement]) -> bool {
752        self.constraints.iter().any(|constraint| match constraint {
753            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
754                let check_null = matches!(constraint, Constraint::Unique(_));
755                let normalized_size = normal_reqs.len();
756                indices.len() <= normalized_size
757                    && self.oeq_class.iter().any(|ordering| {
758                        let length = ordering.len();
759                        if indices.len() > length || normalized_size < length {
760                            return false;
761                        }
762                        // Build a map of column positions in the ordering:
763                        let mut col_positions = HashMap::with_capacity(length);
764                        for (pos, req) in ordering.iter().enumerate() {
765                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
766                            {
767                                let nullable = col.nullable(&self.schema).unwrap_or(true);
768                                col_positions.insert(col.index(), (pos, nullable));
769                            }
770                        }
771                        // Check if all constraint indices appear in valid positions:
772                        if !indices.iter().all(|idx| {
773                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
774                                // For unique constraints, verify column is not nullable if it's first/last:
775                                !check_null
776                                    || !nullable
777                                    || (pos != 0 && pos != length - 1)
778                            })
779                        }) {
780                            return false;
781                        }
782                        // Check if this ordering matches the prefix:
783                        normal_reqs.iter().zip(ordering).all(|(given, existing)| {
784                            existing.satisfy(given, &self.schema)
785                        })
786                    })
787            }
788        })
789    }
790
791    /// Checks whether the `given` sort requirements are equal or more specific
792    /// than the `reference` sort requirements.
793    pub fn requirements_compatible(
794        &self,
795        given: LexRequirement,
796        reference: LexRequirement,
797    ) -> bool {
798        let Some(normal_given) = self.normalize_sort_requirements(given) else {
799            return true;
800        };
801        let Some(normal_reference) = self.normalize_sort_requirements(reference) else {
802            return true;
803        };
804
805        (normal_reference.len() <= normal_given.len())
806            && normal_reference
807                .into_iter()
808                .zip(normal_given)
809                .all(|(reference, given)| given.compatible(&reference))
810    }
811
812    /// Modify existing orderings by substituting sort expressions with appropriate
813    /// targets from the projection mapping. We substitute a sort expression when
814    /// its physical expression has a one-to-one functional relationship with a
815    /// target expression in the mapping.
816    ///
817    /// After substitution, we may generate more than one `LexOrdering` for each
818    /// existing equivalent ordering. For example, `[a ASC, b ASC]` will turn
819    /// into `[CAST(a) ASC, b ASC]` and `[a ASC, b ASC]` when applying projection
820    /// expressions `a, b, CAST(a)`.
821    ///
822    /// TODO: Handle all scenarios that allow substitution; e.g. when `x` is
823    ///       sorted, `atan(x + 1000)` should also be substituted. For now, we
824    ///       only consider single-column `CAST` expressions.
825    fn substitute_oeq_class(
826        schema: &SchemaRef,
827        mapping: &ProjectionMapping,
828        oeq_class: OrderingEquivalenceClass,
829    ) -> OrderingEquivalenceClass {
830        let new_orderings = oeq_class.into_iter().flat_map(|order| {
831            // Modify/expand existing orderings by substituting sort
832            // expressions with appropriate targets from the mapping:
833            order
834                .into_iter()
835                .map(|sort_expr| {
836                    let referring_exprs = mapping
837                        .iter()
838                        .map(|(source, _target)| source)
839                        .filter(|source| expr_refers(source, &sort_expr.expr))
840                        .cloned();
841                    let mut result = vec![];
842                    // The sort expression comes from this schema, so the
843                    // following call to `unwrap` is safe.
844                    let expr_type = sort_expr.expr.data_type(schema).unwrap();
845                    // TODO: Add one-to-one analysis for ScalarFunctions.
846                    for r_expr in referring_exprs {
847                        // We check whether this expression is substitutable.
848                        if let Some(cast_expr) =
849                            r_expr.as_any().downcast_ref::<CastExpr>()
850                        {
851                            // For casts, we need to know whether the cast
852                            // expression matches:
853                            if cast_expr.expr.eq(&sort_expr.expr)
854                                && cast_expr.is_bigger_cast(&expr_type)
855                            {
856                                result.push(PhysicalSortExpr::new(
857                                    r_expr,
858                                    sort_expr.options,
859                                ));
860                            }
861                        }
862                    }
863                    result.push(sort_expr);
864                    result
865                })
866                // Generate all valid orderings given substituted expressions:
867                .multi_cartesian_product()
868        });
869        OrderingEquivalenceClass::new(new_orderings)
870    }
871
872    /// Projects argument `expr` according to the projection described by
873    /// `mapping`, taking equivalences into account.
874    ///
875    /// For example, assume that columns `a` and `c` are always equal, and that
876    /// the projection described by `mapping` encodes the following:
877    ///
878    /// ```text
879    /// a -> a1
880    /// b -> b1
881    /// ```
882    ///
883    /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to
884    /// `Some(a1 + b1)` and `d` to `None`, meaning that it is not projectable.
885    pub fn project_expr(
886        &self,
887        expr: &Arc<dyn PhysicalExpr>,
888        mapping: &ProjectionMapping,
889    ) -> Option<Arc<dyn PhysicalExpr>> {
890        self.eq_group.project_expr(mapping, expr)
891    }
892
893    /// Projects the given `expressions` according to the projection described
894    /// by `mapping`, taking equivalences into account. This function is similar
895    /// to [`Self::project_expr`], but projects multiple expressions at once
896    /// more efficiently than calling `project_expr` for each expression.
897    pub fn project_expressions<'a>(
898        &'a self,
899        expressions: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>> + 'a,
900        mapping: &'a ProjectionMapping,
901    ) -> impl Iterator<Item = Option<Arc<dyn PhysicalExpr>>> + 'a {
902        self.eq_group.project_expressions(mapping, expressions)
903    }
904
905    /// Constructs a dependency map based on existing orderings referred to in
906    /// the projection.
907    ///
908    /// This function analyzes the orderings in the normalized order-equivalence
909    /// class and builds a dependency map. The dependency map captures relationships
910    /// between expressions within the orderings, helping to identify dependencies
911    /// and construct valid projected orderings during projection operations.
912    ///
913    /// # Parameters
914    ///
915    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
916    ///   relationship between source and target expressions.
917    ///
918    /// # Returns
919    ///
920    /// A [`DependencyMap`] representing the dependency map, where each
921    /// \[`DependencyNode`\] contains dependencies for the key [`PhysicalSortExpr`].
922    ///
923    /// # Example
924    ///
925    /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`,
926    /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`.
927    /// Then, the dependency map will be:
928    ///
929    /// ```text
930    /// a ASC: Node {Some(a_new ASC), HashSet{}}
931    /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
932    /// c ASC: Node {None, HashSet{a ASC}}
933    /// ```
934    fn construct_dependency_map(
935        &self,
936        oeq_class: OrderingEquivalenceClass,
937        mapping: &ProjectionMapping,
938    ) -> DependencyMap {
939        let mut map = DependencyMap::default();
940        for ordering in oeq_class.into_iter() {
941            // Previous expression is a dependency. Note that there is no
942            // dependency for the leading expression.
943            if !self.insert_to_dependency_map(
944                mapping,
945                ordering[0].clone(),
946                None,
947                &mut map,
948            ) {
949                continue;
950            }
951            for (dependency, sort_expr) in ordering.into_iter().tuple_windows() {
952                if !self.insert_to_dependency_map(
953                    mapping,
954                    sort_expr,
955                    Some(dependency),
956                    &mut map,
957                ) {
958                    // If we can't project, stop constructing the dependency map
959                    // as remaining dependencies will be invalid post projection.
960                    break;
961                }
962            }
963        }
964        map
965    }
966
967    /// Projects the sort expression according to the projection mapping and
968    /// inserts it into the dependency map with the given dependency. Returns
969    /// a boolean flag indicating whether the given expression is projectable.
970    fn insert_to_dependency_map(
971        &self,
972        mapping: &ProjectionMapping,
973        sort_expr: PhysicalSortExpr,
974        dependency: Option<PhysicalSortExpr>,
975        map: &mut DependencyMap,
976    ) -> bool {
977        let target_sort_expr = self
978            .project_expr(&sort_expr.expr, mapping)
979            .map(|expr| PhysicalSortExpr::new(expr, sort_expr.options));
980        let projectable = target_sort_expr.is_some();
981        if projectable
982            || mapping
983                .iter()
984                .any(|(source, _)| expr_refers(source, &sort_expr.expr))
985        {
986            // Add sort expressions that can be projected or referred to
987            // by any of the projection expressions to the dependency map:
988            map.insert(sort_expr, target_sort_expr, dependency);
989        }
990        projectable
991    }
992
993    /// Returns a new `ProjectionMapping` where source expressions are in normal
994    /// form. Normalization ensures that source expressions are transformed into
995    /// a consistent representation, which is beneficial for algorithms that rely
996    /// on exact equalities, as it allows for more precise and reliable comparisons.
997    ///
998    /// # Parameters
999    ///
1000    /// - `mapping`: A reference to the original `ProjectionMapping` to normalize.
1001    ///
1002    /// # Returns
1003    ///
1004    /// A new `ProjectionMapping` with source expressions in normal form.
1005    fn normalize_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
1006        mapping
1007            .iter()
1008            .map(|(source, target)| {
1009                let normal_source = self.eq_group.normalize_expr(Arc::clone(source));
1010                (normal_source, target.clone())
1011            })
1012            .collect()
1013    }
1014
    /// Computes projected orderings based on a given projection mapping.
    ///
    /// This function takes a `ProjectionMapping` and computes the possible
    /// orderings for the projected expressions. It considers dependencies
    /// between expressions and generates valid orderings according to the
    /// specified sort properties.
    ///
    /// Orderings come from two passes:
    /// 1. For each mapping source, every set of referred dependencies under
    ///    which the source is ordered yields orderings that end with the
    ///    source's targets, prefixed by the orderings generated from those
    ///    dependencies.
    /// 2. For each entry of the dependency map, the projected target (if any)
    ///    is appended to every prefix ordering built from its dependencies.
    ///
    /// # Parameters
    ///
    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
    ///   relationship between source and target expressions.
    /// - `oeq_class`: The `OrderingEquivalenceClass` containing the orderings
    ///   to project.
    ///
    /// # Returns
    ///
    /// A vector of all valid (but not in normal form) orderings after projection.
    fn projected_orderings(
        &self,
        mapping: &ProjectionMapping,
        mut oeq_class: OrderingEquivalenceClass,
    ) -> Vec<LexOrdering> {
        // Normalize source expressions in the mapping:
        let mapping = self.normalize_mapping(mapping);
        // Expand the orderings with substitutable mapping sources, then build
        // the dependency map over the expanded orderings:
        oeq_class = Self::substitute_oeq_class(&self.schema, &mapping, oeq_class);
        let dependency_map = self.construct_dependency_map(oeq_class, &mapping);
        let orderings = mapping.iter().flat_map(|(source, targets)| {
            referred_dependencies(&dependency_map, source)
                .into_iter()
                .filter_map(|deps| {
                    let ep = get_expr_properties(source, &deps, &self.schema);
                    let sort_properties = ep.map(|prop| prop.sort_properties);
                    if let Ok(SortProperties::Ordered(options)) = sort_properties {
                        Some((options, deps))
                    } else {
                        // Do not consider unordered cases (this also drops
                        // cases where computing the properties failed).
                        None
                    }
                })
                .flat_map(|(options, relevant_deps)| {
                    // Generate dependent orderings (i.e. prefixes for targets):
                    let dependency_orderings =
                        generate_dependency_orderings(&relevant_deps, &dependency_map);
                    let sort_exprs = targets.iter().map(|(target, _)| {
                        PhysicalSortExpr::new(Arc::clone(target), options)
                    });
                    if dependency_orderings.is_empty() {
                        // No prefixes: each target forms a single-element
                        // ordering on its own.
                        sort_exprs.map(|sort_expr| [sort_expr].into()).collect()
                    } else {
                        // Extend a copy of every prefix with each target:
                        sort_exprs
                            .flat_map(|sort_expr| {
                                let mut result = dependency_orderings.clone();
                                for ordering in result.iter_mut() {
                                    ordering.push(sort_expr.clone());
                                }
                                result
                            })
                            .collect::<Vec<_>>()
                    }
                })
        });

        // Add valid projected orderings. For example, if existing ordering is
        // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
        // preserve `a_new + b_new` as ordered. Please note that `a_new` and
        // `b_new` themselves need not be ordered. Such dependencies cannot be
        // deduced via the pass above.
        let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
            let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
            if prefixes.is_empty() {
                // No prefix means no dependency: emit a single-element
                // ordering holding just the projected target (if the
                // expression is projectable at all):
                if let Some(target) = &node.target {
                    prefixes.push([target.clone()].into());
                }
            } else {
                // Append the projected target (if any) on top of each of its
                // dependency prefixes:
                for ordering in prefixes.iter_mut() {
                    if let Some(target) = &node.target {
                        ordering.push(target.clone());
                    }
                }
            }
            prefixes
        });

        // Combine the orderings from both passes. No simplification happens
        // here; the result is not in normal form:
        orderings.chain(projected_orderings).collect()
    }
1105
1106    /// Projects constraints according to the given projection mapping.
1107    ///
1108    /// This function takes a projection mapping and extracts column indices of
1109    /// target columns. It then projects the constraints to only include
1110    /// relationships between columns that exist in the projected output.
1111    ///
1112    /// # Parameters
1113    ///
1114    /// * `mapping` - A reference to the `ProjectionMapping` that defines the
1115    ///   projection operation.
1116    ///
1117    /// # Returns
1118    ///
1119    /// Returns an optional `Constraints` object containing only the constraints
1120    /// that are valid for the projected columns (if any exists).
1121    fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
1122        let indices = mapping
1123            .iter()
1124            .flat_map(|(_, targets)| {
1125                targets.iter().flat_map(|(target, _)| {
1126                    target.as_any().downcast_ref::<Column>().map(|c| c.index())
1127                })
1128            })
1129            .collect::<Vec<_>>();
1130        self.constraints.project(&indices)
1131    }
1132
1133    /// Projects the equivalences within according to `mapping` and
1134    /// `output_schema`.
1135    pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
1136        let eq_group = self.eq_group.project(mapping);
1137        let orderings =
1138            self.projected_orderings(mapping, self.oeq_cache.normal_cls.clone());
1139        let normal_orderings = orderings
1140            .iter()
1141            .cloned()
1142            .map(|o| eq_group.normalize_sort_exprs(o));
1143        Self {
1144            oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
1145            oeq_class: OrderingEquivalenceClass::new(orderings),
1146            constraints: self.projected_constraints(mapping).unwrap_or_default(),
1147            schema: output_schema,
1148            eq_group,
1149        }
1150    }
1151
1152    /// Returns the longest (potentially partial) permutation satisfying the
1153    /// existing ordering. For example, if we have the equivalent orderings
1154    /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`,
1155    /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`.
1156    /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied
1157    /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0`
1158    /// inside the argument `exprs` (respectively). For the mathematical
1159    /// definition of "partial permutation", see:
1160    ///
1161    /// <https://en.wikipedia.org/wiki/Permutation#k-permutations_of_n>
1162    pub fn find_longest_permutation(
1163        &self,
1164        exprs: &[Arc<dyn PhysicalExpr>],
1165    ) -> Result<(Vec<PhysicalSortExpr>, Vec<usize>)> {
1166        let mut eq_properties = self.clone();
1167        let mut result = vec![];
1168        // The algorithm is as follows:
1169        // - Iterate over all the expressions and insert ordered expressions
1170        //   into the result.
1171        // - Treat inserted expressions as constants (i.e. add them as constants
1172        //   to the state).
1173        // - Continue the above procedure until no expression is inserted; i.e.
1174        //   the algorithm reaches a fixed point.
1175        // This algorithm should reach a fixed point in at most `exprs.len()`
1176        // iterations.
1177        let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
1178        for _ in 0..exprs.len() {
1179            // Get ordered expressions with their indices.
1180            let ordered_exprs = search_indices
1181                .iter()
1182                .filter_map(|&idx| {
1183                    let ExprProperties {
1184                        sort_properties, ..
1185                    } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
1186                    match sort_properties {
1187                        SortProperties::Ordered(options) => {
1188                            let expr = Arc::clone(&exprs[idx]);
1189                            Some((PhysicalSortExpr::new(expr, options), idx))
1190                        }
1191                        SortProperties::Singleton => {
1192                            // Assign default ordering to constant expressions:
1193                            let expr = Arc::clone(&exprs[idx]);
1194                            Some((PhysicalSortExpr::new_default(expr), idx))
1195                        }
1196                        SortProperties::Unordered => None,
1197                    }
1198                })
1199                .collect::<Vec<_>>();
1200            // We reached a fixed point, exit.
1201            if ordered_exprs.is_empty() {
1202                break;
1203            }
1204            // Remove indices that have an ordering from `search_indices`, and
1205            // treat ordered expressions as constants in subsequent iterations.
1206            // We can do this because the "next" key only matters in a lexicographical
1207            // ordering when the keys to its left have the same values.
1208            //
1209            // Note that these expressions are not properly "constants". This is just
1210            // an implementation strategy confined to this function.
1211            for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
1212                let const_expr = ConstExpr::from(Arc::clone(expr));
1213                eq_properties.add_constants(std::iter::once(const_expr))?;
1214                search_indices.shift_remove(idx);
1215            }
1216            // Add new ordered section to the state.
1217            result.extend(ordered_exprs);
1218        }
1219        Ok(result.into_iter().unzip())
1220    }
1221
1222    /// This function determines whether the provided expression is constant
1223    /// based on the known constants. For example, if columns `a` and `b` are
1224    /// constant, then expressions `a`, `b` and `a + b` will all return `true`
1225    /// whereas expression `c` will return `false`.
1226    ///
1227    /// # Parameters
1228    ///
1229    /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1230    ///   expression to be checked.
1231    ///
1232    /// # Returns
1233    ///
1234    /// Returns a `Some` value if the expression is constant according to
1235    /// equivalence group, and `None` otherwise. The `Some` variant contains
1236    /// an `AcrossPartitions` value indicating whether the expression is
1237    /// constant across partitions, and its actual value (if available).
1238    pub fn is_expr_constant(
1239        &self,
1240        expr: &Arc<dyn PhysicalExpr>,
1241    ) -> Option<AcrossPartitions> {
1242        self.eq_group.is_expr_constant(expr)
1243    }
1244
1245    /// Retrieves the properties for a given physical expression.
1246    ///
1247    /// This function constructs an [`ExprProperties`] object for the given
1248    /// expression, which encapsulates information about the expression's
1249    /// properties, including its [`SortProperties`] and [`Interval`].
1250    ///
1251    /// # Parameters
1252    ///
1253    /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical expression
1254    ///   for which ordering information is sought.
1255    ///
1256    /// # Returns
1257    ///
1258    /// Returns an [`ExprProperties`] object containing the ordering and range
1259    /// information for the given expression.
1260    pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
1261        ExprPropertiesNode::new_unknown(expr)
1262            .transform_up(|expr| update_properties(expr, self))
1263            .data()
1264            .map(|node| node.data)
1265            .unwrap_or_else(|_| ExprProperties::new_unknown())
1266    }
1267
1268    /// Transforms this `EquivalenceProperties` by mapping columns in the
1269    /// original schema to columns in the new schema by index.
1270    pub fn with_new_schema(mut self, schema: SchemaRef) -> Result<Self> {
1271        // The new schema and the original schema is aligned when they have the
1272        // same number of columns, and fields at the same index have the same
1273        // type in both schemas.
1274        let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
1275            && self
1276                .schema
1277                .fields
1278                .iter()
1279                .zip(schema.fields.iter())
1280                .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
1281        if !schemas_aligned {
1282            // Rewriting equivalence properties in terms of new schema is not
1283            // safe when schemas are not aligned:
1284            return plan_err!(
1285                "Schemas have to be aligned to rewrite equivalences:\n Old schema: {}\n New schema: {}",
1286                self.schema,
1287                schema
1288            );
1289        }
1290
1291        // Rewrite equivalence classes according to the new schema:
1292        let mut eq_classes = vec![];
1293        for mut eq_class in self.eq_group {
1294            // Rewrite the expressions in the equivalence class:
1295            eq_class.exprs = eq_class
1296                .exprs
1297                .into_iter()
1298                .map(|expr| with_new_schema(expr, &schema))
1299                .collect::<Result<_>>()?;
1300            // Rewrite the constant value (if available and known):
1301            let data_type = eq_class
1302                .canonical_expr()
1303                .map(|e| e.data_type(&schema))
1304                .transpose()?;
1305            if let (Some(data_type), Some(AcrossPartitions::Uniform(Some(value)))) =
1306                (data_type, &mut eq_class.constant)
1307            {
1308                *value = value.cast_to(&data_type)?;
1309            }
1310            eq_classes.push(eq_class);
1311        }
1312        self.eq_group = eq_classes.into();
1313
1314        // Rewrite orderings according to new schema:
1315        self.oeq_class = self.oeq_class.with_new_schema(&schema)?;
1316        self.oeq_cache.normal_cls = self.oeq_cache.normal_cls.with_new_schema(&schema)?;
1317
1318        // Update the schema:
1319        self.schema = schema;
1320
1321        Ok(self)
1322    }
1323}
1324
1325impl From<EquivalenceProperties> for OrderingEquivalenceClass {
1326    fn from(eq_properties: EquivalenceProperties) -> Self {
1327        eq_properties.oeq_class
1328    }
1329}
1330
1331/// More readable display version of the `EquivalenceProperties`.
1332///
1333/// Format:
1334/// ```text
1335/// order: [[b@1 ASC NULLS LAST]], eq: [{members: [a@0], constant: (heterogeneous)}]
1336/// ```
1337impl Display for EquivalenceProperties {
1338    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1339        let empty_eq_group = self.eq_group.is_empty();
1340        let empty_oeq_class = self.oeq_class.is_empty();
1341        if empty_oeq_class && empty_eq_group {
1342            write!(f, "No properties")?;
1343        } else if !empty_oeq_class {
1344            write!(f, "order: {}", self.oeq_class)?;
1345            if !empty_eq_group {
1346                write!(f, ", eq: {}", self.eq_group)?;
1347            }
1348        } else {
1349            write!(f, "eq: {}", self.eq_group)?;
1350        }
1351        Ok(())
1352    }
1353}
1354
1355/// Calculates the properties of a given [`ExprPropertiesNode`].
1356///
1357/// Order information can be retrieved as:
1358/// - If it is a leaf node, we directly find the order of the node by looking
1359///   at the given sort expression and equivalence properties if it is a `Column`
1360///   leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
1361///   it as singleton so that it can cooperate with all ordered columns.
1362/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1363///   and operator has its own rules on how to propagate the children orderings.
1364///   However, before we engage in recursion, we check whether this intermediate
1365///   node directly matches with the sort expression. If there is a match, the
1366///   sort expression emerges at that node immediately, discarding the recursive
1367///   result coming from its children.
1368///
1369/// Range information is calculated as:
1370/// - If it is a `Literal` node, we set the range as a point value. If it is a
1371///   `Column` node, we set the datatype of the range, but cannot give an interval
1372///   for the range, yet.
1373/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1374///   and operator has its own rules on how to propagate the children range.
1375fn update_properties(
1376    mut node: ExprPropertiesNode,
1377    eq_properties: &EquivalenceProperties,
1378) -> Result<Transformed<ExprPropertiesNode>> {
1379    // First, try to gather the information from the children:
1380    if !node.expr.children().is_empty() {
1381        // We have an intermediate (non-leaf) node, account for its children:
1382        let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
1383        node.data = node.expr.get_properties(&children_props)?;
1384    } else if node.expr.as_any().is::<Literal>() {
1385        // We have a Literal, which is one of the two possible leaf node types:
1386        node.data = node.expr.get_properties(&[])?;
1387    } else if node.expr.as_any().is::<Column>() {
1388        // We have a Column, which is the other possible leaf node type:
1389        node.data.range =
1390            Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
1391    }
1392    // Now, check what we know about orderings:
1393    let normal_expr = eq_properties
1394        .eq_group
1395        .normalize_expr(Arc::clone(&node.expr));
1396    let oeq_class = &eq_properties.oeq_cache.normal_cls;
1397    if eq_properties.is_expr_constant(&normal_expr).is_some()
1398        || oeq_class.is_expr_partial_const(&normal_expr)
1399    {
1400        node.data.sort_properties = SortProperties::Singleton;
1401    } else if let Some(options) = oeq_class.get_options(&normal_expr) {
1402        node.data.sort_properties = SortProperties::Ordered(options);
1403    }
1404    Ok(Transformed::yes(node))
1405}
1406
1407/// This function examines whether a referring expression directly refers to a
1408/// given referred expression or if any of its children in the expression tree
1409/// refer to the specified expression.
1410///
1411/// # Parameters
1412///
1413/// - `referring_expr`: A reference to the referring expression (`Arc<dyn PhysicalExpr>`).
1414/// - `referred_expr`: A reference to the referred expression (`Arc<dyn PhysicalExpr>`)
1415///
1416/// # Returns
1417///
1418/// A boolean value indicating whether `referring_expr` refers (needs it to evaluate its result)
1419/// `referred_expr` or not.
1420fn expr_refers(
1421    referring_expr: &Arc<dyn PhysicalExpr>,
1422    referred_expr: &Arc<dyn PhysicalExpr>,
1423) -> bool {
1424    referring_expr.eq(referred_expr)
1425        || referring_expr
1426            .children()
1427            .iter()
1428            .any(|child| expr_refers(child, referred_expr))
1429}
1430
1431/// This function examines the given expression and its properties to determine
1432/// the ordering properties of the expression. The range knowledge is not utilized
1433/// yet in the scope of this function.
1434///
1435/// # Parameters
1436///
1437/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) for
1438///   which ordering properties need to be determined.
1439/// - `dependencies`: A reference to `Dependencies`, containing sort expressions
1440///   referred to by `expr`.
1441/// - `schema``: A reference to the schema which the `expr` columns refer.
1442///
1443/// # Returns
1444///
1445/// A `SortProperties` indicating the ordering information of the given expression.
1446fn get_expr_properties(
1447    expr: &Arc<dyn PhysicalExpr>,
1448    dependencies: &Dependencies,
1449    schema: &SchemaRef,
1450) -> Result<ExprProperties> {
1451    if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
1452        // If exact match is found, return its ordering.
1453        Ok(ExprProperties {
1454            sort_properties: SortProperties::Ordered(column_order.options),
1455            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1456            preserves_lex_ordering: false,
1457        })
1458    } else if expr.as_any().downcast_ref::<Column>().is_some() {
1459        Ok(ExprProperties {
1460            sort_properties: SortProperties::Unordered,
1461            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1462            preserves_lex_ordering: false,
1463        })
1464    } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
1465        Ok(ExprProperties {
1466            sort_properties: SortProperties::Singleton,
1467            range: literal.value().into(),
1468            preserves_lex_ordering: true,
1469        })
1470    } else {
1471        // Find orderings of its children
1472        let child_states = expr
1473            .children()
1474            .iter()
1475            .map(|child| get_expr_properties(child, dependencies, schema))
1476            .collect::<Result<Vec<_>>>()?;
1477        // Calculate expression ordering using ordering of its children.
1478        expr.get_properties(&child_states)
1479    }
1480}