datafusion_physical_expr/equivalence/
ordering.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Display;
19use std::ops::Deref;
20use std::sync::Arc;
21use std::vec::IntoIter;
22
23use crate::expressions::with_new_schema;
24use crate::{add_offset_to_physical_sort_exprs, LexOrdering, PhysicalExpr};
25
26use arrow::compute::SortOptions;
27use arrow::datatypes::SchemaRef;
28use datafusion_common::{HashSet, Result};
29use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
30
31/// An `OrderingEquivalenceClass` keeps track of distinct alternative orderings
32/// than can describe a table. For example, consider the following table:
33///
34/// ```text
35/// ┌───┬───┬───┬───┐
36/// │ a │ b │ c │ d │
37/// ├───┼───┼───┼───┤
38/// │ 1 │ 4 │ 3 │ 1 │
39/// │ 2 │ 3 │ 3 │ 2 │
40/// │ 3 │ 1 │ 2 │ 2 │
41/// │ 3 │ 2 │ 1 │ 3 │
42/// └───┴───┴───┴───┘
43/// ```
44///
45/// Here, both `[a ASC, b ASC]` and `[c DESC, d ASC]` describe the table
46/// ordering. In this case, we say that these orderings are equivalent.
47///
48/// An `OrderingEquivalenceClass` is a set of such equivalent orderings, which
49/// is represented by a vector of `LexOrdering`s. The set does not store any
50/// redundant information by enforcing the invariant that no suffix of an
51/// ordering in the equivalence class is a prefix of another ordering in the
52/// equivalence class. The set can be empty, which means that there are no
53/// orderings that describe the table.
54#[derive(Clone, Debug, Default, Eq, PartialEq)]
55pub struct OrderingEquivalenceClass {
56    orderings: Vec<LexOrdering>,
57}
58
59impl OrderingEquivalenceClass {
60    /// Clears (empties) this ordering equivalence class.
61    pub fn clear(&mut self) {
62        self.orderings.clear();
63    }
64
65    /// Creates a new ordering equivalence class from the given orderings
66    /// and removes any redundant entries (if given).
67    pub fn new(
68        orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
69    ) -> Self {
70        let mut result = Self {
71            orderings: orderings.into_iter().filter_map(LexOrdering::new).collect(),
72        };
73        result.remove_redundant_entries();
74        result
75    }
76
77    /// Extend this ordering equivalence class with the given orderings.
78    pub fn extend(&mut self, orderings: impl IntoIterator<Item = LexOrdering>) {
79        self.orderings.extend(orderings);
80        // Make sure that there are no redundant orderings:
81        self.remove_redundant_entries();
82    }
83
84    /// Adds new orderings into this ordering equivalence class.
85    pub fn add_orderings(
86        &mut self,
87        sort_exprs: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
88    ) {
89        self.orderings
90            .extend(sort_exprs.into_iter().filter_map(LexOrdering::new));
91        // Make sure that there are no redundant orderings:
92        self.remove_redundant_entries();
93    }
94
95    /// Removes redundant orderings from this ordering equivalence class.
96    ///
97    /// For instance, if we already have the ordering `[a ASC, b ASC, c DESC]`,
98    /// then there is no need to keep ordering `[a ASC, b ASC]` in the state.
99    fn remove_redundant_entries(&mut self) {
100        let mut work = true;
101        while work {
102            work = false;
103            let mut idx = 0;
104            'outer: while idx < self.orderings.len() {
105                let mut ordering_idx = idx + 1;
106                while ordering_idx < self.orderings.len() {
107                    if let Some(remove) = self.resolve_overlap(idx, ordering_idx) {
108                        work = true;
109                        if remove {
110                            self.orderings.swap_remove(idx);
111                            continue 'outer;
112                        }
113                    }
114                    if let Some(remove) = self.resolve_overlap(ordering_idx, idx) {
115                        work = true;
116                        if remove {
117                            self.orderings.swap_remove(ordering_idx);
118                            continue;
119                        }
120                    }
121                    ordering_idx += 1;
122                }
123                idx += 1;
124            }
125        }
126    }
127
128    /// Trims `orderings[idx]` if some suffix of it overlaps with a prefix of
129    /// `orderings[pre_idx]`. If there is any overlap, returns a `Some(true)`
130    /// if any trimming took place, and `Some(false)` otherwise. If there is
131    /// no overlap, returns `None`.
132    ///
133    /// For example, if `orderings[idx]` is `[a ASC, b ASC, c DESC]` and
134    /// `orderings[pre_idx]` is `[b ASC, c DESC]`, then the function will trim
135    /// `orderings[idx]` to `[a ASC]`.
136    fn resolve_overlap(&mut self, idx: usize, pre_idx: usize) -> Option<bool> {
137        let length = self.orderings[idx].len();
138        let other_length = self.orderings[pre_idx].len();
139        for overlap in 1..=length.min(other_length) {
140            if self.orderings[idx][length - overlap..]
141                == self.orderings[pre_idx][..overlap]
142            {
143                return Some(!self.orderings[idx].truncate(length - overlap));
144            }
145        }
146        None
147    }
148
149    /// Returns the concatenation of all the orderings. This enables merge
150    /// operations to preserve all equivalent orderings simultaneously.
151    pub fn output_ordering(&self) -> Option<LexOrdering> {
152        self.orderings.iter().cloned().reduce(|mut cat, o| {
153            cat.extend(o);
154            cat
155        })
156    }
157
158    // Append orderings in `other` to all existing orderings in this ordering
159    // equivalence class.
160    pub fn join_suffix(mut self, other: &Self) -> Self {
161        let n_ordering = self.orderings.len();
162        // Replicate entries before cross product:
163        let n_cross = std::cmp::max(n_ordering, other.len() * n_ordering);
164        self.orderings = self.orderings.into_iter().cycle().take(n_cross).collect();
165        // Append sort expressions of `other` to the current orderings:
166        for (outer_idx, ordering) in other.iter().enumerate() {
167            let base = outer_idx * n_ordering;
168            // Use the cross product index:
169            for idx in base..(base + n_ordering) {
170                self.orderings[idx].extend(ordering.iter().cloned());
171            }
172        }
173        self
174    }
175
176    /// Adds `offset` value to the index of each expression inside this
177    /// ordering equivalence class.
178    pub fn add_offset(&mut self, offset: isize) -> Result<()> {
179        let orderings = std::mem::take(&mut self.orderings);
180        for ordering_result in orderings
181            .into_iter()
182            .map(|o| add_offset_to_physical_sort_exprs(o, offset))
183        {
184            self.orderings.extend(LexOrdering::new(ordering_result?));
185        }
186        Ok(())
187    }
188
189    /// Transforms this `OrderingEquivalenceClass` by mapping columns in the
190    /// original schema to columns in the new schema by index. The new schema
191    /// and the original schema needs to be aligned; i.e. they should have the
192    /// same number of columns, and fields at the same index have the same type
193    /// in both schemas.
194    pub fn with_new_schema(mut self, schema: &SchemaRef) -> Result<Self> {
195        self.orderings = self
196            .orderings
197            .into_iter()
198            .map(|ordering| {
199                ordering
200                    .into_iter()
201                    .map(|mut sort_expr| {
202                        sort_expr.expr = with_new_schema(sort_expr.expr, schema)?;
203                        Ok(sort_expr)
204                    })
205                    .collect::<Result<Vec<_>>>()
206                    // The following `unwrap` is safe because the vector will always
207                    // be non-empty.
208                    .map(|v| LexOrdering::new(v).unwrap())
209            })
210            .collect::<Result<_>>()?;
211        Ok(self)
212    }
213
214    /// Gets sort options associated with this expression if it is a leading
215    /// ordering expression. Otherwise, returns `None`.
216    pub fn get_options(&self, expr: &Arc<dyn PhysicalExpr>) -> Option<SortOptions> {
217        for ordering in self.iter() {
218            let leading_ordering = &ordering[0];
219            if leading_ordering.expr.eq(expr) {
220                return Some(leading_ordering.options);
221            }
222        }
223        None
224    }
225
226    /// Checks whether the given expression is partially constant according to
227    /// this ordering equivalence class.
228    ///
229    /// This function determines whether `expr` appears in at least one combination
230    /// of `descending` and `nulls_first` options that indicate partial constantness
231    /// in a lexicographical ordering. Specifically, an expression is considered
232    /// a partial constant in this context if its `SortOptions` satisfies either
233    /// of the following conditions:
234    /// - It is `descending` with `nulls_first` and _also_ `ascending` with
235    ///   `nulls_last`, OR
236    /// - It is `descending` with `nulls_last` and _also_ `ascending` with
237    ///   `nulls_first`.
238    ///
239    /// The equivalence mechanism primarily uses `ConstExpr`s to represent globally
240    /// constant expressions. However, some expressions may only be partially
241    /// constant within a lexicographical ordering. This function helps identify
242    /// such cases. If an expression is constant within a prefix ordering, it is
243    /// added as a constant during `ordering_satisfy_requirement()` iterations
244    /// after the corresponding prefix requirement is satisfied.
245    ///
246    /// ### Future Improvements
247    ///
248    /// This function may become unnecessary if any of the following improvements
249    /// are implemented:
250    /// 1. `SortOptions` supports encoding constantness information.
251    /// 2. `EquivalenceProperties` gains `FunctionalDependency` awareness, eliminating
252    ///    the need for `Constant` and `Constraints`.
253    pub fn is_expr_partial_const(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
254        let mut constantness_defining_pairs = [
255            HashSet::from([(false, false), (true, true)]),
256            HashSet::from([(false, true), (true, false)]),
257        ];
258
259        for ordering in self.iter() {
260            let leading_ordering = ordering.first();
261            if leading_ordering.expr.eq(expr) {
262                let opt = (
263                    leading_ordering.options.descending,
264                    leading_ordering.options.nulls_first,
265                );
266                constantness_defining_pairs[0].remove(&opt);
267                constantness_defining_pairs[1].remove(&opt);
268            }
269        }
270
271        constantness_defining_pairs
272            .iter()
273            .any(|pair| pair.is_empty())
274    }
275}
276
277impl Deref for OrderingEquivalenceClass {
278    type Target = [LexOrdering];
279
280    fn deref(&self) -> &Self::Target {
281        self.orderings.as_slice()
282    }
283}
284
285impl From<Vec<LexOrdering>> for OrderingEquivalenceClass {
286    fn from(orderings: Vec<LexOrdering>) -> Self {
287        let mut result = Self { orderings };
288        result.remove_redundant_entries();
289        result
290    }
291}
292
293/// Convert the `OrderingEquivalenceClass` into an iterator of `LexOrdering`s.
294impl IntoIterator for OrderingEquivalenceClass {
295    type Item = LexOrdering;
296    type IntoIter = IntoIter<Self::Item>;
297
298    fn into_iter(self) -> Self::IntoIter {
299        self.orderings.into_iter()
300    }
301}
302
303impl Display for OrderingEquivalenceClass {
304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305        write!(f, "[")?;
306        let mut iter = self.orderings.iter();
307        if let Some(ordering) = iter.next() {
308            write!(f, "[{ordering}]")?;
309        }
310        for ordering in iter {
311            write!(f, ", [{ordering}]")?;
312        }
313        write!(f, "]")
314    }
315}
316
317impl From<OrderingEquivalenceClass> for Vec<LexOrdering> {
318    fn from(oeq_class: OrderingEquivalenceClass) -> Self {
319        oeq_class.orderings
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use std::sync::Arc;
326
327    use crate::equivalence::tests::create_test_schema;
328    use crate::equivalence::{
329        convert_to_orderings, convert_to_sort_exprs, EquivalenceClass, EquivalenceGroup,
330        EquivalenceProperties, OrderingEquivalenceClass,
331    };
332    use crate::expressions::{col, BinaryExpr, Column};
333    use crate::utils::tests::TestScalarUDF;
334    use crate::{
335        AcrossPartitions, ConstExpr, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
336        ScalarFunctionExpr,
337    };
338
339    use arrow::compute::SortOptions;
340    use arrow::datatypes::{DataType, Field, Schema};
341    use datafusion_common::Result;
342    use datafusion_expr::{Operator, ScalarUDF};
343
344    #[test]
345    fn test_ordering_satisfy() -> Result<()> {
346        let input_schema = Arc::new(Schema::new(vec![
347            Field::new("a", DataType::Int64, true),
348            Field::new("b", DataType::Int64, true),
349        ]));
350        let crude = vec![PhysicalSortExpr {
351            expr: Arc::new(Column::new("a", 0)),
352            options: SortOptions::default(),
353        }];
354        let finer = vec![
355            PhysicalSortExpr {
356                expr: Arc::new(Column::new("a", 0)),
357                options: SortOptions::default(),
358            },
359            PhysicalSortExpr {
360                expr: Arc::new(Column::new("b", 1)),
361                options: SortOptions::default(),
362            },
363        ];
364        // finer ordering satisfies, crude ordering should return true
365        let eq_properties_finer = EquivalenceProperties::new_with_orderings(
366            Arc::clone(&input_schema),
367            [finer.clone()],
368        );
369        assert!(eq_properties_finer.ordering_satisfy(crude.clone())?);
370
371        // Crude ordering doesn't satisfy finer ordering. should return false
372        let eq_properties_crude =
373            EquivalenceProperties::new_with_orderings(Arc::clone(&input_schema), [crude]);
374        assert!(!eq_properties_crude.ordering_satisfy(finer)?);
375        Ok(())
376    }
377
378    #[test]
379    fn test_ordering_satisfy_with_equivalence2() -> Result<()> {
380        let test_schema = create_test_schema()?;
381        let col_a = &col("a", &test_schema)?;
382        let col_b = &col("b", &test_schema)?;
383        let col_c = &col("c", &test_schema)?;
384        let col_d = &col("d", &test_schema)?;
385        let col_e = &col("e", &test_schema)?;
386        let col_f = &col("f", &test_schema)?;
387        let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));
388
389        let floor_a = Arc::new(ScalarFunctionExpr::try_new(
390            Arc::clone(&test_fun),
391            vec![Arc::clone(col_a)],
392            &test_schema,
393        )?) as PhysicalExprRef;
394        let floor_f = Arc::new(ScalarFunctionExpr::try_new(
395            Arc::clone(&test_fun),
396            vec![Arc::clone(col_f)],
397            &test_schema,
398        )?) as PhysicalExprRef;
399        let exp_a = Arc::new(ScalarFunctionExpr::try_new(
400            Arc::clone(&test_fun),
401            vec![Arc::clone(col_a)],
402            &test_schema,
403        )?) as PhysicalExprRef;
404
405        let a_plus_b = Arc::new(BinaryExpr::new(
406            Arc::clone(col_a),
407            Operator::Plus,
408            Arc::clone(col_b),
409        )) as Arc<dyn PhysicalExpr>;
410        let options = SortOptions {
411            descending: false,
412            nulls_first: false,
413        };
414
415        let test_cases = vec![
416            // ------------ TEST CASE 1 ------------
417            (
418                // orderings
419                vec![
420                    // [a ASC, d ASC, b ASC]
421                    vec![(col_a, options), (col_d, options), (col_b, options)],
422                    // [c ASC]
423                    vec![(col_c, options)],
424                ],
425                // equivalence classes
426                vec![vec![col_a, col_f]],
427                // constants
428                vec![col_e],
429                // requirement [a ASC, b ASC], requirement is not satisfied.
430                vec![(col_a, options), (col_b, options)],
431                // expected: requirement is not satisfied.
432                false,
433            ),
434            // ------------ TEST CASE 2 ------------
435            (
436                // orderings
437                vec![
438                    // [a ASC, c ASC, b ASC]
439                    vec![(col_a, options), (col_c, options), (col_b, options)],
440                    // [d ASC]
441                    vec![(col_d, options)],
442                ],
443                // equivalence classes
444                vec![vec![col_a, col_f]],
445                // constants
446                vec![col_e],
447                // requirement [floor(a) ASC],
448                vec![(&floor_a, options)],
449                // expected: requirement is satisfied.
450                true,
451            ),
452            // ------------ TEST CASE 2.1 ------------
453            (
454                // orderings
455                vec![
456                    // [a ASC, c ASC, b ASC]
457                    vec![(col_a, options), (col_c, options), (col_b, options)],
458                    // [d ASC]
459                    vec![(col_d, options)],
460                ],
461                // equivalence classes
462                vec![vec![col_a, col_f]],
463                // constants
464                vec![col_e],
465                // requirement [floor(f) ASC], (Please note that a=f)
466                vec![(&floor_f, options)],
467                // expected: requirement is satisfied.
468                true,
469            ),
470            // ------------ TEST CASE 3 ------------
471            (
472                // orderings
473                vec![
474                    // [a ASC, c ASC, b ASC]
475                    vec![(col_a, options), (col_c, options), (col_b, options)],
476                    // [d ASC]
477                    vec![(col_d, options)],
478                ],
479                // equivalence classes
480                vec![vec![col_a, col_f]],
481                // constants
482                vec![col_e],
483                // requirement [a ASC, c ASC, a+b ASC],
484                vec![(col_a, options), (col_c, options), (&a_plus_b, options)],
485                // expected: requirement is satisfied.
486                true,
487            ),
488            // ------------ TEST CASE 4 ------------
489            (
490                // orderings
491                vec![
492                    // [a ASC, b ASC, c ASC, d ASC]
493                    vec![
494                        (col_a, options),
495                        (col_b, options),
496                        (col_c, options),
497                        (col_d, options),
498                    ],
499                ],
500                // equivalence classes
501                vec![vec![col_a, col_f]],
502                // constants
503                vec![col_e],
504                // requirement [floor(a) ASC, a+b ASC],
505                vec![(&floor_a, options), (&a_plus_b, options)],
506                // expected: requirement is satisfied.
507                false,
508            ),
509            // ------------ TEST CASE 5 ------------
510            (
511                // orderings
512                vec![
513                    // [a ASC, b ASC, c ASC, d ASC]
514                    vec![
515                        (col_a, options),
516                        (col_b, options),
517                        (col_c, options),
518                        (col_d, options),
519                    ],
520                ],
521                // equivalence classes
522                vec![vec![col_a, col_f]],
523                // constants
524                vec![col_e],
525                // requirement [exp(a) ASC, a+b ASC],
526                vec![(&exp_a, options), (&a_plus_b, options)],
527                // expected: requirement is not satisfied.
528                // TODO: If we know that exp function is 1-to-1 function.
529                //  we could have deduced that above requirement is satisfied.
530                false,
531            ),
532            // ------------ TEST CASE 6 ------------
533            (
534                // orderings
535                vec![
536                    // [a ASC, d ASC, b ASC]
537                    vec![(col_a, options), (col_d, options), (col_b, options)],
538                    // [c ASC]
539                    vec![(col_c, options)],
540                ],
541                // equivalence classes
542                vec![vec![col_a, col_f]],
543                // constants
544                vec![col_e],
545                // requirement [a ASC, d ASC, floor(a) ASC],
546                vec![(col_a, options), (col_d, options), (&floor_a, options)],
547                // expected: requirement is satisfied.
548                true,
549            ),
550            // ------------ TEST CASE 7 ------------
551            (
552                // orderings
553                vec![
554                    // [a ASC, c ASC, b ASC]
555                    vec![(col_a, options), (col_c, options), (col_b, options)],
556                    // [d ASC]
557                    vec![(col_d, options)],
558                ],
559                // equivalence classes
560                vec![vec![col_a, col_f]],
561                // constants
562                vec![col_e],
563                // requirement [a ASC, floor(a) ASC, a + b ASC],
564                vec![(col_a, options), (&floor_a, options), (&a_plus_b, options)],
565                // expected: requirement is not satisfied.
566                false,
567            ),
568            // ------------ TEST CASE 8 ------------
569            (
570                // orderings
571                vec![
572                    // [a ASC, b ASC, c ASC]
573                    vec![(col_a, options), (col_b, options), (col_c, options)],
574                    // [d ASC]
575                    vec![(col_d, options)],
576                ],
577                // equivalence classes
578                vec![vec![col_a, col_f]],
579                // constants
580                vec![col_e],
581                // requirement [a ASC, c ASC, floor(a) ASC, a + b ASC],
582                vec![
583                    (col_a, options),
584                    (col_c, options),
585                    (&floor_a, options),
586                    (&a_plus_b, options),
587                ],
588                // expected: requirement is not satisfied.
589                false,
590            ),
591            // ------------ TEST CASE 9 ------------
592            (
593                // orderings
594                vec![
595                    // [a ASC, b ASC, c ASC, d ASC]
596                    vec![
597                        (col_a, options),
598                        (col_b, options),
599                        (col_c, options),
600                        (col_d, options),
601                    ],
602                ],
603                // equivalence classes
604                vec![vec![col_a, col_f]],
605                // constants
606                vec![col_e],
607                // requirement [a ASC, b ASC, c ASC, floor(a) ASC],
608                vec![
609                    (col_a, options),
610                    (col_b, options),
611                    (col_c, options),
612                    (&floor_a, options),
613                ],
614                // expected: requirement is satisfied.
615                true,
616            ),
617            // ------------ TEST CASE 10 ------------
618            (
619                // orderings
620                vec![
621                    // [d ASC, b ASC]
622                    vec![(col_d, options), (col_b, options)],
623                    // [c ASC, a ASC]
624                    vec![(col_c, options), (col_a, options)],
625                ],
626                // equivalence classes
627                vec![vec![col_a, col_f]],
628                // constants
629                vec![col_e],
630                // requirement [c ASC, d ASC, a + b ASC],
631                vec![(col_c, options), (col_d, options), (&a_plus_b, options)],
632                // expected: requirement is satisfied.
633                true,
634            ),
635        ];
636
637        for (orderings, eq_group, constants, reqs, expected) in test_cases {
638            let err_msg =
639                format!("error in test orderings: {orderings:?}, eq_group: {eq_group:?}, constants: {constants:?}, reqs: {reqs:?}, expected: {expected:?}");
640            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&test_schema));
641            let orderings = convert_to_orderings(&orderings);
642            eq_properties.add_orderings(orderings);
643            let classes = eq_group
644                .into_iter()
645                .map(|eq_class| EquivalenceClass::new(eq_class.into_iter().cloned()));
646            let eq_group = EquivalenceGroup::new(classes);
647            eq_properties.add_equivalence_group(eq_group)?;
648
649            let constants = constants.into_iter().map(|expr| {
650                ConstExpr::new(Arc::clone(expr), AcrossPartitions::Uniform(None))
651            });
652            eq_properties.add_constants(constants)?;
653
654            let reqs = convert_to_sort_exprs(&reqs);
655            assert_eq!(eq_properties.ordering_satisfy(reqs)?, expected, "{err_msg}");
656        }
657
658        Ok(())
659    }
660
661    #[test]
662    fn test_ordering_satisfy_different_lengths() -> Result<()> {
663        let test_schema = create_test_schema()?;
664        let col_a = &col("a", &test_schema)?;
665        let col_b = &col("b", &test_schema)?;
666        let col_c = &col("c", &test_schema)?;
667        let col_d = &col("d", &test_schema)?;
668        let col_e = &col("e", &test_schema)?;
669        let col_f = &col("f", &test_schema)?;
670        let options = SortOptions {
671            descending: false,
672            nulls_first: false,
673        };
674        // a=c (e.g they are aliases).
675        let mut eq_properties = EquivalenceProperties::new(test_schema);
676        eq_properties.add_equal_conditions(Arc::clone(col_a), Arc::clone(col_c))?;
677
678        let orderings = vec![
679            vec![(col_a, options)],
680            vec![(col_e, options)],
681            vec![(col_d, options), (col_f, options)],
682        ];
683        let orderings = convert_to_orderings(&orderings);
684
685        // Column [a ASC], [e ASC], [d ASC, f ASC] are all valid orderings for the schema.
686        eq_properties.add_orderings(orderings);
687
688        // First entry in the tuple is required ordering, second entry is the expected flag
689        // that indicates whether this required ordering is satisfied.
690        // ([a ASC], true) indicate a ASC requirement is already satisfied by existing orderings.
691        let test_cases = vec![
692            // [c ASC, a ASC, e ASC], expected represents this requirement is satisfied
693            (
694                vec![(col_c, options), (col_a, options), (col_e, options)],
695                true,
696            ),
697            (vec![(col_c, options), (col_b, options)], false),
698            (vec![(col_c, options), (col_d, options)], true),
699            (
700                vec![(col_d, options), (col_f, options), (col_b, options)],
701                false,
702            ),
703            (vec![(col_d, options), (col_f, options)], true),
704        ];
705
706        for (reqs, expected) in test_cases {
707            let err_msg =
708                format!("error in test reqs: {reqs:?}, expected: {expected:?}",);
709            let reqs = convert_to_sort_exprs(&reqs);
710            assert_eq!(eq_properties.ordering_satisfy(reqs)?, expected, "{err_msg}");
711        }
712
713        Ok(())
714    }
715
716    #[test]
717    fn test_remove_redundant_entries_oeq_class() -> Result<()> {
718        let schema = create_test_schema()?;
719        let col_a = &col("a", &schema)?;
720        let col_b = &col("b", &schema)?;
721        let col_c = &col("c", &schema)?;
722        let col_d = &col("d", &schema)?;
723        let col_e = &col("e", &schema)?;
724
725        let option_asc = SortOptions {
726            descending: false,
727            nulls_first: false,
728        };
729        let option_desc = SortOptions {
730            descending: true,
731            nulls_first: true,
732        };
733
734        // First entry in the tuple is the given orderings for the table
735        // Second entry is the simplest version of the given orderings that is functionally equivalent.
736        let test_cases = vec![
737            // ------- TEST CASE 1 ---------
738            (
739                // ORDERINGS GIVEN
740                vec![
741                    // [a ASC, b ASC]
742                    vec![(col_a, option_asc), (col_b, option_asc)],
743                ],
744                // EXPECTED orderings that is succinct.
745                vec![
746                    // [a ASC, b ASC]
747                    vec![(col_a, option_asc), (col_b, option_asc)],
748                ],
749            ),
750            // ------- TEST CASE 2 ---------
751            (
752                // ORDERINGS GIVEN
753                vec![
754                    // [a ASC, b ASC]
755                    vec![(col_a, option_asc), (col_b, option_asc)],
756                    // [a ASC, b ASC, c ASC]
757                    vec![
758                        (col_a, option_asc),
759                        (col_b, option_asc),
760                        (col_c, option_asc),
761                    ],
762                ],
763                // EXPECTED orderings that is succinct.
764                vec![
765                    // [a ASC, b ASC, c ASC]
766                    vec![
767                        (col_a, option_asc),
768                        (col_b, option_asc),
769                        (col_c, option_asc),
770                    ],
771                ],
772            ),
773            // ------- TEST CASE 3 ---------
774            (
775                // ORDERINGS GIVEN
776                vec![
777                    // [a ASC, b DESC]
778                    vec![(col_a, option_asc), (col_b, option_desc)],
779                    // [a ASC]
780                    vec![(col_a, option_asc)],
781                    // [a ASC, c ASC]
782                    vec![(col_a, option_asc), (col_c, option_asc)],
783                ],
784                // EXPECTED orderings that is succinct.
785                vec![
786                    // [a ASC, b DESC]
787                    vec![(col_a, option_asc), (col_b, option_desc)],
788                    // [a ASC, c ASC]
789                    vec![(col_a, option_asc), (col_c, option_asc)],
790                ],
791            ),
792            // ------- TEST CASE 4 ---------
793            (
794                // ORDERINGS GIVEN
795                vec![
796                    // [a ASC, b ASC]
797                    vec![(col_a, option_asc), (col_b, option_asc)],
798                    // [a ASC, b ASC, c ASC]
799                    vec![
800                        (col_a, option_asc),
801                        (col_b, option_asc),
802                        (col_c, option_asc),
803                    ],
804                    // [a ASC]
805                    vec![(col_a, option_asc)],
806                ],
807                // EXPECTED orderings that is succinct.
808                vec![
809                    // [a ASC, b ASC, c ASC]
810                    vec![
811                        (col_a, option_asc),
812                        (col_b, option_asc),
813                        (col_c, option_asc),
814                    ],
815                ],
816            ),
817            // ------- TEST CASE 5 ---------
818            // Empty ordering
819            (
820                vec![],
821                // No ordering in the state (empty ordering is ignored).
822                vec![],
823            ),
824            // ------- TEST CASE 6 ---------
825            (
826                // ORDERINGS GIVEN
827                vec![
828                    // [a ASC, b ASC]
829                    vec![(col_a, option_asc), (col_b, option_asc)],
830                    // [b ASC]
831                    vec![(col_b, option_asc)],
832                ],
833                // EXPECTED orderings that is succinct.
834                vec![
835                    // [a ASC]
836                    vec![(col_a, option_asc)],
837                    // [b ASC]
838                    vec![(col_b, option_asc)],
839                ],
840            ),
841            // ------- TEST CASE 7 ---------
842            // b, a
843            // c, a
844            // d, b, c
845            (
846                // ORDERINGS GIVEN
847                vec![
848                    // [b ASC, a ASC]
849                    vec![(col_b, option_asc), (col_a, option_asc)],
850                    // [c ASC, a ASC]
851                    vec![(col_c, option_asc), (col_a, option_asc)],
852                    // [d ASC, b ASC, c ASC]
853                    vec![
854                        (col_d, option_asc),
855                        (col_b, option_asc),
856                        (col_c, option_asc),
857                    ],
858                ],
859                // EXPECTED orderings that is succinct.
860                vec![
861                    // [b ASC, a ASC]
862                    vec![(col_b, option_asc), (col_a, option_asc)],
863                    // [c ASC, a ASC]
864                    vec![(col_c, option_asc), (col_a, option_asc)],
865                    // [d ASC]
866                    vec![(col_d, option_asc)],
867                ],
868            ),
869            // ------- TEST CASE 8 ---------
870            // b, e
871            // c, a
872            // d, b, e, c, a
873            (
874                // ORDERINGS GIVEN
875                vec![
876                    // [b ASC, e ASC]
877                    vec![(col_b, option_asc), (col_e, option_asc)],
878                    // [c ASC, a ASC]
879                    vec![(col_c, option_asc), (col_a, option_asc)],
880                    // [d ASC, b ASC, e ASC, c ASC, a ASC]
881                    vec![
882                        (col_d, option_asc),
883                        (col_b, option_asc),
884                        (col_e, option_asc),
885                        (col_c, option_asc),
886                        (col_a, option_asc),
887                    ],
888                ],
889                // EXPECTED orderings that is succinct.
890                vec![
891                    // [b ASC, e ASC]
892                    vec![(col_b, option_asc), (col_e, option_asc)],
893                    // [c ASC, a ASC]
894                    vec![(col_c, option_asc), (col_a, option_asc)],
895                    // [d ASC]
896                    vec![(col_d, option_asc)],
897                ],
898            ),
899            // ------- TEST CASE 9 ---------
900            // b
901            // a, b, c
902            // d, a, b
903            (
904                // ORDERINGS GIVEN
905                vec![
906                    // [b ASC]
907                    vec![(col_b, option_asc)],
908                    // [a ASC, b ASC, c ASC]
909                    vec![
910                        (col_a, option_asc),
911                        (col_b, option_asc),
912                        (col_c, option_asc),
913                    ],
914                    // [d ASC, a ASC, b ASC]
915                    vec![
916                        (col_d, option_asc),
917                        (col_a, option_asc),
918                        (col_b, option_asc),
919                    ],
920                ],
921                // EXPECTED orderings that is succinct.
922                vec![
923                    // [b ASC]
924                    vec![(col_b, option_asc)],
925                    // [a ASC, b ASC, c ASC]
926                    vec![
927                        (col_a, option_asc),
928                        (col_b, option_asc),
929                        (col_c, option_asc),
930                    ],
931                    // [d ASC]
932                    vec![(col_d, option_asc)],
933                ],
934            ),
935        ];
936        for (orderings, expected) in test_cases {
937            let orderings = convert_to_orderings(&orderings);
938            let expected = convert_to_orderings(&expected);
939            let actual = OrderingEquivalenceClass::from(orderings.clone());
940            let err_msg = format!(
941                "orderings: {orderings:?}, expected: {expected:?}, actual :{actual:?}"
942            );
943            assert_eq!(actual.len(), expected.len(), "{err_msg}");
944            for elem in actual {
945                assert!(expected.contains(&elem), "{}", err_msg);
946            }
947        }
948
949        Ok(())
950    }
951}