datafusion_physical_expr/equivalence/properties/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod dependency; // Submodule containing DependencyMap and Dependencies
19mod joins; // Submodule containing join_equivalence_properties
20mod union; // Submodule containing calculate_union
21
22pub use joins::*;
23pub use union::*;
24
25use std::fmt::{self, Display};
26use std::mem;
27use std::sync::Arc;
28
29use self::dependency::{
30 construct_prefix_orderings, generate_dependency_orderings, referred_dependencies,
31 Dependencies, DependencyMap,
32};
33use crate::equivalence::{
34 AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
35};
36use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
37use crate::{
38 ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
39 PhysicalSortRequirement,
40};
41
42use arrow::datatypes::SchemaRef;
43use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
44use datafusion_common::{plan_err, Constraint, Constraints, HashMap, Result};
45use datafusion_expr::interval_arithmetic::Interval;
46use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
47use datafusion_physical_expr_common::sort_expr::options_compatible;
48use datafusion_physical_expr_common::utils::ExprPropertiesNode;
49
50use indexmap::IndexSet;
51use itertools::Itertools;
52
53/// `EquivalenceProperties` stores information about the output of a plan node
54/// that can be used to optimize the plan. Currently, it keeps track of:
55/// - Sort expressions (orderings),
56/// - Equivalent expressions; i.e. expressions known to have the same value.
/// - Constant expressions; i.e. expressions known to contain a single constant
///   value.
59///
60/// Please see the [Using Ordering for Better Plans] blog for more details.
61///
62/// [Using Ordering for Better Plans]: https://datafusion.apache.org/blog/2025/03/11/ordering-analysis/
63///
64/// # Example equivalent sort expressions
65///
66/// Consider table below:
67///
68/// ```text
69/// ┌-------┐
70/// | a | b |
71/// |---|---|
72/// | 1 | 9 |
73/// | 2 | 8 |
74/// | 3 | 7 |
75/// | 5 | 5 |
76/// └---┴---┘
77/// ```
78///
79/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
80/// `EquivalenceProperties` tracks these different valid sort expressions and
/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query
82/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
83/// avoided.
84///
85/// # Example equivalent expressions
86///
87/// Similarly, consider the table below:
88///
89/// ```text
90/// ┌-------┐
91/// | a | b |
92/// |---|---|
93/// | 1 | 1 |
94/// | 2 | 2 |
95/// | 3 | 3 |
96/// | 5 | 5 |
97/// └---┴---┘
98/// ```
99///
100/// In this case, columns `a` and `b` always have the same value. With this
/// information, DataFusion can optimize various operations. For example, if
102/// the partition requirement is `Hash(a)` and output partitioning is
103/// `Hash(b)`, then DataFusion avoids repartitioning the data as the existing
104/// partitioning satisfies the requirement.
105///
106/// # Code Example
107/// ```
108/// # use std::sync::Arc;
109/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
110/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
111/// # use datafusion_physical_expr::expressions::col;
112/// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
113/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
114/// # Field::new("a", DataType::Int32, false),
115/// # Field::new("b", DataType::Int32, false),
116/// # Field::new("c", DataType::Int32, false),
117/// # ]));
118/// # let col_a = col("a", &schema).unwrap();
119/// # let col_b = col("b", &schema).unwrap();
120/// # let col_c = col("c", &schema).unwrap();
121/// // This object represents data that is sorted by a ASC, c DESC
122/// // with a single constant value of b
123/// let mut eq_properties = EquivalenceProperties::new(schema);
124/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
125/// eq_properties.add_ordering([
126/// PhysicalSortExpr::new_default(col_a).asc(),
127/// PhysicalSortExpr::new_default(col_c).desc(),
128/// ]);
129///
130/// assert_eq!(
131/// eq_properties.to_string(),
132/// "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant: (heterogeneous)}]"
133/// );
134/// ```
#[derive(Clone, Debug)]
pub struct EquivalenceProperties {
    /// Distinct equivalence classes (i.e. expressions with the same value).
    /// Constant expressions are registered here as well (see `add_constants`).
    eq_group: EquivalenceGroup,
    /// Equivalent sort expressions (i.e. those that define the same ordering),
    /// kept as supplied by callers; their normalized counterparts live in
    /// `oeq_cache`.
    oeq_class: OrderingEquivalenceClass,
    /// Cache storing equivalent sort expressions in normal form (i.e. without
    /// constants/duplicates and in standard form) and a map associating leading
    /// terms with full sort expressions.
    oeq_cache: OrderingEquivalenceCache,
    /// Table constraints that factor in equivalence calculations.
    constraints: Constraints,
    /// Schema associated with this object.
    schema: SchemaRef,
}
150
151/// This object serves as a cache for storing equivalent sort expressions
152/// in normal form, and a map associating leading sort expressions with
153/// full lexicographical orderings. With this information, DataFusion can
154/// efficiently determine whether a given ordering is satisfied by the
155/// existing orderings, and discover new orderings based on the existing
156/// equivalence properties.
#[derive(Clone, Debug, Default)]
struct OrderingEquivalenceCache {
    /// Equivalent sort expressions in normal form.
    normal_cls: OrderingEquivalenceClass,
    /// Map associating leading sort expressions with full lexicographical
    /// orderings. Each key is the expression of the first sort expression of
    /// an ordering in `normal_cls`; values are indices into `normal_cls`.
    /// The map is rebuilt from scratch by `update_map`.
    leading_map: HashMap<Arc<dyn PhysicalExpr>, Vec<usize>>,
}
165
166impl OrderingEquivalenceCache {
167 /// Creates a new `OrderingEquivalenceCache` object with the given
168 /// equivalent orderings, which should be in normal form.
169 pub fn new(
170 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
171 ) -> Self {
172 let mut cache = Self {
173 normal_cls: OrderingEquivalenceClass::new(orderings),
174 leading_map: HashMap::new(),
175 };
176 cache.update_map();
177 cache
178 }
179
180 /// Updates/reconstructs the leading expression map according to the normal
181 /// ordering equivalence class within.
182 pub fn update_map(&mut self) {
183 self.leading_map.clear();
184 for (idx, ordering) in self.normal_cls.iter().enumerate() {
185 let expr = Arc::clone(&ordering.first().expr);
186 self.leading_map.entry(expr).or_default().push(idx);
187 }
188 }
189
190 /// Clears the cache, removing all orderings and leading expressions.
191 pub fn clear(&mut self) {
192 self.normal_cls.clear();
193 self.leading_map.clear();
194 }
195}
196
197impl EquivalenceProperties {
198 /// Creates an empty `EquivalenceProperties` object.
199 pub fn new(schema: SchemaRef) -> Self {
200 Self {
201 eq_group: EquivalenceGroup::default(),
202 oeq_class: OrderingEquivalenceClass::default(),
203 oeq_cache: OrderingEquivalenceCache::default(),
204 constraints: Constraints::default(),
205 schema,
206 }
207 }
208
209 /// Adds constraints to the properties.
210 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
211 self.constraints = constraints;
212 self
213 }
214
215 /// Creates a new `EquivalenceProperties` object with the given orderings.
216 pub fn new_with_orderings(
217 schema: SchemaRef,
218 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
219 ) -> Self {
220 let eq_group = EquivalenceGroup::default();
221 let oeq_class = OrderingEquivalenceClass::new(orderings);
222 // Here, we can avoid performing a full normalization, and get by with
223 // only removing constants because the equivalence group is empty.
224 let normal_orderings = oeq_class.iter().cloned().map(|o| {
225 o.into_iter()
226 .filter(|sort_expr| eq_group.is_expr_constant(&sort_expr.expr).is_none())
227 });
228 Self {
229 oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
230 oeq_class,
231 eq_group,
232 constraints: Constraints::default(),
233 schema,
234 }
235 }
236
    /// Returns a reference to the schema these properties were created for.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
241
    /// Returns a reference to the ordering equivalence class within. The
    /// orderings are returned as stored, i.e. not in normal form; see
    /// [`Self::normalized_oeq_class`] for the normalized variant.
    pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
        &self.oeq_class
    }
246
    /// Returns a reference to the equivalence group within, i.e. the set of
    /// equivalence classes tracked by this object.
    pub fn eq_group(&self) -> &EquivalenceGroup {
        &self.eq_group
    }
251
    /// Returns a reference to the table constraints within.
    pub fn constraints(&self) -> &Constraints {
        &self.constraints
    }
256
257 /// Returns all the known constants expressions.
258 pub fn constants(&self) -> Vec<ConstExpr> {
259 self.eq_group
260 .iter()
261 .flat_map(|c| {
262 c.iter().filter_map(|expr| {
263 c.constant
264 .as_ref()
265 .map(|across| ConstExpr::new(Arc::clone(expr), across.clone()))
266 })
267 })
268 .collect()
269 }
270
271 /// Returns the output ordering of the properties.
272 pub fn output_ordering(&self) -> Option<LexOrdering> {
273 let concat = self.oeq_class.iter().flat_map(|o| o.iter().cloned());
274 self.normalize_sort_exprs(concat)
275 }
276
277 /// Extends this `EquivalenceProperties` with the `other` object.
278 pub fn extend(mut self, other: Self) -> Result<Self> {
279 self.constraints.extend(other.constraints);
280 self.add_equivalence_group(other.eq_group)?;
281 self.add_orderings(other.oeq_class);
282 Ok(self)
283 }
284
285 /// Clears (empties) the ordering equivalence class within this object.
286 /// Call this method when existing orderings are invalidated.
287 pub fn clear_orderings(&mut self) {
288 self.oeq_class.clear();
289 self.oeq_cache.clear();
290 }
291
292 /// Removes constant expressions that may change across partitions.
293 /// This method should be used when merging data from different partitions.
294 pub fn clear_per_partition_constants(&mut self) {
295 if self.eq_group.clear_per_partition_constants() {
296 // Renormalize orderings if the equivalence group changes:
297 let normal_orderings = self
298 .oeq_class
299 .iter()
300 .cloned()
301 .map(|o| self.eq_group.normalize_sort_exprs(o));
302 self.oeq_cache = OrderingEquivalenceCache::new(normal_orderings);
303 }
304 }
305
306 /// Adds new orderings into the existing ordering equivalence class.
307 pub fn add_orderings(
308 &mut self,
309 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
310 ) {
311 let orderings: Vec<_> =
312 orderings.into_iter().filter_map(LexOrdering::new).collect();
313 let normal_orderings: Vec<_> = orderings
314 .iter()
315 .cloned()
316 .filter_map(|o| self.normalize_sort_exprs(o))
317 .collect();
318 if !normal_orderings.is_empty() {
319 self.oeq_class.extend(orderings);
320 // Normalize given orderings to update the cache:
321 self.oeq_cache.normal_cls.extend(normal_orderings);
322 // TODO: If no ordering is found to be redundant during extension, we
323 // can use a shortcut algorithm to update the leading map.
324 self.oeq_cache.update_map();
325 }
326 }
327
328 /// Adds a single ordering to the existing ordering equivalence class.
329 pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
330 self.add_orderings(std::iter::once(ordering));
331 }
332
333 fn update_oeq_cache(&mut self) -> Result<()> {
334 // Renormalize orderings if the equivalence group changes:
335 let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
336 let normal_orderings = normal_cls
337 .into_iter()
338 .map(|o| self.eq_group.normalize_sort_exprs(o));
339 self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
340 self.oeq_cache.update_map();
341 // Discover any new orderings based on the new equivalence classes:
342 let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
343 for expr in leading_exprs {
344 self.discover_new_orderings(expr)?;
345 }
346 Ok(())
347 }
348
349 /// Incorporates the given equivalence group to into the existing
350 /// equivalence group within.
351 pub fn add_equivalence_group(
352 &mut self,
353 other_eq_group: EquivalenceGroup,
354 ) -> Result<()> {
355 if !other_eq_group.is_empty() {
356 self.eq_group.extend(other_eq_group);
357 self.update_oeq_cache()?;
358 }
359 Ok(())
360 }
361
362 /// Returns the ordering equivalence class within in normal form.
363 /// Normalization standardizes expressions according to the equivalence
364 /// group within, and removes constants/duplicates.
365 pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
366 self.oeq_class
367 .iter()
368 .cloned()
369 .filter_map(|ordering| self.normalize_sort_exprs(ordering))
370 .collect::<Vec<_>>()
371 .into()
372 }
373
374 /// Adds a new equality condition into the existing equivalence group.
375 /// If the given equality defines a new equivalence class, adds this new
376 /// equivalence class to the equivalence group.
377 pub fn add_equal_conditions(
378 &mut self,
379 left: Arc<dyn PhysicalExpr>,
380 right: Arc<dyn PhysicalExpr>,
381 ) -> Result<()> {
382 // Add equal expressions to the state:
383 if self.eq_group.add_equal_conditions(Arc::clone(&left), right) {
384 self.update_oeq_cache()?;
385 }
386 self.update_oeq_cache()?;
387 Ok(())
388 }
389
390 /// Track/register physical expressions with constant values.
391 pub fn add_constants(
392 &mut self,
393 constants: impl IntoIterator<Item = ConstExpr>,
394 ) -> Result<()> {
395 // Add the new constant to the equivalence group:
396 for constant in constants {
397 self.eq_group.add_constant(constant);
398 }
399 // Renormalize the orderings after adding new constants by removing
400 // the constants from existing orderings:
401 let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
402 let normal_orderings = normal_cls.into_iter().map(|ordering| {
403 ordering.into_iter().filter(|sort_expr| {
404 self.eq_group.is_expr_constant(&sort_expr.expr).is_none()
405 })
406 });
407 self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
408 self.oeq_cache.update_map();
409 // Discover any new orderings based on the constants:
410 let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
411 for expr in leading_exprs {
412 self.discover_new_orderings(expr)?;
413 }
414 Ok(())
415 }
416
    /// Discover new valid orderings in light of a new equality. Accepts a single
    /// argument (`normal_expr`) which is used to determine the orderings to
    /// update: only cached normal orderings whose leading expression is
    /// `normal_expr` are examined. When constants or equivalence classes change,
    /// there may be new orderings that can be discovered with the new
    /// equivalence properties.
    /// For a discussion, see: <https://github.com/apache/datafusion/issues/9812>
    ///
    /// # Errors
    ///
    /// Returns an error if a child's data type cannot be resolved against the
    /// schema, if an unbounded interval cannot be built for that type, or if
    /// the expression's properties cannot be computed.
    fn discover_new_orderings(
        &mut self,
        normal_expr: Arc<dyn PhysicalExpr>,
    ) -> Result<()> {
        // Nothing to do unless some normal ordering starts with this expression.
        let Some(ordering_idxs) = self.oeq_cache.leading_map.get(&normal_expr) else {
            return Ok(());
        };
        // All expressions known to be equal to `normal_expr`; falls back to the
        // expression itself when it belongs to no equivalence class.
        let eq_class = self
            .eq_group
            .get_equivalence_class(&normal_expr)
            .map_or_else(|| vec![normal_expr], |class| class.clone().into());

        let mut new_orderings = vec![];
        for idx in ordering_idxs {
            let ordering = &self.oeq_cache.normal_cls[*idx];
            let leading_ordering_options = ordering[0].options;

            'exprs: for equivalent_expr in &eq_class {
                let children = equivalent_expr.children();
                // Leaf expressions cannot be derived from subsequent sort keys.
                if children.is_empty() {
                    continue;
                }
                // Check if all children match the next expressions in the ordering:
                let mut child_properties = vec![];
                // Build properties for each child based on the next expression:
                for (i, child) in children.into_iter().enumerate() {
                    // NOTE(review): a mismatch here abandons ALL remaining
                    // equivalent expressions for this ordering (`break 'exprs`),
                    // not just the current one; confirm that this is intended
                    // rather than `continue 'exprs`.
                    let Some(next) = ordering.get(i + 1) else {
                        break 'exprs;
                    };
                    if !next.expr.eq(child) {
                        break 'exprs;
                    }
                    let data_type = child.data_type(&self.schema)?;
                    child_properties.push(ExprProperties {
                        sort_properties: SortProperties::Ordered(next.options),
                        range: Interval::make_unbounded(&data_type)?,
                        preserves_lex_ordering: true,
                    });
                }
                // Check if the expression is monotonic in all arguments:
                let expr_properties =
                    equivalent_expr.get_properties(&child_properties)?;
                if expr_properties.preserves_lex_ordering
                    && expr_properties.sort_properties
                        == SortProperties::Ordered(leading_ordering_options)
                {
                    // Assume that `[c ASC, a ASC, b ASC]` is among existing
                    // orderings. If equality `c = f(a, b)` is given, ordering
                    // `[a ASC, b ASC]` implies the ordering `[c ASC]`. Thus,
                    // ordering `[a ASC, b ASC]` is also a valid ordering.
                    new_orderings.push(ordering[1..].to_vec());
                    break;
                }
            }
        }

        // Register the discovered orderings (this also refreshes the cache).
        if !new_orderings.is_empty() {
            self.add_orderings(new_orderings);
        }
        Ok(())
    }
483
484 /// Updates the ordering equivalence class within assuming that the table
485 /// is re-sorted according to the argument `ordering`, and returns whether
486 /// this operation resulted in any change. Note that equivalence classes
487 /// (and constants) do not change as they are unaffected by a re-sort. If
488 /// the given ordering is already satisfied, the function does nothing.
489 pub fn reorder(
490 &mut self,
491 ordering: impl IntoIterator<Item = PhysicalSortExpr>,
492 ) -> Result<bool> {
493 let (ordering, ordering_tee) = ordering.into_iter().tee();
494 // First, standardize the given ordering:
495 let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
496 // If the ordering vanishes after normalization, it is satisfied:
497 return Ok(false);
498 };
499 if normal_ordering.len() != self.common_sort_prefix_length(&normal_ordering)? {
500 // If the ordering is unsatisfied, replace existing orderings:
501 self.clear_orderings();
502 self.add_ordering(ordering_tee);
503 return Ok(true);
504 }
505 Ok(false)
506 }
507
508 /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
509 /// equivalence group within. Returns a `LexOrdering` instance if the
510 /// expressions define a proper lexicographical ordering. For more details,
511 /// see [`EquivalenceGroup::normalize_sort_exprs`].
512 pub fn normalize_sort_exprs(
513 &self,
514 sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
515 ) -> Option<LexOrdering> {
516 LexOrdering::new(self.eq_group.normalize_sort_exprs(sort_exprs))
517 }
518
519 /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
520 /// equivalence group within. Returns a `LexRequirement` instance if the
521 /// expressions define a proper lexicographical requirement. For more
522 /// details, see [`EquivalenceGroup::normalize_sort_exprs`].
523 pub fn normalize_sort_requirements(
524 &self,
525 sort_reqs: impl IntoIterator<Item = PhysicalSortRequirement>,
526 ) -> Option<LexRequirement> {
527 LexRequirement::new(self.eq_group.normalize_sort_requirements(sort_reqs))
528 }
529
530 /// Iteratively checks whether the given ordering is satisfied by any of
531 /// the existing orderings. See [`Self::ordering_satisfy_requirement`] for
532 /// more details and examples.
533 pub fn ordering_satisfy(
534 &self,
535 given: impl IntoIterator<Item = PhysicalSortExpr>,
536 ) -> Result<bool> {
537 // First, standardize the given ordering:
538 let Some(normal_ordering) = self.normalize_sort_exprs(given) else {
539 // If the ordering vanishes after normalization, it is satisfied:
540 return Ok(true);
541 };
542 Ok(normal_ordering.len() == self.common_sort_prefix_length(&normal_ordering)?)
543 }
544
545 /// Iteratively checks whether the given sort requirement is satisfied by
546 /// any of the existing orderings.
547 ///
548 /// ### Example Scenarios
549 ///
550 /// In these scenarios, assume that all expressions share the same sort
551 /// properties.
552 ///
553 /// #### Case 1: Sort Requirement `[a, c]`
554 ///
555 /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
556 /// 1. The function first checks the leading requirement `a`, which is
557 /// satisfied by `[a, b, c].first()`.
558 /// 2. `a` is added as a constant for the next iteration.
559 /// 3. Normal orderings become `[[b, c], [d]]`.
560 /// 4. The function fails for `c` in the second iteration, as neither
561 /// `[b, c]` nor `[d]` satisfies `c`.
562 ///
563 /// #### Case 2: Sort Requirement `[a, d]`
564 ///
565 /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
566 /// 1. The function first checks the leading requirement `a`, which is
567 /// satisfied by `[a, b, c].first()`.
568 /// 2. `a` is added as a constant for the next iteration.
569 /// 3. Normal orderings become `[[b, c], [d]]`.
570 /// 4. The function returns `true` as `[d]` satisfies `d`.
571 pub fn ordering_satisfy_requirement(
572 &self,
573 given: impl IntoIterator<Item = PhysicalSortRequirement>,
574 ) -> Result<bool> {
575 // First, standardize the given requirement:
576 let Some(normal_reqs) = self.normalize_sort_requirements(given) else {
577 // If the requirement vanishes after normalization, it is satisfied:
578 return Ok(true);
579 };
580 // Then, check whether given requirement is satisfied by constraints:
581 if self.satisfied_by_constraints(&normal_reqs) {
582 return Ok(true);
583 }
584 let schema = self.schema();
585 let mut eq_properties = self.clone();
586 for element in normal_reqs {
587 // Check whether given requirement is satisfied:
588 let ExprProperties {
589 sort_properties, ..
590 } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
591 let satisfy = match sort_properties {
592 SortProperties::Ordered(options) => element.options.is_none_or(|opts| {
593 let nullable = element.expr.nullable(schema).unwrap_or(true);
594 options_compatible(&options, &opts, nullable)
595 }),
596 // Singleton expressions satisfy any requirement.
597 SortProperties::Singleton => true,
598 SortProperties::Unordered => false,
599 };
600 if !satisfy {
601 return Ok(false);
602 }
603 // Treat satisfied keys as constants in subsequent iterations. We
604 // can do this because the "next" key only matters in a lexicographical
605 // ordering when the keys to its left have the same values.
606 //
607 // Note that these expressions are not properly "constants". This is just
608 // an implementation strategy confined to this function.
609 //
610 // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
611 // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
612 // From the analysis above, we know that `[a ASC]` is satisfied. Then,
613 // we add column `a` as constant to the algorithm state. This enables us
614 // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
615 let const_expr = ConstExpr::from(element.expr);
616 eq_properties.add_constants(std::iter::once(const_expr))?;
617 }
618 Ok(true)
619 }
620
621 /// Returns the number of consecutive sort expressions (starting from the
622 /// left) that are satisfied by the existing ordering.
623 fn common_sort_prefix_length(&self, normal_ordering: &LexOrdering) -> Result<usize> {
624 let full_length = normal_ordering.len();
625 // Check whether the given ordering is satisfied by constraints:
626 if self.satisfied_by_constraints_ordering(normal_ordering) {
627 // If constraints satisfy all sort expressions, return the full
628 // length:
629 return Ok(full_length);
630 }
631 let schema = self.schema();
632 let mut eq_properties = self.clone();
633 for (idx, element) in normal_ordering.into_iter().enumerate() {
634 // Check whether given ordering is satisfied:
635 let ExprProperties {
636 sort_properties, ..
637 } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
638 let satisfy = match sort_properties {
639 SortProperties::Ordered(options) => options_compatible(
640 &options,
641 &element.options,
642 element.expr.nullable(schema).unwrap_or(true),
643 ),
644 // Singleton expressions satisfy any ordering.
645 SortProperties::Singleton => true,
646 SortProperties::Unordered => false,
647 };
648 if !satisfy {
649 // As soon as one sort expression is unsatisfied, return how
650 // many we've satisfied so far:
651 return Ok(idx);
652 }
653 // Treat satisfied keys as constants in subsequent iterations. We
654 // can do this because the "next" key only matters in a lexicographical
655 // ordering when the keys to its left have the same values.
656 //
657 // Note that these expressions are not properly "constants". This is just
658 // an implementation strategy confined to this function.
659 //
660 // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
661 // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
662 // From the analysis above, we know that `[a ASC]` is satisfied. Then,
663 // we add column `a` as constant to the algorithm state. This enables us
664 // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
665 let const_expr = ConstExpr::from(Arc::clone(&element.expr));
666 eq_properties.add_constants(std::iter::once(const_expr))?
667 }
668 // All sort expressions are satisfied, return full length:
669 Ok(full_length)
670 }
671
672 /// Determines the longest normal prefix of `ordering` satisfied by the
673 /// existing ordering. Returns that prefix as a new `LexOrdering`, and a
674 /// boolean indicating whether all the sort expressions are satisfied.
675 pub fn extract_common_sort_prefix(
676 &self,
677 ordering: LexOrdering,
678 ) -> Result<(Vec<PhysicalSortExpr>, bool)> {
679 // First, standardize the given ordering:
680 let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
681 // If the ordering vanishes after normalization, it is satisfied:
682 return Ok((vec![], true));
683 };
684 let prefix_len = self.common_sort_prefix_length(&normal_ordering)?;
685 let flag = prefix_len == normal_ordering.len();
686 let mut sort_exprs: Vec<_> = normal_ordering.into();
687 if !flag {
688 sort_exprs.truncate(prefix_len);
689 }
690 Ok((sort_exprs, flag))
691 }
692
    /// Checks if the sort expressions are satisfied by any of the table
    /// constraints (primary key or unique). Returns true if any constraint
    /// fully satisfies the expressions (i.e. constraint indices form a valid
    /// prefix of an existing ordering that matches the expressions). For
    /// unique constraints, also verifies nullable columns.
    ///
    /// `normal_exprs` is expected to be in normal form already. The check is
    /// performed against the stored (non-normalized) orderings in
    /// `self.oeq_class`.
    fn satisfied_by_constraints_ordering(
        &self,
        normal_exprs: &[PhysicalSortExpr],
    ) -> bool {
        self.constraints.iter().any(|constraint| match constraint {
            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
                // Only unique constraints need the nullability check below.
                let check_null = matches!(constraint, Constraint::Unique(_));
                let normalized_size = normal_exprs.len();
                indices.len() <= normalized_size
                    && self.oeq_class.iter().any(|ordering| {
                        let length = ordering.len();
                        // The ordering must be able to hold every constraint
                        // column and must not be longer than the given exprs.
                        if indices.len() > length || normalized_size < length {
                            return false;
                        }
                        // Build a map of column positions in the ordering:
                        let mut col_positions = HashMap::with_capacity(length);
                        for (pos, req) in ordering.iter().enumerate() {
                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
                            {
                                // Unknown nullability is treated as nullable.
                                let nullable = col.nullable(&self.schema).unwrap_or(true);
                                col_positions.insert(col.index(), (pos, nullable));
                            }
                        }
                        // Check if all constraint indices appear in valid positions:
                        if !indices.iter().all(|idx| {
                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
                                // For unique constraints, verify column is not nullable if it's first/last:
                                !check_null
                                    || !nullable
                                    || (pos != 0 && pos != length - 1)
                            })
                        }) {
                            return false;
                        }
                        // Check if this ordering matches the prefix:
                        normal_exprs.iter().zip(ordering).all(|(given, existing)| {
                            existing.satisfy_expr(given, &self.schema)
                        })
                    })
            }
        })
    }
740
    /// Checks if the sort requirements are satisfied by any of the table
    /// constraints (primary key or unique). Returns true if any constraint
    /// fully satisfies the requirements (i.e. constraint indices form a valid
    /// prefix of an existing ordering that matches the requirements). For
    /// unique constraints, also verifies nullable columns.
    ///
    /// `normal_reqs` is expected to be in normal form already. The check is
    /// performed against the stored (non-normalized) orderings in
    /// `self.oeq_class`.
    fn satisfied_by_constraints(&self, normal_reqs: &[PhysicalSortRequirement]) -> bool {
        self.constraints.iter().any(|constraint| match constraint {
            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
                // Only unique constraints need the nullability check below.
                let check_null = matches!(constraint, Constraint::Unique(_));
                let normalized_size = normal_reqs.len();
                indices.len() <= normalized_size
                    && self.oeq_class.iter().any(|ordering| {
                        let length = ordering.len();
                        // The ordering must be able to hold every constraint
                        // column and must not be longer than the requirement.
                        if indices.len() > length || normalized_size < length {
                            return false;
                        }
                        // Build a map of column positions in the ordering:
                        let mut col_positions = HashMap::with_capacity(length);
                        for (pos, req) in ordering.iter().enumerate() {
                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
                            {
                                // Unknown nullability is treated as nullable.
                                let nullable = col.nullable(&self.schema).unwrap_or(true);
                                col_positions.insert(col.index(), (pos, nullable));
                            }
                        }
                        // Check if all constraint indices appear in valid positions:
                        if !indices.iter().all(|idx| {
                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
                                // For unique constraints, verify column is not nullable if it's first/last:
                                !check_null
                                    || !nullable
                                    || (pos != 0 && pos != length - 1)
                            })
                        }) {
                            return false;
                        }
                        // Check if this ordering matches the prefix:
                        normal_reqs.iter().zip(ordering).all(|(given, existing)| {
                            existing.satisfy(given, &self.schema)
                        })
                    })
            }
        })
    }
785
786 /// Checks whether the `given` sort requirements are equal or more specific
787 /// than the `reference` sort requirements.
788 pub fn requirements_compatible(
789 &self,
790 given: LexRequirement,
791 reference: LexRequirement,
792 ) -> bool {
793 let Some(normal_given) = self.normalize_sort_requirements(given) else {
794 return true;
795 };
796 let Some(normal_reference) = self.normalize_sort_requirements(reference) else {
797 return true;
798 };
799
800 (normal_reference.len() <= normal_given.len())
801 && normal_reference
802 .into_iter()
803 .zip(normal_given)
804 .all(|(reference, given)| given.compatible(&reference))
805 }
806
807 /// Modify existing orderings by substituting sort expressions with appropriate
808 /// targets from the projection mapping. We substitute a sort expression when
809 /// its physical expression has a one-to-one functional relationship with a
810 /// target expression in the mapping.
811 ///
812 /// After substitution, we may generate more than one `LexOrdering` for each
813 /// existing equivalent ordering. For example, `[a ASC, b ASC]` will turn
814 /// into `[CAST(a) ASC, b ASC]` and `[a ASC, b ASC]` when applying projection
815 /// expressions `a, b, CAST(a)`.
816 ///
817 /// TODO: Handle all scenarios that allow substitution; e.g. when `x` is
818 /// sorted, `atan(x + 1000)` should also be substituted. For now, we
819 /// only consider single-column `CAST` expressions.
820 fn substitute_oeq_class(
821 schema: &SchemaRef,
822 mapping: &ProjectionMapping,
823 oeq_class: OrderingEquivalenceClass,
824 ) -> OrderingEquivalenceClass {
825 let new_orderings = oeq_class.into_iter().flat_map(|order| {
826 // Modify/expand existing orderings by substituting sort
827 // expressions with appropriate targets from the mapping:
828 order
829 .into_iter()
830 .map(|sort_expr| {
831 let referring_exprs = mapping
832 .iter()
833 .map(|(source, _target)| source)
834 .filter(|source| expr_refers(source, &sort_expr.expr))
835 .cloned();
836 let mut result = vec![];
837 // The sort expression comes from this schema, so the
838 // following call to `unwrap` is safe.
839 let expr_type = sort_expr.expr.data_type(schema).unwrap();
840 // TODO: Add one-to-one analysis for ScalarFunctions.
841 for r_expr in referring_exprs {
842 // We check whether this expression is substitutable.
843 if let Some(cast_expr) =
844 r_expr.as_any().downcast_ref::<CastExpr>()
845 {
846 // For casts, we need to know whether the cast
847 // expression matches:
848 if cast_expr.expr.eq(&sort_expr.expr)
849 && cast_expr.is_bigger_cast(&expr_type)
850 {
851 result.push(PhysicalSortExpr::new(
852 r_expr,
853 sort_expr.options,
854 ));
855 }
856 }
857 }
858 result.push(sort_expr);
859 result
860 })
861 // Generate all valid orderings given substituted expressions:
862 .multi_cartesian_product()
863 });
864 OrderingEquivalenceClass::new(new_orderings)
865 }
866
867 /// Projects argument `expr` according to the projection described by
868 /// `mapping`, taking equivalences into account.
869 ///
870 /// For example, assume that columns `a` and `c` are always equal, and that
871 /// the projection described by `mapping` encodes the following:
872 ///
873 /// ```text
874 /// a -> a1
875 /// b -> b1
876 /// ```
877 ///
878 /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to
879 /// `Some(a1 + b1)` and `d` to `None`, meaning that it is not projectable.
880 pub fn project_expr(
881 &self,
882 expr: &Arc<dyn PhysicalExpr>,
883 mapping: &ProjectionMapping,
884 ) -> Option<Arc<dyn PhysicalExpr>> {
885 self.eq_group.project_expr(mapping, expr)
886 }
887
888 /// Projects the given `expressions` according to the projection described
889 /// by `mapping`, taking equivalences into account. This function is similar
890 /// to [`Self::project_expr`], but projects multiple expressions at once
891 /// more efficiently than calling `project_expr` for each expression.
892 pub fn project_expressions<'a>(
893 &'a self,
894 expressions: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>> + 'a,
895 mapping: &'a ProjectionMapping,
896 ) -> impl Iterator<Item = Option<Arc<dyn PhysicalExpr>>> + 'a {
897 self.eq_group.project_expressions(mapping, expressions)
898 }
899
900 /// Constructs a dependency map based on existing orderings referred to in
901 /// the projection.
902 ///
903 /// This function analyzes the orderings in the normalized order-equivalence
904 /// class and builds a dependency map. The dependency map captures relationships
905 /// between expressions within the orderings, helping to identify dependencies
906 /// and construct valid projected orderings during projection operations.
907 ///
908 /// # Parameters
909 ///
910 /// - `mapping`: A reference to the `ProjectionMapping` that defines the
911 /// relationship between source and target expressions.
912 ///
913 /// # Returns
914 ///
915 /// A [`DependencyMap`] representing the dependency map, where each
916 /// \[`DependencyNode`\] contains dependencies for the key [`PhysicalSortExpr`].
917 ///
918 /// # Example
919 ///
920 /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`,
921 /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`.
922 /// Then, the dependency map will be:
923 ///
924 /// ```text
925 /// a ASC: Node {Some(a_new ASC), HashSet{}}
926 /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
927 /// c ASC: Node {None, HashSet{a ASC}}
928 /// ```
929 fn construct_dependency_map(
930 &self,
931 oeq_class: OrderingEquivalenceClass,
932 mapping: &ProjectionMapping,
933 ) -> DependencyMap {
934 let mut map = DependencyMap::default();
935 for ordering in oeq_class.into_iter() {
936 // Previous expression is a dependency. Note that there is no
937 // dependency for the leading expression.
938 if !self.insert_to_dependency_map(
939 mapping,
940 ordering[0].clone(),
941 None,
942 &mut map,
943 ) {
944 continue;
945 }
946 for (dependency, sort_expr) in ordering.into_iter().tuple_windows() {
947 if !self.insert_to_dependency_map(
948 mapping,
949 sort_expr,
950 Some(dependency),
951 &mut map,
952 ) {
953 // If we can't project, stop constructing the dependency map
954 // as remaining dependencies will be invalid post projection.
955 break;
956 }
957 }
958 }
959 map
960 }
961
962 /// Projects the sort expression according to the projection mapping and
963 /// inserts it into the dependency map with the given dependency. Returns
964 /// a boolean flag indicating whether the given expression is projectable.
965 fn insert_to_dependency_map(
966 &self,
967 mapping: &ProjectionMapping,
968 sort_expr: PhysicalSortExpr,
969 dependency: Option<PhysicalSortExpr>,
970 map: &mut DependencyMap,
971 ) -> bool {
972 let target_sort_expr = self
973 .project_expr(&sort_expr.expr, mapping)
974 .map(|expr| PhysicalSortExpr::new(expr, sort_expr.options));
975 let projectable = target_sort_expr.is_some();
976 if projectable
977 || mapping
978 .iter()
979 .any(|(source, _)| expr_refers(source, &sort_expr.expr))
980 {
981 // Add sort expressions that can be projected or referred to
982 // by any of the projection expressions to the dependency map:
983 map.insert(sort_expr, target_sort_expr, dependency);
984 }
985 projectable
986 }
987
988 /// Returns a new `ProjectionMapping` where source expressions are in normal
989 /// form. Normalization ensures that source expressions are transformed into
990 /// a consistent representation, which is beneficial for algorithms that rely
991 /// on exact equalities, as it allows for more precise and reliable comparisons.
992 ///
993 /// # Parameters
994 ///
995 /// - `mapping`: A reference to the original `ProjectionMapping` to normalize.
996 ///
997 /// # Returns
998 ///
999 /// A new `ProjectionMapping` with source expressions in normal form.
1000 fn normalize_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
1001 mapping
1002 .iter()
1003 .map(|(source, target)| {
1004 let normal_source = self.eq_group.normalize_expr(Arc::clone(source));
1005 (normal_source, target.clone())
1006 })
1007 .collect()
1008 }
1009
1010 /// Computes projected orderings based on a given projection mapping.
1011 ///
1012 /// This function takes a `ProjectionMapping` and computes the possible
1013 /// orderings for the projected expressions. It considers dependencies
1014 /// between expressions and generates valid orderings according to the
1015 /// specified sort properties.
1016 ///
1017 /// # Parameters
1018 ///
1019 /// - `mapping`: A reference to the `ProjectionMapping` that defines the
1020 /// relationship between source and target expressions.
1021 /// - `oeq_class`: The `OrderingEquivalenceClass` containing the orderings
1022 /// to project.
1023 ///
1024 /// # Returns
1025 ///
1026 /// A vector of all valid (but not in normal form) orderings after projection.
1027 fn projected_orderings(
1028 &self,
1029 mapping: &ProjectionMapping,
1030 mut oeq_class: OrderingEquivalenceClass,
1031 ) -> Vec<LexOrdering> {
1032 // Normalize source expressions in the mapping:
1033 let mapping = self.normalize_mapping(mapping);
1034 // Get dependency map for existing orderings:
1035 oeq_class = Self::substitute_oeq_class(&self.schema, &mapping, oeq_class);
1036 let dependency_map = self.construct_dependency_map(oeq_class, &mapping);
1037 let orderings = mapping.iter().flat_map(|(source, targets)| {
1038 referred_dependencies(&dependency_map, source)
1039 .into_iter()
1040 .filter_map(|deps| {
1041 let ep = get_expr_properties(source, &deps, &self.schema);
1042 let sort_properties = ep.map(|prop| prop.sort_properties);
1043 if let Ok(SortProperties::Ordered(options)) = sort_properties {
1044 Some((options, deps))
1045 } else {
1046 // Do not consider unordered cases.
1047 None
1048 }
1049 })
1050 .flat_map(|(options, relevant_deps)| {
1051 // Generate dependent orderings (i.e. prefixes for targets):
1052 let dependency_orderings =
1053 generate_dependency_orderings(&relevant_deps, &dependency_map);
1054 let sort_exprs = targets.iter().map(|(target, _)| {
1055 PhysicalSortExpr::new(Arc::clone(target), options)
1056 });
1057 if dependency_orderings.is_empty() {
1058 sort_exprs.map(|sort_expr| [sort_expr].into()).collect()
1059 } else {
1060 sort_exprs
1061 .flat_map(|sort_expr| {
1062 let mut result = dependency_orderings.clone();
1063 for ordering in result.iter_mut() {
1064 ordering.push(sort_expr.clone());
1065 }
1066 result
1067 })
1068 .collect::<Vec<_>>()
1069 }
1070 })
1071 });
1072
1073 // Add valid projected orderings. For example, if existing ordering is
1074 // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
1075 // preserve `a_new + b_new` as ordered. Please note that `a_new` and
1076 // `b_new` themselves need not be ordered. Such dependencies cannot be
1077 // deduced via the pass above.
1078 let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
1079 let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
1080 if prefixes.is_empty() {
1081 // If prefix is empty, there is no dependency. Insert
1082 // empty ordering:
1083 if let Some(target) = &node.target {
1084 prefixes.push([target.clone()].into());
1085 }
1086 } else {
1087 // Append current ordering on top its dependencies:
1088 for ordering in prefixes.iter_mut() {
1089 if let Some(target) = &node.target {
1090 ordering.push(target.clone());
1091 }
1092 }
1093 }
1094 prefixes
1095 });
1096
1097 // Simplify each ordering by removing redundant sections:
1098 orderings.chain(projected_orderings).collect()
1099 }
1100
1101 /// Projects constraints according to the given projection mapping.
1102 ///
1103 /// This function takes a projection mapping and extracts column indices of
1104 /// target columns. It then projects the constraints to only include
1105 /// relationships between columns that exist in the projected output.
1106 ///
1107 /// # Parameters
1108 ///
1109 /// * `mapping` - A reference to the `ProjectionMapping` that defines the
1110 /// projection operation.
1111 ///
1112 /// # Returns
1113 ///
1114 /// Returns an optional `Constraints` object containing only the constraints
1115 /// that are valid for the projected columns (if any exists).
1116 fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
1117 let indices = mapping
1118 .iter()
1119 .flat_map(|(_, targets)| {
1120 targets.iter().flat_map(|(target, _)| {
1121 target.as_any().downcast_ref::<Column>().map(|c| c.index())
1122 })
1123 })
1124 .collect::<Vec<_>>();
1125 self.constraints.project(&indices)
1126 }
1127
1128 /// Projects the equivalences within according to `mapping` and
1129 /// `output_schema`.
1130 pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
1131 let eq_group = self.eq_group.project(mapping);
1132 let orderings =
1133 self.projected_orderings(mapping, self.oeq_cache.normal_cls.clone());
1134 let normal_orderings = orderings
1135 .iter()
1136 .cloned()
1137 .map(|o| eq_group.normalize_sort_exprs(o));
1138 Self {
1139 oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
1140 oeq_class: OrderingEquivalenceClass::new(orderings),
1141 constraints: self.projected_constraints(mapping).unwrap_or_default(),
1142 schema: output_schema,
1143 eq_group,
1144 }
1145 }
1146
1147 /// Returns the longest (potentially partial) permutation satisfying the
1148 /// existing ordering. For example, if we have the equivalent orderings
1149 /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`,
1150 /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`.
1151 /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied
1152 /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0`
1153 /// inside the argument `exprs` (respectively). For the mathematical
1154 /// definition of "partial permutation", see:
1155 ///
1156 /// <https://en.wikipedia.org/wiki/Permutation#k-permutations_of_n>
1157 pub fn find_longest_permutation(
1158 &self,
1159 exprs: &[Arc<dyn PhysicalExpr>],
1160 ) -> Result<(Vec<PhysicalSortExpr>, Vec<usize>)> {
1161 let mut eq_properties = self.clone();
1162 let mut result = vec![];
1163 // The algorithm is as follows:
1164 // - Iterate over all the expressions and insert ordered expressions
1165 // into the result.
1166 // - Treat inserted expressions as constants (i.e. add them as constants
1167 // to the state).
1168 // - Continue the above procedure until no expression is inserted; i.e.
1169 // the algorithm reaches a fixed point.
1170 // This algorithm should reach a fixed point in at most `exprs.len()`
1171 // iterations.
1172 let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
1173 for _ in 0..exprs.len() {
1174 // Get ordered expressions with their indices.
1175 let ordered_exprs = search_indices
1176 .iter()
1177 .filter_map(|&idx| {
1178 let ExprProperties {
1179 sort_properties, ..
1180 } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
1181 match sort_properties {
1182 SortProperties::Ordered(options) => {
1183 let expr = Arc::clone(&exprs[idx]);
1184 Some((PhysicalSortExpr::new(expr, options), idx))
1185 }
1186 SortProperties::Singleton => {
1187 // Assign default ordering to constant expressions:
1188 let expr = Arc::clone(&exprs[idx]);
1189 Some((PhysicalSortExpr::new_default(expr), idx))
1190 }
1191 SortProperties::Unordered => None,
1192 }
1193 })
1194 .collect::<Vec<_>>();
1195 // We reached a fixed point, exit.
1196 if ordered_exprs.is_empty() {
1197 break;
1198 }
1199 // Remove indices that have an ordering from `search_indices`, and
1200 // treat ordered expressions as constants in subsequent iterations.
1201 // We can do this because the "next" key only matters in a lexicographical
1202 // ordering when the keys to its left have the same values.
1203 //
1204 // Note that these expressions are not properly "constants". This is just
1205 // an implementation strategy confined to this function.
1206 for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
1207 let const_expr = ConstExpr::from(Arc::clone(expr));
1208 eq_properties.add_constants(std::iter::once(const_expr))?;
1209 search_indices.shift_remove(idx);
1210 }
1211 // Add new ordered section to the state.
1212 result.extend(ordered_exprs);
1213 }
1214 Ok(result.into_iter().unzip())
1215 }
1216
1217 /// This function determines whether the provided expression is constant
1218 /// based on the known constants. For example, if columns `a` and `b` are
1219 /// constant, then expressions `a`, `b` and `a + b` will all return `true`
1220 /// whereas expression `c` will return `false`.
1221 ///
1222 /// # Parameters
1223 ///
1224 /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1225 /// expression to be checked.
1226 ///
1227 /// # Returns
1228 ///
1229 /// Returns a `Some` value if the expression is constant according to
1230 /// equivalence group, and `None` otherwise. The `Some` variant contains
1231 /// an `AcrossPartitions` value indicating whether the expression is
1232 /// constant across partitions, and its actual value (if available).
1233 pub fn is_expr_constant(
1234 &self,
1235 expr: &Arc<dyn PhysicalExpr>,
1236 ) -> Option<AcrossPartitions> {
1237 self.eq_group.is_expr_constant(expr)
1238 }
1239
1240 /// Retrieves the properties for a given physical expression.
1241 ///
1242 /// This function constructs an [`ExprProperties`] object for the given
1243 /// expression, which encapsulates information about the expression's
1244 /// properties, including its [`SortProperties`] and [`Interval`].
1245 ///
1246 /// # Parameters
1247 ///
1248 /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical expression
1249 /// for which ordering information is sought.
1250 ///
1251 /// # Returns
1252 ///
1253 /// Returns an [`ExprProperties`] object containing the ordering and range
1254 /// information for the given expression.
1255 pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
1256 ExprPropertiesNode::new_unknown(expr)
1257 .transform_up(|expr| update_properties(expr, self))
1258 .data()
1259 .map(|node| node.data)
1260 .unwrap_or_else(|_| ExprProperties::new_unknown())
1261 }
1262
1263 /// Transforms this `EquivalenceProperties` by mapping columns in the
1264 /// original schema to columns in the new schema by index.
1265 pub fn with_new_schema(mut self, schema: SchemaRef) -> Result<Self> {
1266 // The new schema and the original schema is aligned when they have the
1267 // same number of columns, and fields at the same index have the same
1268 // type in both schemas.
1269 let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
1270 && self
1271 .schema
1272 .fields
1273 .iter()
1274 .zip(schema.fields.iter())
1275 .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
1276 if !schemas_aligned {
1277 // Rewriting equivalence properties in terms of new schema is not
1278 // safe when schemas are not aligned:
1279 return plan_err!(
1280 "Schemas have to be aligned to rewrite equivalences:\n Old schema: {:?}\n New schema: {:?}",
1281 self.schema,
1282 schema
1283 );
1284 }
1285
1286 // Rewrite equivalence classes according to the new schema:
1287 let mut eq_classes = vec![];
1288 for mut eq_class in self.eq_group {
1289 // Rewrite the expressions in the equivalence class:
1290 eq_class.exprs = eq_class
1291 .exprs
1292 .into_iter()
1293 .map(|expr| with_new_schema(expr, &schema))
1294 .collect::<Result<_>>()?;
1295 // Rewrite the constant value (if available and known):
1296 let data_type = eq_class
1297 .canonical_expr()
1298 .map(|e| e.data_type(&schema))
1299 .transpose()?;
1300 if let (Some(data_type), Some(AcrossPartitions::Uniform(Some(value)))) =
1301 (data_type, &mut eq_class.constant)
1302 {
1303 *value = value.cast_to(&data_type)?;
1304 }
1305 eq_classes.push(eq_class);
1306 }
1307 self.eq_group = eq_classes.into();
1308
1309 // Rewrite orderings according to new schema:
1310 self.oeq_class = self.oeq_class.with_new_schema(&schema)?;
1311 self.oeq_cache.normal_cls = self.oeq_cache.normal_cls.with_new_schema(&schema)?;
1312
1313 // Update the schema:
1314 self.schema = schema;
1315
1316 Ok(self)
1317 }
1318}
1319
1320impl From<EquivalenceProperties> for OrderingEquivalenceClass {
1321 fn from(eq_properties: EquivalenceProperties) -> Self {
1322 eq_properties.oeq_class
1323 }
1324}
1325
1326/// More readable display version of the `EquivalenceProperties`.
1327///
1328/// Format:
1329/// ```text
1330/// order: [[b@1 ASC NULLS LAST]], eq: [{members: [a@0], constant: (heterogeneous)}]
1331/// ```
1332impl Display for EquivalenceProperties {
1333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1334 let empty_eq_group = self.eq_group.is_empty();
1335 let empty_oeq_class = self.oeq_class.is_empty();
1336 if empty_oeq_class && empty_eq_group {
1337 write!(f, "No properties")?;
1338 } else if !empty_oeq_class {
1339 write!(f, "order: {}", self.oeq_class)?;
1340 if !empty_eq_group {
1341 write!(f, ", eq: {}", self.eq_group)?;
1342 }
1343 } else {
1344 write!(f, "eq: {}", self.eq_group)?;
1345 }
1346 Ok(())
1347 }
1348}
1349
1350/// Calculates the properties of a given [`ExprPropertiesNode`].
1351///
1352/// Order information can be retrieved as:
1353/// - If it is a leaf node, we directly find the order of the node by looking
1354/// at the given sort expression and equivalence properties if it is a `Column`
1355/// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
1356/// it as singleton so that it can cooperate with all ordered columns.
1357/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1358/// and operator has its own rules on how to propagate the children orderings.
1359/// However, before we engage in recursion, we check whether this intermediate
1360/// node directly matches with the sort expression. If there is a match, the
1361/// sort expression emerges at that node immediately, discarding the recursive
1362/// result coming from its children.
1363///
1364/// Range information is calculated as:
1365/// - If it is a `Literal` node, we set the range as a point value. If it is a
1366/// `Column` node, we set the datatype of the range, but cannot give an interval
1367/// for the range, yet.
1368/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1369/// and operator has its own rules on how to propagate the children range.
1370fn update_properties(
1371 mut node: ExprPropertiesNode,
1372 eq_properties: &EquivalenceProperties,
1373) -> Result<Transformed<ExprPropertiesNode>> {
1374 // First, try to gather the information from the children:
1375 if !node.expr.children().is_empty() {
1376 // We have an intermediate (non-leaf) node, account for its children:
1377 let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
1378 node.data = node.expr.get_properties(&children_props)?;
1379 } else if node.expr.as_any().is::<Literal>() {
1380 // We have a Literal, which is one of the two possible leaf node types:
1381 node.data = node.expr.get_properties(&[])?;
1382 } else if node.expr.as_any().is::<Column>() {
1383 // We have a Column, which is the other possible leaf node type:
1384 node.data.range =
1385 Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
1386 }
1387 // Now, check what we know about orderings:
1388 let normal_expr = eq_properties
1389 .eq_group
1390 .normalize_expr(Arc::clone(&node.expr));
1391 let oeq_class = &eq_properties.oeq_cache.normal_cls;
1392 if eq_properties.is_expr_constant(&normal_expr).is_some()
1393 || oeq_class.is_expr_partial_const(&normal_expr)
1394 {
1395 node.data.sort_properties = SortProperties::Singleton;
1396 } else if let Some(options) = oeq_class.get_options(&normal_expr) {
1397 node.data.sort_properties = SortProperties::Ordered(options);
1398 }
1399 Ok(Transformed::yes(node))
1400}
1401
1402/// This function examines whether a referring expression directly refers to a
1403/// given referred expression or if any of its children in the expression tree
1404/// refer to the specified expression.
1405///
1406/// # Parameters
1407///
1408/// - `referring_expr`: A reference to the referring expression (`Arc<dyn PhysicalExpr>`).
1409/// - `referred_expr`: A reference to the referred expression (`Arc<dyn PhysicalExpr>`)
1410///
1411/// # Returns
1412///
1413/// A boolean value indicating whether `referring_expr` refers (needs it to evaluate its result)
1414/// `referred_expr` or not.
1415fn expr_refers(
1416 referring_expr: &Arc<dyn PhysicalExpr>,
1417 referred_expr: &Arc<dyn PhysicalExpr>,
1418) -> bool {
1419 referring_expr.eq(referred_expr)
1420 || referring_expr
1421 .children()
1422 .iter()
1423 .any(|child| expr_refers(child, referred_expr))
1424}
1425
1426/// This function examines the given expression and its properties to determine
1427/// the ordering properties of the expression. The range knowledge is not utilized
1428/// yet in the scope of this function.
1429///
1430/// # Parameters
1431///
1432/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) for
1433/// which ordering properties need to be determined.
1434/// - `dependencies`: A reference to `Dependencies`, containing sort expressions
1435/// referred to by `expr`.
1436/// - `schema``: A reference to the schema which the `expr` columns refer.
1437///
1438/// # Returns
1439///
1440/// A `SortProperties` indicating the ordering information of the given expression.
1441fn get_expr_properties(
1442 expr: &Arc<dyn PhysicalExpr>,
1443 dependencies: &Dependencies,
1444 schema: &SchemaRef,
1445) -> Result<ExprProperties> {
1446 if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
1447 // If exact match is found, return its ordering.
1448 Ok(ExprProperties {
1449 sort_properties: SortProperties::Ordered(column_order.options),
1450 range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1451 preserves_lex_ordering: false,
1452 })
1453 } else if expr.as_any().downcast_ref::<Column>().is_some() {
1454 Ok(ExprProperties {
1455 sort_properties: SortProperties::Unordered,
1456 range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1457 preserves_lex_ordering: false,
1458 })
1459 } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
1460 Ok(ExprProperties {
1461 sort_properties: SortProperties::Singleton,
1462 range: literal.value().into(),
1463 preserves_lex_ordering: true,
1464 })
1465 } else {
1466 // Find orderings of its children
1467 let child_states = expr
1468 .children()
1469 .iter()
1470 .map(|child| get_expr_properties(child, dependencies, schema))
1471 .collect::<Result<Vec<_>>>()?;
1472 // Calculate expression ordering using ordering of its children.
1473 expr.get_properties(&child_states)
1474 }
1475}