datafusion_physical_expr/equivalence/properties/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod dependency; // Submodule containing DependencyMap and Dependencies
19mod joins; // Submodule containing join_equivalence_properties
20mod union; // Submodule containing calculate_union
21
22pub use joins::*;
23pub use union::*;
24
25use std::fmt::{self, Display};
26use std::mem;
27use std::sync::Arc;
28
29use self::dependency::{
30 Dependencies, DependencyMap, construct_prefix_orderings,
31 generate_dependency_orderings, referred_dependencies,
32};
33use crate::equivalence::{
34 AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
35};
36use crate::expressions::{CastExpr, Column, Literal, with_new_schema};
37use crate::{
38 ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
39 PhysicalSortRequirement,
40};
41
42use arrow::datatypes::{DataType, SchemaRef};
43use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
44use datafusion_common::{Constraint, Constraints, HashMap, Result, plan_err};
45use datafusion_expr::interval_arithmetic::Interval;
46use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
47use datafusion_physical_expr_common::sort_expr::options_compatible;
48use datafusion_physical_expr_common::utils::ExprPropertiesNode;
49
50use indexmap::IndexSet;
51use itertools::Itertools;
52
53/// `EquivalenceProperties` stores information about the output of a plan node
54/// that can be used to optimize the plan. Currently, it keeps track of:
55/// - Sort expressions (orderings),
56/// - Equivalent expressions; i.e. expressions known to have the same value.
/// - Constant expressions; i.e. expressions known to contain a single constant
///   value.
59///
60/// Please see the [Using Ordering for Better Plans] blog for more details.
61///
62/// [Using Ordering for Better Plans]: https://datafusion.apache.org/blog/2025/03/11/ordering-analysis/
63///
64/// # Example equivalent sort expressions
65///
66/// Consider table below:
67///
68/// ```text
69/// ┌-------┐
70/// | a | b |
71/// |---|---|
72/// | 1 | 9 |
73/// | 2 | 8 |
74/// | 3 | 7 |
75/// | 5 | 5 |
76/// └---┴---┘
77/// ```
78///
79/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
/// `EquivalenceProperties` tracks these different valid sort expressions and
/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query
82/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
83/// avoided.
84///
85/// # Example equivalent expressions
86///
87/// Similarly, consider the table below:
88///
89/// ```text
90/// ┌-------┐
91/// | a | b |
92/// |---|---|
93/// | 1 | 1 |
94/// | 2 | 2 |
95/// | 3 | 3 |
96/// | 5 | 5 |
97/// └---┴---┘
98/// ```
99///
100/// In this case, columns `a` and `b` always have the same value. With this
101/// information, Datafusion can optimize various operations. For example, if
102/// the partition requirement is `Hash(a)` and output partitioning is
103/// `Hash(b)`, then DataFusion avoids repartitioning the data as the existing
104/// partitioning satisfies the requirement.
105///
106/// # Code Example
107/// ```
108/// # use std::sync::Arc;
109/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
110/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
111/// # use datafusion_physical_expr::expressions::col;
112/// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
113/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
114/// # Field::new("a", DataType::Int32, false),
115/// # Field::new("b", DataType::Int32, false),
116/// # Field::new("c", DataType::Int32, false),
117/// # ]));
118/// # let col_a = col("a", &schema).unwrap();
119/// # let col_b = col("b", &schema).unwrap();
120/// # let col_c = col("c", &schema).unwrap();
121/// // This object represents data that is sorted by a ASC, c DESC
122/// // with a single constant value of b
123/// let mut eq_properties = EquivalenceProperties::new(schema);
124/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
125/// eq_properties.add_ordering([
126/// PhysicalSortExpr::new_default(col_a).asc(),
127/// PhysicalSortExpr::new_default(col_c).desc(),
128/// ]);
129///
130/// assert_eq!(
131/// eq_properties.to_string(),
132/// "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant: (heterogeneous)}]"
133/// );
134/// ```
#[derive(Clone, Debug)]
pub struct EquivalenceProperties {
    /// Distinct equivalence classes (i.e. groups of expressions known to
    /// have the same value). Constant expressions are tracked here as well.
    eq_group: EquivalenceGroup,
    /// Equivalent sort expressions (i.e. orderings that all describe the
    /// same data). Orderings are stored here as they were supplied.
    oeq_class: OrderingEquivalenceClass,
    /// Cache storing equivalent sort expressions in normal form (i.e. without
    /// constants/duplicates and in standard form) and a map associating leading
    /// terms with full sort expressions. It must be refreshed whenever
    /// `eq_group` or `oeq_class` changes.
    oeq_cache: OrderingEquivalenceCache,
    /// Table constraints (primary key / unique) that factor in equivalence
    /// calculations.
    constraints: Constraints,
    /// Schema associated with this object.
    schema: SchemaRef,
}
150
/// This object serves as a cache for storing equivalent sort expressions
/// in normal form, and a map associating leading sort expressions with
/// full lexicographical orderings. With this information, DataFusion can
/// efficiently determine whether a given ordering is satisfied by the
/// existing orderings, and discover new orderings based on the existing
/// equivalence properties.
///
/// The two fields must be kept in sync: after `normal_cls` is modified,
/// `update_map` has to be called to rebuild `leading_map`.
#[derive(Clone, Debug, Default)]
struct OrderingEquivalenceCache {
    /// Equivalent sort expressions in normal form.
    normal_cls: OrderingEquivalenceClass,
    /// Map associating leading sort expressions with full lexicographical
    /// orderings. Keys are the expressions of the first sort key of each
    /// ordering; values are indices into `normal_cls`.
    leading_map: HashMap<Arc<dyn PhysicalExpr>, Vec<usize>>,
}
165
166impl OrderingEquivalenceCache {
167 /// Creates a new `OrderingEquivalenceCache` object with the given
168 /// equivalent orderings, which should be in normal form.
169 pub fn new(
170 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
171 ) -> Self {
172 let mut cache = Self {
173 normal_cls: OrderingEquivalenceClass::new(orderings),
174 leading_map: HashMap::new(),
175 };
176 cache.update_map();
177 cache
178 }
179
180 /// Updates/reconstructs the leading expression map according to the normal
181 /// ordering equivalence class within.
182 pub fn update_map(&mut self) {
183 self.leading_map.clear();
184 for (idx, ordering) in self.normal_cls.iter().enumerate() {
185 let expr = Arc::clone(&ordering.first().expr);
186 self.leading_map.entry(expr).or_default().push(idx);
187 }
188 }
189
190 /// Clears the cache, removing all orderings and leading expressions.
191 pub fn clear(&mut self) {
192 self.normal_cls.clear();
193 self.leading_map.clear();
194 }
195}
196
197impl EquivalenceProperties {
198 /// Helper used by the ordering equivalence rule when considering whether a
199 /// cast-bearing expression can replace an existing sort key without
200 /// invalidating the ordering.
201 ///
202 /// The substitution is only allowed when the cast wraps the very same child
203 /// expression that the original sort used and the casted type is a
204 /// widening/order-preserving conversion. Without those restrictions, a
205 /// narrowing cast could collapse distinct values and violate the existing
206 /// sort order.
207 fn substitute_cast_ordering(
208 r_expr: Arc<dyn PhysicalExpr>,
209 sort_expr: &PhysicalSortExpr,
210 expr_type: &DataType,
211 ) -> Option<PhysicalSortExpr> {
212 let cast_expr = r_expr.downcast_ref::<CastExpr>()?;
213
214 (cast_expr.expr().eq(&sort_expr.expr)
215 && CastExpr::check_bigger_cast(cast_expr.cast_type(), expr_type))
216 .then(|| PhysicalSortExpr::new(r_expr, sort_expr.options))
217 }
218
219 /// Creates an empty `EquivalenceProperties` object.
220 pub fn new(schema: SchemaRef) -> Self {
221 Self {
222 eq_group: EquivalenceGroup::default(),
223 oeq_class: OrderingEquivalenceClass::default(),
224 oeq_cache: OrderingEquivalenceCache::default(),
225 constraints: Constraints::default(),
226 schema,
227 }
228 }
229
    /// Sets the constraints of the properties. Any previously stored
    /// constraints are replaced (not merged) by `constraints`.
    pub fn set_constraints(&mut self, constraints: Constraints) {
        self.constraints = constraints;
    }
234
235 /// Adds constraints to the properties.
236 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
237 self.set_constraints(constraints);
238 self
239 }
240
241 /// Creates a new `EquivalenceProperties` object with the given orderings.
242 pub fn new_with_orderings(
243 schema: SchemaRef,
244 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
245 ) -> Self {
246 let eq_group = EquivalenceGroup::default();
247 let oeq_class = OrderingEquivalenceClass::new(orderings);
248 // Here, we can avoid performing a full normalization, and get by with
249 // only removing constants because the equivalence group is empty.
250 let normal_orderings = oeq_class.iter().cloned().map(|o| {
251 o.into_iter()
252 .filter(|sort_expr| eq_group.is_expr_constant(&sort_expr.expr).is_none())
253 });
254 Self {
255 oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
256 oeq_class,
257 eq_group,
258 constraints: Constraints::default(),
259 schema,
260 }
261 }
262
    /// Returns a reference to the schema associated with these properties.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
267
    /// Returns a reference to the ordering equivalence class within. The
    /// orderings are returned as stored; see [`Self::normalized_oeq_class`]
    /// for the normal form.
    pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
        &self.oeq_class
    }
272
    /// Returns a reference to the equivalence group (i.e. the collection of
    /// equivalence classes) within.
    pub fn eq_group(&self) -> &EquivalenceGroup {
        &self.eq_group
    }
277
    /// Returns a reference to the table constraints within.
    pub fn constraints(&self) -> &Constraints {
        &self.constraints
    }
282
283 /// Returns all the known constants expressions.
284 pub fn constants(&self) -> Vec<ConstExpr> {
285 self.eq_group
286 .iter()
287 .flat_map(|c| {
288 c.iter().filter_map(|expr| {
289 c.constant
290 .as_ref()
291 .map(|across| ConstExpr::new(Arc::clone(expr), across.clone()))
292 })
293 })
294 .collect()
295 }
296
297 /// Returns the output ordering of the properties.
298 pub fn output_ordering(&self) -> Option<LexOrdering> {
299 let concat = self.oeq_class.iter().flat_map(|o| o.iter().cloned());
300 self.normalize_sort_exprs(concat)
301 }
302
303 /// Extends this `EquivalenceProperties` with the `other` object.
304 pub fn extend(mut self, other: Self) -> Result<Self> {
305 self.constraints.extend(other.constraints);
306 self.add_equivalence_group(other.eq_group)?;
307 self.add_orderings(other.oeq_class);
308 Ok(self)
309 }
310
311 /// Clears (empties) the ordering equivalence class within this object.
312 /// Call this method when existing orderings are invalidated.
313 pub fn clear_orderings(&mut self) {
314 self.oeq_class.clear();
315 self.oeq_cache.clear();
316 }
317
318 /// Removes constant expressions that may change across partitions.
319 /// This method should be used when merging data from different partitions.
320 pub fn clear_per_partition_constants(&mut self) {
321 if self.eq_group.clear_per_partition_constants() {
322 // Renormalize orderings if the equivalence group changes:
323 let normal_orderings = self
324 .oeq_class
325 .iter()
326 .cloned()
327 .map(|o| self.eq_group.normalize_sort_exprs(o));
328 self.oeq_cache = OrderingEquivalenceCache::new(normal_orderings);
329 }
330 }
331
332 /// Adds new orderings into the existing ordering equivalence class.
333 pub fn add_orderings(
334 &mut self,
335 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
336 ) {
337 let orderings: Vec<_> =
338 orderings.into_iter().filter_map(LexOrdering::new).collect();
339 let normal_orderings: Vec<_> = orderings
340 .iter()
341 .cloned()
342 .filter_map(|o| self.normalize_sort_exprs(o))
343 .collect();
344 if !normal_orderings.is_empty() {
345 self.oeq_class.extend(orderings);
346 // Normalize given orderings to update the cache:
347 self.oeq_cache.normal_cls.extend(normal_orderings);
348 // TODO: If no ordering is found to be redundant during extension, we
349 // can use a shortcut algorithm to update the leading map.
350 self.oeq_cache.update_map();
351 }
352 }
353
354 /// Adds a single ordering to the existing ordering equivalence class.
355 pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
356 self.add_orderings(std::iter::once(ordering));
357 }
358
359 fn update_oeq_cache(&mut self) -> Result<()> {
360 // Renormalize orderings if the equivalence group changes:
361 let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
362 let normal_orderings = normal_cls
363 .into_iter()
364 .map(|o| self.eq_group.normalize_sort_exprs(o));
365 self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
366 self.oeq_cache.update_map();
367 // Discover any new orderings based on the new equivalence classes:
368 let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
369 for expr in leading_exprs {
370 self.discover_new_orderings(expr)?;
371 }
372 Ok(())
373 }
374
375 /// Incorporates the given equivalence group to into the existing
376 /// equivalence group within.
377 pub fn add_equivalence_group(
378 &mut self,
379 other_eq_group: EquivalenceGroup,
380 ) -> Result<()> {
381 if !other_eq_group.is_empty() {
382 self.eq_group.extend(other_eq_group);
383 self.update_oeq_cache()?;
384 }
385 Ok(())
386 }
387
388 /// Returns the ordering equivalence class within in normal form.
389 /// Normalization standardizes expressions according to the equivalence
390 /// group within, and removes constants/duplicates.
391 pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
392 self.oeq_class
393 .iter()
394 .cloned()
395 .filter_map(|ordering| self.normalize_sort_exprs(ordering))
396 .collect::<Vec<_>>()
397 .into()
398 }
399
400 /// Adds a new equality condition into the existing equivalence group.
401 /// If the given equality defines a new equivalence class, adds this new
402 /// equivalence class to the equivalence group.
403 pub fn add_equal_conditions(
404 &mut self,
405 left: Arc<dyn PhysicalExpr>,
406 right: Arc<dyn PhysicalExpr>,
407 ) -> Result<()> {
408 // Add equal expressions to the state:
409 if self.eq_group.add_equal_conditions(left, right) {
410 self.update_oeq_cache()?;
411 }
412 self.update_oeq_cache()?;
413 Ok(())
414 }
415
416 /// Track/register physical expressions with constant values.
417 pub fn add_constants(
418 &mut self,
419 constants: impl IntoIterator<Item = ConstExpr>,
420 ) -> Result<()> {
421 // Add the new constant to the equivalence group:
422 for constant in constants {
423 self.eq_group.add_constant(constant);
424 }
425 // Renormalize the orderings after adding new constants by removing
426 // the constants from existing orderings:
427 let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
428 let normal_orderings = normal_cls.into_iter().map(|ordering| {
429 ordering.into_iter().filter(|sort_expr| {
430 self.eq_group.is_expr_constant(&sort_expr.expr).is_none()
431 })
432 });
433 self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
434 self.oeq_cache.update_map();
435 // Discover any new orderings based on the constants:
436 let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
437 for expr in leading_exprs {
438 self.discover_new_orderings(expr)?;
439 }
440 Ok(())
441 }
442
443 /// Discover new valid orderings in light of a new equality. Accepts a single
444 /// argument (`expr`) which is used to determine the orderings to update.
445 /// When constants or equivalence classes change, there may be new orderings
446 /// that can be discovered with the new equivalence properties.
447 /// For a discussion, see: <https://github.com/apache/datafusion/issues/9812>
448 fn discover_new_orderings(
449 &mut self,
450 normal_expr: Arc<dyn PhysicalExpr>,
451 ) -> Result<()> {
452 let Some(ordering_idxs) = self.oeq_cache.leading_map.get(&normal_expr) else {
453 return Ok(());
454 };
455 let eq_class = self
456 .eq_group
457 .get_equivalence_class(&normal_expr)
458 .map_or_else(|| vec![normal_expr], |class| class.clone().into());
459
460 let mut new_orderings = vec![];
461 for idx in ordering_idxs {
462 let ordering = &self.oeq_cache.normal_cls[*idx];
463 let leading_ordering_options = ordering[0].options;
464
465 'exprs: for equivalent_expr in &eq_class {
466 let children = equivalent_expr.children();
467 if children.is_empty() {
468 continue;
469 }
470 // Check if all children match the next expressions in the ordering:
471 let mut child_properties = vec![];
472 // Build properties for each child based on the next expression:
473 for (i, child) in children.into_iter().enumerate() {
474 let Some(next) = ordering.get(i + 1) else {
475 break 'exprs;
476 };
477 if !next.expr.eq(child) {
478 break 'exprs;
479 }
480 let data_type = child.data_type(&self.schema)?;
481 child_properties.push(ExprProperties {
482 sort_properties: SortProperties::Ordered(next.options),
483 range: Interval::make_unbounded(&data_type)?,
484 preserves_lex_ordering: true,
485 });
486 }
487 // Check if the expression is monotonic in all arguments:
488 let expr_properties =
489 equivalent_expr.get_properties(&child_properties)?;
490 if expr_properties.preserves_lex_ordering
491 && expr_properties.sort_properties
492 == SortProperties::Ordered(leading_ordering_options)
493 {
494 // Assume that `[c ASC, a ASC, b ASC]` is among existing
495 // orderings. If equality `c = f(a, b)` is given, ordering
496 // `[a ASC, b ASC]` implies the ordering `[c ASC]`. Thus,
497 // ordering `[a ASC, b ASC]` is also a valid ordering.
498 new_orderings.push(ordering[1..].to_vec());
499 break;
500 }
501 }
502 }
503
504 if !new_orderings.is_empty() {
505 self.add_orderings(new_orderings);
506 }
507 Ok(())
508 }
509
510 /// Updates the ordering equivalence class within assuming that the table
511 /// is re-sorted according to the argument `ordering`, and returns whether
512 /// this operation resulted in any change. Note that equivalence classes
513 /// (and constants) do not change as they are unaffected by a re-sort. If
514 /// the given ordering is already satisfied, the function does nothing.
515 pub fn reorder(
516 &mut self,
517 ordering: impl IntoIterator<Item = PhysicalSortExpr>,
518 ) -> Result<bool> {
519 let (ordering, ordering_tee) = ordering.into_iter().tee();
520 // First, standardize the given ordering:
521 let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
522 // If the ordering vanishes after normalization, it is satisfied:
523 return Ok(false);
524 };
525 if normal_ordering.len() != self.common_sort_prefix_length(&normal_ordering)? {
526 // If the ordering is unsatisfied, replace existing orderings:
527 self.clear_orderings();
528 self.add_ordering(ordering_tee);
529 return Ok(true);
530 }
531 Ok(false)
532 }
533
534 /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
535 /// equivalence group within. Returns a `LexOrdering` instance if the
536 /// expressions define a proper lexicographical ordering. For more details,
537 /// see [`EquivalenceGroup::normalize_sort_exprs`].
538 pub fn normalize_sort_exprs(
539 &self,
540 sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
541 ) -> Option<LexOrdering> {
542 LexOrdering::new(self.eq_group.normalize_sort_exprs(sort_exprs))
543 }
544
545 /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
546 /// equivalence group within. Returns a `LexRequirement` instance if the
547 /// expressions define a proper lexicographical requirement. For more
548 /// details, see [`EquivalenceGroup::normalize_sort_exprs`].
549 pub fn normalize_sort_requirements(
550 &self,
551 sort_reqs: impl IntoIterator<Item = PhysicalSortRequirement>,
552 ) -> Option<LexRequirement> {
553 LexRequirement::new(self.eq_group.normalize_sort_requirements(sort_reqs))
554 }
555
556 /// Iteratively checks whether the given ordering is satisfied by any of
557 /// the existing orderings. See [`Self::ordering_satisfy_requirement`] for
558 /// more details and examples.
559 pub fn ordering_satisfy(
560 &self,
561 given: impl IntoIterator<Item = PhysicalSortExpr>,
562 ) -> Result<bool> {
563 // First, standardize the given ordering:
564 let Some(normal_ordering) = self.normalize_sort_exprs(given) else {
565 // If the ordering vanishes after normalization, it is satisfied:
566 return Ok(true);
567 };
568 Ok(normal_ordering.len() == self.common_sort_prefix_length(&normal_ordering)?)
569 }
570
571 /// Iteratively checks whether the given sort requirement is satisfied by
572 /// any of the existing orderings.
573 ///
574 /// ### Example Scenarios
575 ///
576 /// In these scenarios, assume that all expressions share the same sort
577 /// properties.
578 ///
579 /// #### Case 1: Sort Requirement `[a, c]`
580 ///
581 /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
582 /// 1. The function first checks the leading requirement `a`, which is
583 /// satisfied by `[a, b, c].first()`.
584 /// 2. `a` is added as a constant for the next iteration.
585 /// 3. Normal orderings become `[[b, c], [d]]`.
586 /// 4. The function fails for `c` in the second iteration, as neither
587 /// `[b, c]` nor `[d]` satisfies `c`.
588 ///
589 /// #### Case 2: Sort Requirement `[a, d]`
590 ///
591 /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
592 /// 1. The function first checks the leading requirement `a`, which is
593 /// satisfied by `[a, b, c].first()`.
594 /// 2. `a` is added as a constant for the next iteration.
595 /// 3. Normal orderings become `[[b, c], [d]]`.
596 /// 4. The function returns `true` as `[d]` satisfies `d`.
597 pub fn ordering_satisfy_requirement(
598 &self,
599 given: impl IntoIterator<Item = PhysicalSortRequirement>,
600 ) -> Result<bool> {
601 // First, standardize the given requirement:
602 let Some(normal_reqs) = self.normalize_sort_requirements(given) else {
603 // If the requirement vanishes after normalization, it is satisfied:
604 return Ok(true);
605 };
606 // Then, check whether given requirement is satisfied by constraints:
607 if self.satisfied_by_constraints(&normal_reqs) {
608 return Ok(true);
609 }
610 let schema = self.schema();
611 let mut eq_properties = self.clone();
612 for element in normal_reqs {
613 // Check whether given requirement is satisfied:
614 let ExprProperties {
615 sort_properties, ..
616 } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
617 let satisfy = match sort_properties {
618 SortProperties::Ordered(options) => element.options.is_none_or(|opts| {
619 let nullable = element.expr.nullable(schema).unwrap_or(true);
620 options_compatible(&options, &opts, nullable)
621 }),
622 // Singleton expressions satisfy any requirement.
623 SortProperties::Singleton => true,
624 SortProperties::Unordered => false,
625 };
626 if !satisfy {
627 return Ok(false);
628 }
629 // Treat satisfied keys as constants in subsequent iterations. We
630 // can do this because the "next" key only matters in a lexicographical
631 // ordering when the keys to its left have the same values.
632 //
633 // Note that these expressions are not properly "constants". This is just
634 // an implementation strategy confined to this function.
635 //
636 // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
637 // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
638 // From the analysis above, we know that `[a ASC]` is satisfied. Then,
639 // we add column `a` as constant to the algorithm state. This enables us
640 // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
641 let const_expr = ConstExpr::from(element.expr);
642 eq_properties.add_constants(std::iter::once(const_expr))?;
643 }
644 Ok(true)
645 }
646
647 /// Returns the number of consecutive sort expressions (starting from the
648 /// left) that are satisfied by the existing ordering.
649 fn common_sort_prefix_length(&self, normal_ordering: &LexOrdering) -> Result<usize> {
650 let full_length = normal_ordering.len();
651 // Check whether the given ordering is satisfied by constraints:
652 if self.satisfied_by_constraints_ordering(normal_ordering) {
653 // If constraints satisfy all sort expressions, return the full
654 // length:
655 return Ok(full_length);
656 }
657 let schema = self.schema();
658 let mut eq_properties = self.clone();
659 for (idx, element) in normal_ordering.into_iter().enumerate() {
660 // Check whether given ordering is satisfied:
661 let ExprProperties {
662 sort_properties, ..
663 } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
664 let satisfy = match sort_properties {
665 SortProperties::Ordered(options) => options_compatible(
666 &options,
667 &element.options,
668 element.expr.nullable(schema).unwrap_or(true),
669 ),
670 // Singleton expressions satisfy any ordering.
671 SortProperties::Singleton => true,
672 SortProperties::Unordered => false,
673 };
674 if !satisfy {
675 // As soon as one sort expression is unsatisfied, return how
676 // many we've satisfied so far:
677 return Ok(idx);
678 }
679 // Treat satisfied keys as constants in subsequent iterations. We
680 // can do this because the "next" key only matters in a lexicographical
681 // ordering when the keys to its left have the same values.
682 //
683 // Note that these expressions are not properly "constants". This is just
684 // an implementation strategy confined to this function.
685 //
686 // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
687 // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
688 // From the analysis above, we know that `[a ASC]` is satisfied. Then,
689 // we add column `a` as constant to the algorithm state. This enables us
690 // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
691 let const_expr = ConstExpr::from(Arc::clone(&element.expr));
692 eq_properties.add_constants(std::iter::once(const_expr))?
693 }
694 // All sort expressions are satisfied, return full length:
695 Ok(full_length)
696 }
697
698 /// Determines the longest normal prefix of `ordering` satisfied by the
699 /// existing ordering. Returns that prefix as a new `LexOrdering`, and a
700 /// boolean indicating whether all the sort expressions are satisfied.
701 pub fn extract_common_sort_prefix(
702 &self,
703 ordering: LexOrdering,
704 ) -> Result<(Vec<PhysicalSortExpr>, bool)> {
705 // First, standardize the given ordering:
706 let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
707 // If the ordering vanishes after normalization, it is satisfied:
708 return Ok((vec![], true));
709 };
710 let prefix_len = self.common_sort_prefix_length(&normal_ordering)?;
711 let flag = prefix_len == normal_ordering.len();
712 let mut sort_exprs: Vec<_> = normal_ordering.into();
713 if !flag {
714 sort_exprs.truncate(prefix_len);
715 }
716 Ok((sort_exprs, flag))
717 }
718
719 /// Checks if the sort expressions are satisfied by any of the table
720 /// constraints (primary key or unique). Returns true if any constraint
721 /// fully satisfies the expressions (i.e. constraint indices form a valid
722 /// prefix of an existing ordering that matches the expressions). For
723 /// unique constraints, also verifies nullable columns.
724 fn satisfied_by_constraints_ordering(
725 &self,
726 normal_exprs: &[PhysicalSortExpr],
727 ) -> bool {
728 self.constraints.iter().any(|constraint| match constraint {
729 Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
730 let check_null = matches!(constraint, Constraint::Unique(_));
731 let normalized_size = normal_exprs.len();
732 indices.len() <= normalized_size
733 && self.oeq_class.iter().any(|ordering| {
734 let length = ordering.len();
735 if indices.len() > length || normalized_size < length {
736 return false;
737 }
738 // Build a map of column positions in the ordering:
739 let mut col_positions = HashMap::with_capacity(length);
740 for (pos, req) in ordering.iter().enumerate() {
741 if let Some(col) = req.expr.downcast_ref::<Column>() {
742 let nullable = col.nullable(&self.schema).unwrap_or(true);
743 col_positions.insert(col.index(), (pos, nullable));
744 }
745 }
746 // Check if all constraint indices appear in valid positions:
747 if !indices.iter().all(|idx| {
748 col_positions.get(idx).is_some_and(|&(pos, nullable)| {
749 // For unique constraints, verify column is not nullable if it's first/last:
750 !check_null
751 || !nullable
752 || (pos != 0 && pos != length - 1)
753 })
754 }) {
755 return false;
756 }
757 // Check if this ordering matches the prefix:
758 normal_exprs.iter().zip(ordering).all(|(given, existing)| {
759 existing.satisfy_expr(given, &self.schema)
760 })
761 })
762 }
763 })
764 }
765
766 /// Checks if the sort requirements are satisfied by any of the table
767 /// constraints (primary key or unique). Returns true if any constraint
768 /// fully satisfies the requirements (i.e. constraint indices form a valid
769 /// prefix of an existing ordering that matches the requirements). For
770 /// unique constraints, also verifies nullable columns.
771 fn satisfied_by_constraints(&self, normal_reqs: &[PhysicalSortRequirement]) -> bool {
772 self.constraints.iter().any(|constraint| match constraint {
773 Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
774 let check_null = matches!(constraint, Constraint::Unique(_));
775 let normalized_size = normal_reqs.len();
776 indices.len() <= normalized_size
777 && self.oeq_class.iter().any(|ordering| {
778 let length = ordering.len();
779 if indices.len() > length || normalized_size < length {
780 return false;
781 }
782 // Build a map of column positions in the ordering:
783 let mut col_positions = HashMap::with_capacity(length);
784 for (pos, req) in ordering.iter().enumerate() {
785 if let Some(col) = req.expr.downcast_ref::<Column>() {
786 let nullable = col.nullable(&self.schema).unwrap_or(true);
787 col_positions.insert(col.index(), (pos, nullable));
788 }
789 }
790 // Check if all constraint indices appear in valid positions:
791 if !indices.iter().all(|idx| {
792 col_positions.get(idx).is_some_and(|&(pos, nullable)| {
793 // For unique constraints, verify column is not nullable if it's first/last:
794 !check_null
795 || !nullable
796 || (pos != 0 && pos != length - 1)
797 })
798 }) {
799 return false;
800 }
801 // Check if this ordering matches the prefix:
802 normal_reqs.iter().zip(ordering).all(|(given, existing)| {
803 existing.satisfy(given, &self.schema)
804 })
805 })
806 }
807 })
808 }
809
810 /// Checks whether the `given` sort requirements are equal or more specific
811 /// than the `reference` sort requirements.
812 pub fn requirements_compatible(
813 &self,
814 given: LexRequirement,
815 reference: LexRequirement,
816 ) -> bool {
817 let Some(normal_given) = self.normalize_sort_requirements(given) else {
818 return true;
819 };
820 let Some(normal_reference) = self.normalize_sort_requirements(reference) else {
821 return true;
822 };
823
824 (normal_reference.len() <= normal_given.len())
825 && normal_reference
826 .into_iter()
827 .zip(normal_given)
828 .all(|(reference, given)| given.compatible(&reference))
829 }
830
831 /// Modify existing orderings by substituting sort expressions with appropriate
832 /// targets from the projection mapping. We substitute a sort expression when
833 /// its physical expression has a one-to-one functional relationship with a
834 /// target expression in the mapping.
835 ///
836 /// After substitution, we may generate more than one `LexOrdering` for each
837 /// existing equivalent ordering. For example, `[a ASC, b ASC]` will turn
838 /// into `[CAST(a) ASC, b ASC]` and `[a ASC, b ASC]` when applying projection
839 /// expressions `a, b, CAST(a)`.
840 ///
841 /// TODO: Handle all scenarios that allow substitution; e.g. when `x` is
842 /// sorted, `atan(x + 1000)` should also be substituted. For now, we
843 /// only consider single-column `CAST` expressions.
844 fn substitute_oeq_class(
845 schema: &SchemaRef,
846 mapping: &ProjectionMapping,
847 oeq_class: OrderingEquivalenceClass,
848 ) -> OrderingEquivalenceClass {
849 let new_orderings = oeq_class.into_iter().flat_map(|order| {
850 // Modify/expand existing orderings by substituting sort
851 // expressions with appropriate targets from the mapping:
852 order
853 .into_iter()
854 .map(|sort_expr| {
855 // The sort expression comes from this schema, so the
856 // following call to `unwrap` is safe.
857 let expr_type = sort_expr.expr.data_type(schema).unwrap();
858 let original_sort_expr = sort_expr.clone();
859 // TODO: Add one-to-one analysis for ScalarFunctions.
860 mapping
861 .iter()
862 .map(|(source, _target)| source)
863 .filter(|source| expr_refers(source, &original_sort_expr.expr))
864 .cloned()
865 .filter_map(|r_expr| {
866 Self::substitute_cast_ordering(
867 r_expr,
868 &original_sort_expr,
869 &expr_type,
870 )
871 })
872 .chain(std::iter::once(sort_expr))
873 .collect::<Vec<_>>()
874 })
875 // Generate all valid orderings given substituted expressions:
876 .multi_cartesian_product()
877 });
878 OrderingEquivalenceClass::new(new_orderings)
879 }
880
881 /// Projects argument `expr` according to the projection described by
882 /// `mapping`, taking equivalences into account.
883 ///
884 /// For example, assume that columns `a` and `c` are always equal, and that
885 /// the projection described by `mapping` encodes the following:
886 ///
887 /// ```text
888 /// a -> a1
889 /// b -> b1
890 /// ```
891 ///
892 /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to
893 /// `Some(a1 + b1)` and `d` to `None`, meaning that it is not projectable.
894 pub fn project_expr(
895 &self,
896 expr: &Arc<dyn PhysicalExpr>,
897 mapping: &ProjectionMapping,
898 ) -> Option<Arc<dyn PhysicalExpr>> {
899 self.eq_group.project_expr(mapping, expr)
900 }
901
902 /// Projects the given `expressions` according to the projection described
903 /// by `mapping`, taking equivalences into account. This function is similar
904 /// to [`Self::project_expr`], but projects multiple expressions at once
905 /// more efficiently than calling `project_expr` for each expression.
906 pub fn project_expressions<'a>(
907 &'a self,
908 expressions: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>> + 'a,
909 mapping: &'a ProjectionMapping,
910 ) -> impl Iterator<Item = Option<Arc<dyn PhysicalExpr>>> + 'a {
911 self.eq_group.project_expressions(mapping, expressions)
912 }
913
914 /// Constructs a dependency map based on existing orderings referred to in
915 /// the projection.
916 ///
917 /// This function analyzes the orderings in the normalized order-equivalence
918 /// class and builds a dependency map. The dependency map captures relationships
919 /// between expressions within the orderings, helping to identify dependencies
920 /// and construct valid projected orderings during projection operations.
921 ///
922 /// # Parameters
923 ///
924 /// - `mapping`: A reference to the `ProjectionMapping` that defines the
925 /// relationship between source and target expressions.
926 ///
927 /// # Returns
928 ///
929 /// A [`DependencyMap`] representing the dependency map, where each
930 /// \[`DependencyNode`\] contains dependencies for the key [`PhysicalSortExpr`].
931 ///
932 /// # Example
933 ///
934 /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`,
935 /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`.
936 /// Then, the dependency map will be:
937 ///
938 /// ```text
939 /// a ASC: Node {Some(a_new ASC), HashSet{}}
940 /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
941 /// c ASC: Node {None, HashSet{a ASC}}
942 /// ```
943 fn construct_dependency_map(
944 &self,
945 oeq_class: OrderingEquivalenceClass,
946 mapping: &ProjectionMapping,
947 ) -> DependencyMap {
948 let mut map = DependencyMap::default();
949 for ordering in oeq_class.into_iter() {
950 // Previous expression is a dependency. Note that there is no
951 // dependency for the leading expression.
952 if !self.insert_to_dependency_map(
953 mapping,
954 ordering[0].clone(),
955 None,
956 &mut map,
957 ) {
958 continue;
959 }
960 for (dependency, sort_expr) in ordering.into_iter().tuple_windows() {
961 if !self.insert_to_dependency_map(
962 mapping,
963 sort_expr,
964 Some(dependency),
965 &mut map,
966 ) {
967 // If we can't project, stop constructing the dependency map
968 // as remaining dependencies will be invalid post projection.
969 break;
970 }
971 }
972 }
973 map
974 }
975
976 /// Projects the sort expression according to the projection mapping and
977 /// inserts it into the dependency map with the given dependency. Returns
978 /// a boolean flag indicating whether the given expression is projectable.
979 fn insert_to_dependency_map(
980 &self,
981 mapping: &ProjectionMapping,
982 sort_expr: PhysicalSortExpr,
983 dependency: Option<PhysicalSortExpr>,
984 map: &mut DependencyMap,
985 ) -> bool {
986 let target_sort_expr = self
987 .project_expr(&sort_expr.expr, mapping)
988 .map(|expr| PhysicalSortExpr::new(expr, sort_expr.options));
989 let projectable = target_sort_expr.is_some();
990 if projectable
991 || mapping
992 .iter()
993 .any(|(source, _)| expr_refers(source, &sort_expr.expr))
994 {
995 // Add sort expressions that can be projected or referred to
996 // by any of the projection expressions to the dependency map:
997 map.insert(sort_expr, target_sort_expr, dependency);
998 }
999 projectable
1000 }
1001
1002 /// Returns a new `ProjectionMapping` where source expressions are in normal
1003 /// form. Normalization ensures that source expressions are transformed into
1004 /// a consistent representation, which is beneficial for algorithms that rely
1005 /// on exact equalities, as it allows for more precise and reliable comparisons.
1006 ///
1007 /// # Parameters
1008 ///
1009 /// - `mapping`: A reference to the original `ProjectionMapping` to normalize.
1010 ///
1011 /// # Returns
1012 ///
1013 /// A new `ProjectionMapping` with source expressions in normal form.
1014 fn normalize_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
1015 mapping
1016 .iter()
1017 .map(|(source, target)| {
1018 let normal_source = self.eq_group.normalize_expr(Arc::clone(source));
1019 (normal_source, target.clone())
1020 })
1021 .collect()
1022 }
1023
1024 /// Computes projected orderings based on a given projection mapping.
1025 ///
1026 /// This function takes a `ProjectionMapping` and computes the possible
1027 /// orderings for the projected expressions. It considers dependencies
1028 /// between expressions and generates valid orderings according to the
1029 /// specified sort properties.
1030 ///
1031 /// # Parameters
1032 ///
1033 /// - `mapping`: A reference to the `ProjectionMapping` that defines the
1034 /// relationship between source and target expressions.
1035 /// - `oeq_class`: The `OrderingEquivalenceClass` containing the orderings
1036 /// to project.
1037 ///
1038 /// # Returns
1039 ///
1040 /// A vector of all valid (but not in normal form) orderings after projection.
1041 fn projected_orderings(
1042 &self,
1043 mapping: &ProjectionMapping,
1044 mut oeq_class: OrderingEquivalenceClass,
1045 ) -> Vec<LexOrdering> {
1046 // Normalize source expressions in the mapping:
1047 let mapping = self.normalize_mapping(mapping);
1048 // Get dependency map for existing orderings:
1049 oeq_class = Self::substitute_oeq_class(&self.schema, &mapping, oeq_class);
1050 let dependency_map = self.construct_dependency_map(oeq_class, &mapping);
1051 let orderings = mapping.iter().flat_map(|(source, targets)| {
1052 referred_dependencies(&dependency_map, source)
1053 .into_iter()
1054 .filter_map(|deps| {
1055 let ep = get_expr_properties(source, &deps, &self.schema);
1056 let sort_properties = ep.map(|prop| prop.sort_properties);
1057 if let Ok(SortProperties::Ordered(options)) = sort_properties {
1058 Some((options, deps))
1059 } else {
1060 // Do not consider unordered cases.
1061 None
1062 }
1063 })
1064 .flat_map(|(options, relevant_deps)| {
1065 // Generate dependent orderings (i.e. prefixes for targets):
1066 let dependency_orderings =
1067 generate_dependency_orderings(&relevant_deps, &dependency_map);
1068 let sort_exprs = targets.iter().map(|(target, _)| {
1069 PhysicalSortExpr::new(Arc::clone(target), options)
1070 });
1071 if dependency_orderings.is_empty() {
1072 sort_exprs.map(|sort_expr| [sort_expr].into()).collect()
1073 } else {
1074 sort_exprs
1075 .flat_map(|sort_expr| {
1076 let mut result = dependency_orderings.clone();
1077 for ordering in result.iter_mut() {
1078 ordering.push(sort_expr.clone());
1079 }
1080 result
1081 })
1082 .collect::<Vec<_>>()
1083 }
1084 })
1085 });
1086
1087 // Add valid projected orderings. For example, if existing ordering is
1088 // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
1089 // preserve `a_new + b_new` as ordered. Please note that `a_new` and
1090 // `b_new` themselves need not be ordered. Such dependencies cannot be
1091 // deduced via the pass above.
1092 let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
1093 let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
1094 if prefixes.is_empty() {
1095 // If prefix is empty, there is no dependency. Insert
1096 // empty ordering:
1097 if let Some(target) = &node.target {
1098 prefixes.push([target.clone()].into());
1099 }
1100 } else {
1101 // Append current ordering on top its dependencies:
1102 for ordering in prefixes.iter_mut() {
1103 if let Some(target) = &node.target {
1104 ordering.push(target.clone());
1105 }
1106 }
1107 }
1108 prefixes
1109 });
1110
1111 // Simplify each ordering by removing redundant sections:
1112 orderings.chain(projected_orderings).collect()
1113 }
1114
1115 /// Projects constraints according to the given projection mapping.
1116 ///
1117 /// This function takes a projection mapping and extracts column indices of
1118 /// target columns. It then projects the constraints to only include
1119 /// relationships between columns that exist in the projected output.
1120 ///
1121 /// # Parameters
1122 ///
1123 /// * `mapping` - A reference to the `ProjectionMapping` that defines the
1124 /// projection operation.
1125 ///
1126 /// # Returns
1127 ///
1128 /// Returns an optional `Constraints` object containing only the constraints
1129 /// that are valid for the projected columns (if any exists).
1130 fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
1131 let indices = mapping
1132 .iter()
1133 .flat_map(|(_, targets)| {
1134 targets.iter().flat_map(|(target, _)| {
1135 target.downcast_ref::<Column>().map(|c| c.index())
1136 })
1137 })
1138 .collect::<Vec<_>>();
1139 self.constraints.project(&indices)
1140 }
1141
1142 /// Projects the equivalences within according to `mapping` and
1143 /// `output_schema`.
1144 pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
1145 let eq_group = self.eq_group.project(mapping);
1146 let orderings =
1147 self.projected_orderings(mapping, self.oeq_cache.normal_cls.clone());
1148 let normal_orderings = orderings
1149 .iter()
1150 .cloned()
1151 .map(|o| eq_group.normalize_sort_exprs(o));
1152 Self {
1153 oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
1154 oeq_class: OrderingEquivalenceClass::new(orderings),
1155 constraints: self.projected_constraints(mapping).unwrap_or_default(),
1156 schema: output_schema,
1157 eq_group,
1158 }
1159 }
1160
1161 /// Returns the longest (potentially partial) permutation satisfying the
1162 /// existing ordering. For example, if we have the equivalent orderings
1163 /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`,
1164 /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`.
1165 /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied
1166 /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0`
1167 /// inside the argument `exprs` (respectively). For the mathematical
1168 /// definition of "partial permutation", see:
1169 ///
1170 /// <https://en.wikipedia.org/wiki/Permutation#k-permutations_of_n>
1171 pub fn find_longest_permutation(
1172 &self,
1173 exprs: &[Arc<dyn PhysicalExpr>],
1174 ) -> Result<(Vec<PhysicalSortExpr>, Vec<usize>)> {
1175 let mut eq_properties = self.clone();
1176 let mut result = vec![];
1177 // The algorithm is as follows:
1178 // - Iterate over all the expressions and insert ordered expressions
1179 // into the result.
1180 // - Treat inserted expressions as constants (i.e. add them as constants
1181 // to the state).
1182 // - Continue the above procedure until no expression is inserted; i.e.
1183 // the algorithm reaches a fixed point.
1184 // This algorithm should reach a fixed point in at most `exprs.len()`
1185 // iterations.
1186 let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
1187 for _ in 0..exprs.len() {
1188 // Get ordered expressions with their indices.
1189 let ordered_exprs = search_indices
1190 .iter()
1191 .filter_map(|&idx| {
1192 let ExprProperties {
1193 sort_properties, ..
1194 } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
1195 match sort_properties {
1196 SortProperties::Ordered(options) => {
1197 let expr = Arc::clone(&exprs[idx]);
1198 Some((PhysicalSortExpr::new(expr, options), idx))
1199 }
1200 SortProperties::Singleton => {
1201 // Assign default ordering to constant expressions:
1202 let expr = Arc::clone(&exprs[idx]);
1203 Some((PhysicalSortExpr::new_default(expr), idx))
1204 }
1205 SortProperties::Unordered => None,
1206 }
1207 })
1208 .collect::<Vec<_>>();
1209 // We reached a fixed point, exit.
1210 if ordered_exprs.is_empty() {
1211 break;
1212 }
1213 // Remove indices that have an ordering from `search_indices`, and
1214 // treat ordered expressions as constants in subsequent iterations.
1215 // We can do this because the "next" key only matters in a lexicographical
1216 // ordering when the keys to its left have the same values.
1217 //
1218 // Note that these expressions are not properly "constants". This is just
1219 // an implementation strategy confined to this function.
1220 for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
1221 let const_expr = ConstExpr::from(Arc::clone(expr));
1222 eq_properties.add_constants(std::iter::once(const_expr))?;
1223 search_indices.shift_remove(idx);
1224 }
1225 // Add new ordered section to the state.
1226 result.extend(ordered_exprs);
1227 }
1228 Ok(result.into_iter().unzip())
1229 }
1230
1231 /// This function determines whether the provided expression is constant
1232 /// based on the known constants. For example, if columns `a` and `b` are
1233 /// constant, then expressions `a`, `b` and `a + b` will all return `true`
1234 /// whereas expression `c` will return `false`.
1235 ///
1236 /// # Parameters
1237 ///
1238 /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1239 /// expression to be checked.
1240 ///
1241 /// # Returns
1242 ///
1243 /// Returns a `Some` value if the expression is constant according to
1244 /// equivalence group, and `None` otherwise. The `Some` variant contains
1245 /// an `AcrossPartitions` value indicating whether the expression is
1246 /// constant across partitions, and its actual value (if available).
1247 pub fn is_expr_constant(
1248 &self,
1249 expr: &Arc<dyn PhysicalExpr>,
1250 ) -> Option<AcrossPartitions> {
1251 self.eq_group.is_expr_constant(expr)
1252 }
1253
1254 /// Retrieves the properties for a given physical expression.
1255 ///
1256 /// This function constructs an [`ExprProperties`] object for the given
1257 /// expression, which encapsulates information about the expression's
1258 /// properties, including its [`SortProperties`] and [`Interval`].
1259 ///
1260 /// # Parameters
1261 ///
1262 /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical expression
1263 /// for which ordering information is sought.
1264 ///
1265 /// # Returns
1266 ///
1267 /// Returns an [`ExprProperties`] object containing the ordering and range
1268 /// information for the given expression.
1269 pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
1270 ExprPropertiesNode::new_unknown(expr)
1271 .transform_up(|expr| update_properties(expr, self))
1272 .data()
1273 .map(|node| node.data)
1274 .unwrap_or_else(|_| ExprProperties::new_unknown())
1275 }
1276
1277 /// Transforms this `EquivalenceProperties` by mapping columns in the
1278 /// original schema to columns in the new schema by index.
1279 pub fn with_new_schema(mut self, schema: SchemaRef) -> Result<Self> {
1280 // The new schema and the original schema is aligned when they have the
1281 // same number of columns, and fields at the same index have the same
1282 // type in both schemas.
1283 let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
1284 && self
1285 .schema
1286 .fields
1287 .iter()
1288 .zip(schema.fields.iter())
1289 .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
1290 if !schemas_aligned {
1291 // Rewriting equivalence properties in terms of new schema is not
1292 // safe when schemas are not aligned:
1293 return plan_err!(
1294 "Schemas have to be aligned to rewrite equivalences:\n Old schema: {}\n New schema: {}",
1295 self.schema,
1296 schema
1297 );
1298 }
1299
1300 // Rewrite equivalence classes according to the new schema:
1301 let mut eq_classes = vec![];
1302 for mut eq_class in self.eq_group {
1303 // Rewrite the expressions in the equivalence class:
1304 eq_class.exprs = eq_class
1305 .exprs
1306 .into_iter()
1307 .map(|expr| with_new_schema(expr, &schema))
1308 .collect::<Result<_>>()?;
1309 // Rewrite the constant value (if available and known):
1310 let data_type = eq_class
1311 .canonical_expr()
1312 .map(|e| e.data_type(&schema))
1313 .transpose()?;
1314 if let (Some(data_type), Some(AcrossPartitions::Uniform(Some(value)))) =
1315 (data_type, &mut eq_class.constant)
1316 {
1317 *value = value.cast_to(&data_type)?;
1318 }
1319 eq_classes.push(eq_class);
1320 }
1321 self.eq_group = eq_classes.into();
1322
1323 // Rewrite orderings according to new schema:
1324 self.oeq_class = self.oeq_class.with_new_schema(&schema)?;
1325 self.oeq_cache.normal_cls = self.oeq_cache.normal_cls.with_new_schema(&schema)?;
1326
1327 // Update the schema:
1328 self.schema = schema;
1329
1330 Ok(self)
1331 }
1332}
1333
1334impl From<EquivalenceProperties> for OrderingEquivalenceClass {
1335 fn from(eq_properties: EquivalenceProperties) -> Self {
1336 eq_properties.oeq_class
1337 }
1338}
1339
1340/// More readable display version of the `EquivalenceProperties`.
1341///
1342/// Format:
1343/// ```text
1344/// order: [[b@1 ASC NULLS LAST]], eq: [{members: [a@0], constant: (heterogeneous)}]
1345/// ```
1346impl Display for EquivalenceProperties {
1347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1348 let empty_eq_group = self.eq_group.is_empty();
1349 let empty_oeq_class = self.oeq_class.is_empty();
1350 if empty_oeq_class && empty_eq_group {
1351 write!(f, "No properties")?;
1352 } else if !empty_oeq_class {
1353 write!(f, "order: {}", self.oeq_class)?;
1354 if !empty_eq_group {
1355 write!(f, ", eq: {}", self.eq_group)?;
1356 }
1357 } else {
1358 write!(f, "eq: {}", self.eq_group)?;
1359 }
1360 Ok(())
1361 }
1362}
1363
1364/// Calculates the properties of a given [`ExprPropertiesNode`].
1365///
1366/// Order information can be retrieved as:
1367/// - If it is a leaf node, we directly find the order of the node by looking
1368/// at the given sort expression and equivalence properties if it is a `Column`
1369/// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
1370/// it as singleton so that it can cooperate with all ordered columns.
1371/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1372/// and operator has its own rules on how to propagate the children orderings.
1373/// However, before we engage in recursion, we check whether this intermediate
1374/// node directly matches with the sort expression. If there is a match, the
1375/// sort expression emerges at that node immediately, discarding the recursive
1376/// result coming from its children.
1377///
1378/// Range information is calculated as:
1379/// - If it is a `Literal` node, we set the range as a point value. If it is a
1380/// `Column` node, we set the datatype of the range, but cannot give an interval
1381/// for the range, yet.
1382/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1383/// and operator has its own rules on how to propagate the children range.
1384fn update_properties(
1385 mut node: ExprPropertiesNode,
1386 eq_properties: &EquivalenceProperties,
1387) -> Result<Transformed<ExprPropertiesNode>> {
1388 // First, try to gather the information from the children:
1389 if !node.expr.children().is_empty() {
1390 // We have an intermediate (non-leaf) node, account for its children:
1391 let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
1392 node.data = node.expr.get_properties(&children_props)?;
1393 } else if node.expr.is::<Literal>() {
1394 // We have a Literal, which is one of the two possible leaf node types:
1395 node.data = node.expr.get_properties(&[])?;
1396 } else if node.expr.is::<Column>() {
1397 // We have a Column, which is the other possible leaf node type:
1398 node.data.range =
1399 Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
1400 }
1401 // Now, check what we know about orderings:
1402 let normal_expr = eq_properties
1403 .eq_group
1404 .normalize_expr(Arc::clone(&node.expr));
1405 let oeq_class = &eq_properties.oeq_cache.normal_cls;
1406 if eq_properties.is_expr_constant(&normal_expr).is_some()
1407 || oeq_class.is_expr_partial_const(&normal_expr)
1408 {
1409 node.data.sort_properties = SortProperties::Singleton;
1410 } else if let Some(options) = oeq_class.get_options(&normal_expr) {
1411 node.data.sort_properties = SortProperties::Ordered(options);
1412 }
1413 Ok(Transformed::yes(node))
1414}
1415
1416/// This function examines whether a referring expression directly refers to a
1417/// given referred expression or if any of its children in the expression tree
1418/// refer to the specified expression.
1419///
1420/// # Parameters
1421///
1422/// - `referring_expr`: A reference to the referring expression (`Arc<dyn PhysicalExpr>`).
1423/// - `referred_expr`: A reference to the referred expression (`Arc<dyn PhysicalExpr>`)
1424///
1425/// # Returns
1426///
1427/// A boolean value indicating whether `referring_expr` refers (needs it to evaluate its result)
1428/// `referred_expr` or not.
1429fn expr_refers(
1430 referring_expr: &Arc<dyn PhysicalExpr>,
1431 referred_expr: &Arc<dyn PhysicalExpr>,
1432) -> bool {
1433 referring_expr.eq(referred_expr)
1434 || referring_expr
1435 .children()
1436 .iter()
1437 .any(|child| expr_refers(child, referred_expr))
1438}
1439
1440/// This function examines the given expression and its properties to determine
1441/// the ordering properties of the expression. The range knowledge is not utilized
1442/// yet in the scope of this function.
1443///
1444/// # Parameters
1445///
1446/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) for
1447/// which ordering properties need to be determined.
1448/// - `dependencies`: A reference to `Dependencies`, containing sort expressions
1449/// referred to by `expr`.
1450/// - `schema``: A reference to the schema which the `expr` columns refer.
1451///
1452/// # Returns
1453///
1454/// A `SortProperties` indicating the ordering information of the given expression.
1455fn get_expr_properties(
1456 expr: &Arc<dyn PhysicalExpr>,
1457 dependencies: &Dependencies,
1458 schema: &SchemaRef,
1459) -> Result<ExprProperties> {
1460 if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
1461 // If exact match is found, return its ordering.
1462 Ok(ExprProperties {
1463 sort_properties: SortProperties::Ordered(column_order.options),
1464 range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1465 preserves_lex_ordering: false,
1466 })
1467 } else if expr.downcast_ref::<Column>().is_some() {
1468 Ok(ExprProperties {
1469 sort_properties: SortProperties::Unordered,
1470 range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1471 preserves_lex_ordering: false,
1472 })
1473 } else if let Some(literal) = expr.downcast_ref::<Literal>() {
1474 Ok(ExprProperties {
1475 sort_properties: SortProperties::Singleton,
1476 range: literal.value().into(),
1477 preserves_lex_ordering: true,
1478 })
1479 } else {
1480 // Find orderings of its children
1481 let child_states = expr
1482 .children()
1483 .iter()
1484 .map(|child| get_expr_properties(child, dependencies, schema))
1485 .collect::<Result<Vec<_>>>()?;
1486 // Calculate expression ordering using ordering of its children.
1487 expr.get_properties(&child_states)
1488 }
1489}