datafusion_physical_expr/equivalence/properties/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod dependency; // Submodule containing DependencyMap and Dependencies
19mod joins; // Submodule containing join_equivalence_properties
20mod union; // Submodule containing calculate_union
21
22pub use joins::*;
23pub use union::*;
24
25use std::fmt::{self, Display};
26use std::mem;
27use std::sync::Arc;
28
29use self::dependency::{
30 construct_prefix_orderings, generate_dependency_orderings, referred_dependencies,
31 Dependencies, DependencyMap,
32};
33use crate::equivalence::{
34 AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
35};
36use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
37use crate::{
38 ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
39 PhysicalSortRequirement,
40};
41
42use arrow::datatypes::SchemaRef;
43use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
44use datafusion_common::{plan_err, Constraint, Constraints, HashMap, Result};
45use datafusion_expr::interval_arithmetic::Interval;
46use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
47use datafusion_physical_expr_common::sort_expr::options_compatible;
48use datafusion_physical_expr_common::utils::ExprPropertiesNode;
49
50use indexmap::IndexSet;
51use itertools::Itertools;
52
/// `EquivalenceProperties` stores information about the output of a plan node
/// that can be used to optimize the plan. Currently, it keeps track of:
/// - Sort expressions (orderings),
/// - Equivalent expressions; i.e. expressions known to have the same value.
/// - Constant expressions; i.e. expressions known to contain a single constant
///   value.
///
/// Please see the [Using Ordering for Better Plans] blog for more details.
///
/// [Using Ordering for Better Plans]: https://datafusion.apache.org/blog/2025/03/11/ordering-analysis/
///
/// # Example equivalent sort expressions
///
/// Consider the table below:
///
/// ```text
/// ┌-------┐
/// | a | b |
/// |---|---|
/// | 1 | 9 |
/// | 2 | 8 |
/// | 3 | 7 |
/// | 5 | 5 |
/// └---┴---┘
/// ```
///
/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
/// `EquivalenceProperties` tracks these different valid sort expressions and
/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query
/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
/// avoided.
///
/// # Example equivalent expressions
///
/// Similarly, consider the table below:
///
/// ```text
/// ┌-------┐
/// | a | b |
/// |---|---|
/// | 1 | 1 |
/// | 2 | 2 |
/// | 3 | 3 |
/// | 5 | 5 |
/// └---┴---┘
/// ```
///
/// In this case, columns `a` and `b` always have the same value. With this
/// information, DataFusion can optimize various operations. For example, if
/// the partition requirement is `Hash(a)` and output partitioning is
/// `Hash(b)`, then DataFusion avoids repartitioning the data as the existing
/// partitioning satisfies the requirement.
///
/// # Code Example
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
/// # use datafusion_physical_expr::expressions::col;
/// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
/// #     Field::new("a", DataType::Int32, false),
/// #     Field::new("b", DataType::Int32, false),
/// #     Field::new("c", DataType::Int32, false),
/// # ]));
/// # let col_a = col("a", &schema).unwrap();
/// # let col_b = col("b", &schema).unwrap();
/// # let col_c = col("c", &schema).unwrap();
/// // This object represents data that is sorted by a ASC, c DESC
/// // with a single constant value of b
/// let mut eq_properties = EquivalenceProperties::new(schema);
/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
/// eq_properties.add_ordering([
///     PhysicalSortExpr::new_default(col_a).asc(),
///     PhysicalSortExpr::new_default(col_c).desc(),
/// ]);
///
/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant: (heterogeneous)}]");
/// ```
#[derive(Clone, Debug)]
pub struct EquivalenceProperties {
    /// Distinct equivalence classes (i.e. expressions with the same value).
    eq_group: EquivalenceGroup,
    /// Equivalent sort expressions (i.e. those that define the same ordering).
    oeq_class: OrderingEquivalenceClass,
    /// Cache storing equivalent sort expressions in normal form (i.e. without
    /// constants/duplicates and in standard form) and a map associating leading
    /// terms with full sort expressions.
    oeq_cache: OrderingEquivalenceCache,
    /// Table constraints that factor in equivalence calculations.
    constraints: Constraints,
    /// Schema associated with this object.
    schema: SchemaRef,
}
147
/// This object serves as a cache for storing equivalent sort expressions
/// in normal form, and a map associating leading sort expressions with
/// full lexicographical orderings. With this information, DataFusion can
/// efficiently determine whether a given ordering is satisfied by the
/// existing orderings, and discover new orderings based on the existing
/// equivalence properties.
///
/// The two fields must be kept in sync: whenever `normal_cls` changes, the
/// map has to be rebuilt (see `update_map`), because it stores positions
/// into `normal_cls`.
#[derive(Clone, Debug, Default)]
struct OrderingEquivalenceCache {
    /// Equivalent sort expressions in normal form.
    normal_cls: OrderingEquivalenceClass,
    /// Map associating leading sort expressions with full lexicographical
    /// orderings. Values are indices into `normal_cls`.
    leading_map: HashMap<Arc<dyn PhysicalExpr>, Vec<usize>>,
}
162
163impl OrderingEquivalenceCache {
164 /// Creates a new `OrderingEquivalenceCache` object with the given
165 /// equivalent orderings, which should be in normal form.
166 pub fn new(
167 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
168 ) -> Self {
169 let mut cache = Self {
170 normal_cls: OrderingEquivalenceClass::new(orderings),
171 leading_map: HashMap::new(),
172 };
173 cache.update_map();
174 cache
175 }
176
177 /// Updates/reconstructs the leading expression map according to the normal
178 /// ordering equivalence class within.
179 pub fn update_map(&mut self) {
180 self.leading_map.clear();
181 for (idx, ordering) in self.normal_cls.iter().enumerate() {
182 let expr = Arc::clone(&ordering.first().expr);
183 self.leading_map.entry(expr).or_default().push(idx);
184 }
185 }
186
187 /// Clears the cache, removing all orderings and leading expressions.
188 pub fn clear(&mut self) {
189 self.normal_cls.clear();
190 self.leading_map.clear();
191 }
192}
193
194impl EquivalenceProperties {
195 /// Creates an empty `EquivalenceProperties` object.
196 pub fn new(schema: SchemaRef) -> Self {
197 Self {
198 eq_group: EquivalenceGroup::default(),
199 oeq_class: OrderingEquivalenceClass::default(),
200 oeq_cache: OrderingEquivalenceCache::default(),
201 constraints: Constraints::default(),
202 schema,
203 }
204 }
205
206 /// Adds constraints to the properties.
207 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
208 self.constraints = constraints;
209 self
210 }
211
212 /// Creates a new `EquivalenceProperties` object with the given orderings.
213 pub fn new_with_orderings(
214 schema: SchemaRef,
215 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
216 ) -> Self {
217 let eq_group = EquivalenceGroup::default();
218 let oeq_class = OrderingEquivalenceClass::new(orderings);
219 // Here, we can avoid performing a full normalization, and get by with
220 // only removing constants because the equivalence group is empty.
221 let normal_orderings = oeq_class.iter().cloned().map(|o| {
222 o.into_iter()
223 .filter(|sort_expr| eq_group.is_expr_constant(&sort_expr.expr).is_none())
224 });
225 Self {
226 oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
227 oeq_class,
228 eq_group,
229 constraints: Constraints::default(),
230 schema,
231 }
232 }
233
234 /// Returns the associated schema.
235 pub fn schema(&self) -> &SchemaRef {
236 &self.schema
237 }
238
239 /// Returns a reference to the ordering equivalence class within.
240 pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
241 &self.oeq_class
242 }
243
244 /// Returns a reference to the equivalence group within.
245 pub fn eq_group(&self) -> &EquivalenceGroup {
246 &self.eq_group
247 }
248
249 /// Returns a reference to the constraints within.
250 pub fn constraints(&self) -> &Constraints {
251 &self.constraints
252 }
253
254 /// Returns all the known constants expressions.
255 pub fn constants(&self) -> Vec<ConstExpr> {
256 self.eq_group
257 .iter()
258 .flat_map(|c| {
259 c.iter().filter_map(|expr| {
260 c.constant
261 .as_ref()
262 .map(|across| ConstExpr::new(Arc::clone(expr), across.clone()))
263 })
264 })
265 .collect()
266 }
267
268 /// Returns the output ordering of the properties.
269 pub fn output_ordering(&self) -> Option<LexOrdering> {
270 let concat = self.oeq_class.iter().flat_map(|o| o.iter().cloned());
271 self.normalize_sort_exprs(concat)
272 }
273
274 /// Extends this `EquivalenceProperties` with the `other` object.
275 pub fn extend(mut self, other: Self) -> Result<Self> {
276 self.constraints.extend(other.constraints);
277 self.add_equivalence_group(other.eq_group)?;
278 self.add_orderings(other.oeq_class);
279 Ok(self)
280 }
281
282 /// Clears (empties) the ordering equivalence class within this object.
283 /// Call this method when existing orderings are invalidated.
284 pub fn clear_orderings(&mut self) {
285 self.oeq_class.clear();
286 self.oeq_cache.clear();
287 }
288
289 /// Removes constant expressions that may change across partitions.
290 /// This method should be used when merging data from different partitions.
291 pub fn clear_per_partition_constants(&mut self) {
292 if self.eq_group.clear_per_partition_constants() {
293 // Renormalize orderings if the equivalence group changes:
294 let normal_orderings = self
295 .oeq_class
296 .iter()
297 .cloned()
298 .map(|o| self.eq_group.normalize_sort_exprs(o));
299 self.oeq_cache = OrderingEquivalenceCache::new(normal_orderings);
300 }
301 }
302
303 /// Adds new orderings into the existing ordering equivalence class.
304 pub fn add_orderings(
305 &mut self,
306 orderings: impl IntoIterator<Item = impl IntoIterator<Item = PhysicalSortExpr>>,
307 ) {
308 let orderings: Vec<_> =
309 orderings.into_iter().filter_map(LexOrdering::new).collect();
310 let normal_orderings: Vec<_> = orderings
311 .iter()
312 .cloned()
313 .filter_map(|o| self.normalize_sort_exprs(o))
314 .collect();
315 if !normal_orderings.is_empty() {
316 self.oeq_class.extend(orderings);
317 // Normalize given orderings to update the cache:
318 self.oeq_cache.normal_cls.extend(normal_orderings);
319 // TODO: If no ordering is found to be redundant during extension, we
320 // can use a shortcut algorithm to update the leading map.
321 self.oeq_cache.update_map();
322 }
323 }
324
325 /// Adds a single ordering to the existing ordering equivalence class.
326 pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
327 self.add_orderings(std::iter::once(ordering));
328 }
329
330 fn update_oeq_cache(&mut self) -> Result<()> {
331 // Renormalize orderings if the equivalence group changes:
332 let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
333 let normal_orderings = normal_cls
334 .into_iter()
335 .map(|o| self.eq_group.normalize_sort_exprs(o));
336 self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
337 self.oeq_cache.update_map();
338 // Discover any new orderings based on the new equivalence classes:
339 let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
340 for expr in leading_exprs {
341 self.discover_new_orderings(expr)?;
342 }
343 Ok(())
344 }
345
346 /// Incorporates the given equivalence group to into the existing
347 /// equivalence group within.
348 pub fn add_equivalence_group(
349 &mut self,
350 other_eq_group: EquivalenceGroup,
351 ) -> Result<()> {
352 if !other_eq_group.is_empty() {
353 self.eq_group.extend(other_eq_group);
354 self.update_oeq_cache()?;
355 }
356 Ok(())
357 }
358
359 /// Returns the ordering equivalence class within in normal form.
360 /// Normalization standardizes expressions according to the equivalence
361 /// group within, and removes constants/duplicates.
362 pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
363 self.oeq_class
364 .iter()
365 .cloned()
366 .filter_map(|ordering| self.normalize_sort_exprs(ordering))
367 .collect::<Vec<_>>()
368 .into()
369 }
370
371 /// Adds a new equality condition into the existing equivalence group.
372 /// If the given equality defines a new equivalence class, adds this new
373 /// equivalence class to the equivalence group.
374 pub fn add_equal_conditions(
375 &mut self,
376 left: Arc<dyn PhysicalExpr>,
377 right: Arc<dyn PhysicalExpr>,
378 ) -> Result<()> {
379 // Add equal expressions to the state:
380 if self.eq_group.add_equal_conditions(Arc::clone(&left), right) {
381 self.update_oeq_cache()?;
382 }
383 self.update_oeq_cache()?;
384 Ok(())
385 }
386
387 /// Track/register physical expressions with constant values.
388 pub fn add_constants(
389 &mut self,
390 constants: impl IntoIterator<Item = ConstExpr>,
391 ) -> Result<()> {
392 // Add the new constant to the equivalence group:
393 for constant in constants {
394 self.eq_group.add_constant(constant);
395 }
396 // Renormalize the orderings after adding new constants by removing
397 // the constants from existing orderings:
398 let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
399 let normal_orderings = normal_cls.into_iter().map(|ordering| {
400 ordering.into_iter().filter(|sort_expr| {
401 self.eq_group.is_expr_constant(&sort_expr.expr).is_none()
402 })
403 });
404 self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
405 self.oeq_cache.update_map();
406 // Discover any new orderings based on the constants:
407 let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
408 for expr in leading_exprs {
409 self.discover_new_orderings(expr)?;
410 }
411 Ok(())
412 }
413
    /// Discover new valid orderings in light of a new equality. Accepts a single
    /// argument (`normal_expr`) which is used to determine the orderings to
    /// update: only cached normal orderings whose leading expression is
    /// `normal_expr` are examined.
    /// When constants or equivalence classes change, there may be new orderings
    /// that can be discovered with the new equivalence properties.
    /// For a discussion, see: <https://github.com/apache/datafusion/issues/9812>
    ///
    /// For every such ordering, each expression equivalent to `normal_expr`
    /// that has children is checked: if its children are exactly the sort
    /// keys that follow the leading key, and the expression's derived sort
    /// properties match the leading key's options, then the ordering without
    /// its leading key is registered as a new ordering.
    ///
    /// # Errors
    ///
    /// Propagates errors from resolving a child's data type, building its
    /// unbounded interval, or computing the expression's properties.
    fn discover_new_orderings(
        &mut self,
        normal_expr: Arc<dyn PhysicalExpr>,
    ) -> Result<()> {
        // Nothing to do unless some cached ordering leads with `normal_expr`:
        let Some(ordering_idxs) = self.oeq_cache.leading_map.get(&normal_expr) else {
            return Ok(());
        };
        // Expressions known to be equal to `normal_expr`; when it belongs to
        // no equivalence class, it is the only candidate:
        let eq_class = self
            .eq_group
            .get_equivalence_class(&normal_expr)
            .map_or_else(|| vec![normal_expr], |class| class.clone().into());

        // Collected first and added at the end, because adding orderings
        // needs mutable access while the cache is still borrowed here:
        let mut new_orderings = vec![];
        for idx in ordering_idxs {
            let ordering = &self.oeq_cache.normal_cls[*idx];
            let leading_ordering_options = ordering[0].options;

            'exprs: for equivalent_expr in &eq_class {
                let children = equivalent_expr.children();
                // Leaf expressions cannot be derived from subsequent keys:
                if children.is_empty() {
                    continue;
                }
                // Check if all children match the next expressions in the ordering:
                let mut child_properties = vec![];
                // Build properties for each child based on the next expression:
                for (i, child) in children.into_iter().enumerate() {
                    // The i-th child must line up with the (i + 1)-th sort key.
                    // NOTE(review): on a mismatch, `break 'exprs` also skips the
                    // remaining equivalent expressions for this ordering --
                    // confirm that `continue 'exprs` is not what is intended.
                    let Some(next) = ordering.get(i + 1) else {
                        break 'exprs;
                    };
                    if !next.expr.eq(child) {
                        break 'exprs;
                    }
                    let data_type = child.data_type(&self.schema)?;
                    child_properties.push(ExprProperties {
                        sort_properties: SortProperties::Ordered(next.options),
                        range: Interval::make_unbounded(&data_type)?,
                        preserves_lex_ordering: true,
                    });
                }
                // Check if the expression is monotonic in all arguments:
                let expr_properties =
                    equivalent_expr.get_properties(&child_properties)?;
                if expr_properties.preserves_lex_ordering
                    && expr_properties.sort_properties
                        == SortProperties::Ordered(leading_ordering_options)
                {
                    // Assume that `[c ASC, a ASC, b ASC]` is among existing
                    // orderings. If equality `c = f(a, b)` is given, ordering
                    // `[a ASC, b ASC]` implies the ordering `[c ASC]`. Thus,
                    // ordering `[a ASC, b ASC]` is also a valid ordering.
                    new_orderings.push(ordering[1..].to_vec());
                    // One match per ordering suffices; move to the next ordering:
                    break;
                }
            }
        }

        if !new_orderings.is_empty() {
            self.add_orderings(new_orderings);
        }
        Ok(())
    }
480
481 /// Updates the ordering equivalence class within assuming that the table
482 /// is re-sorted according to the argument `ordering`, and returns whether
483 /// this operation resulted in any change. Note that equivalence classes
484 /// (and constants) do not change as they are unaffected by a re-sort. If
485 /// the given ordering is already satisfied, the function does nothing.
486 pub fn reorder(
487 &mut self,
488 ordering: impl IntoIterator<Item = PhysicalSortExpr>,
489 ) -> Result<bool> {
490 let (ordering, ordering_tee) = ordering.into_iter().tee();
491 // First, standardize the given ordering:
492 let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
493 // If the ordering vanishes after normalization, it is satisfied:
494 return Ok(false);
495 };
496 if normal_ordering.len() != self.common_sort_prefix_length(&normal_ordering)? {
497 // If the ordering is unsatisfied, replace existing orderings:
498 self.clear_orderings();
499 self.add_ordering(ordering_tee);
500 return Ok(true);
501 }
502 Ok(false)
503 }
504
505 /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
506 /// equivalence group within. Returns a `LexOrdering` instance if the
507 /// expressions define a proper lexicographical ordering. For more details,
508 /// see [`EquivalenceGroup::normalize_sort_exprs`].
509 pub fn normalize_sort_exprs(
510 &self,
511 sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
512 ) -> Option<LexOrdering> {
513 LexOrdering::new(self.eq_group.normalize_sort_exprs(sort_exprs))
514 }
515
516 /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
517 /// equivalence group within. Returns a `LexRequirement` instance if the
518 /// expressions define a proper lexicographical requirement. For more
519 /// details, see [`EquivalenceGroup::normalize_sort_exprs`].
520 pub fn normalize_sort_requirements(
521 &self,
522 sort_reqs: impl IntoIterator<Item = PhysicalSortRequirement>,
523 ) -> Option<LexRequirement> {
524 LexRequirement::new(self.eq_group.normalize_sort_requirements(sort_reqs))
525 }
526
527 /// Iteratively checks whether the given ordering is satisfied by any of
528 /// the existing orderings. See [`Self::ordering_satisfy_requirement`] for
529 /// more details and examples.
530 pub fn ordering_satisfy(
531 &self,
532 given: impl IntoIterator<Item = PhysicalSortExpr>,
533 ) -> Result<bool> {
534 // First, standardize the given ordering:
535 let Some(normal_ordering) = self.normalize_sort_exprs(given) else {
536 // If the ordering vanishes after normalization, it is satisfied:
537 return Ok(true);
538 };
539 Ok(normal_ordering.len() == self.common_sort_prefix_length(&normal_ordering)?)
540 }
541
542 /// Iteratively checks whether the given sort requirement is satisfied by
543 /// any of the existing orderings.
544 ///
545 /// ### Example Scenarios
546 ///
547 /// In these scenarios, assume that all expressions share the same sort
548 /// properties.
549 ///
550 /// #### Case 1: Sort Requirement `[a, c]`
551 ///
552 /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
553 /// 1. The function first checks the leading requirement `a`, which is
554 /// satisfied by `[a, b, c].first()`.
555 /// 2. `a` is added as a constant for the next iteration.
556 /// 3. Normal orderings become `[[b, c], [d]]`.
557 /// 4. The function fails for `c` in the second iteration, as neither
558 /// `[b, c]` nor `[d]` satisfies `c`.
559 ///
560 /// #### Case 2: Sort Requirement `[a, d]`
561 ///
562 /// **Existing orderings:** `[[a, b, c], [a, d]]`, **constants:** `[]`
563 /// 1. The function first checks the leading requirement `a`, which is
564 /// satisfied by `[a, b, c].first()`.
565 /// 2. `a` is added as a constant for the next iteration.
566 /// 3. Normal orderings become `[[b, c], [d]]`.
567 /// 4. The function returns `true` as `[d]` satisfies `d`.
568 pub fn ordering_satisfy_requirement(
569 &self,
570 given: impl IntoIterator<Item = PhysicalSortRequirement>,
571 ) -> Result<bool> {
572 // First, standardize the given requirement:
573 let Some(normal_reqs) = self.normalize_sort_requirements(given) else {
574 // If the requirement vanishes after normalization, it is satisfied:
575 return Ok(true);
576 };
577 // Then, check whether given requirement is satisfied by constraints:
578 if self.satisfied_by_constraints(&normal_reqs) {
579 return Ok(true);
580 }
581 let schema = self.schema();
582 let mut eq_properties = self.clone();
583 for element in normal_reqs {
584 // Check whether given requirement is satisfied:
585 let ExprProperties {
586 sort_properties, ..
587 } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
588 let satisfy = match sort_properties {
589 SortProperties::Ordered(options) => element.options.is_none_or(|opts| {
590 let nullable = element.expr.nullable(schema).unwrap_or(true);
591 options_compatible(&options, &opts, nullable)
592 }),
593 // Singleton expressions satisfy any requirement.
594 SortProperties::Singleton => true,
595 SortProperties::Unordered => false,
596 };
597 if !satisfy {
598 return Ok(false);
599 }
600 // Treat satisfied keys as constants in subsequent iterations. We
601 // can do this because the "next" key only matters in a lexicographical
602 // ordering when the keys to its left have the same values.
603 //
604 // Note that these expressions are not properly "constants". This is just
605 // an implementation strategy confined to this function.
606 //
607 // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
608 // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
609 // From the analysis above, we know that `[a ASC]` is satisfied. Then,
610 // we add column `a` as constant to the algorithm state. This enables us
611 // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
612 let const_expr = ConstExpr::from(element.expr);
613 eq_properties.add_constants(std::iter::once(const_expr))?;
614 }
615 Ok(true)
616 }
617
618 /// Returns the number of consecutive sort expressions (starting from the
619 /// left) that are satisfied by the existing ordering.
620 fn common_sort_prefix_length(&self, normal_ordering: &LexOrdering) -> Result<usize> {
621 let full_length = normal_ordering.len();
622 // Check whether the given ordering is satisfied by constraints:
623 if self.satisfied_by_constraints_ordering(normal_ordering) {
624 // If constraints satisfy all sort expressions, return the full
625 // length:
626 return Ok(full_length);
627 }
628 let schema = self.schema();
629 let mut eq_properties = self.clone();
630 for (idx, element) in normal_ordering.into_iter().enumerate() {
631 // Check whether given ordering is satisfied:
632 let ExprProperties {
633 sort_properties, ..
634 } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
635 let satisfy = match sort_properties {
636 SortProperties::Ordered(options) => options_compatible(
637 &options,
638 &element.options,
639 element.expr.nullable(schema).unwrap_or(true),
640 ),
641 // Singleton expressions satisfy any ordering.
642 SortProperties::Singleton => true,
643 SortProperties::Unordered => false,
644 };
645 if !satisfy {
646 // As soon as one sort expression is unsatisfied, return how
647 // many we've satisfied so far:
648 return Ok(idx);
649 }
650 // Treat satisfied keys as constants in subsequent iterations. We
651 // can do this because the "next" key only matters in a lexicographical
652 // ordering when the keys to its left have the same values.
653 //
654 // Note that these expressions are not properly "constants". This is just
655 // an implementation strategy confined to this function.
656 //
657 // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
658 // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
659 // From the analysis above, we know that `[a ASC]` is satisfied. Then,
660 // we add column `a` as constant to the algorithm state. This enables us
661 // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
662 let const_expr = ConstExpr::from(Arc::clone(&element.expr));
663 eq_properties.add_constants(std::iter::once(const_expr))?
664 }
665 // All sort expressions are satisfied, return full length:
666 Ok(full_length)
667 }
668
669 /// Determines the longest normal prefix of `ordering` satisfied by the
670 /// existing ordering. Returns that prefix as a new `LexOrdering`, and a
671 /// boolean indicating whether all the sort expressions are satisfied.
672 pub fn extract_common_sort_prefix(
673 &self,
674 ordering: LexOrdering,
675 ) -> Result<(Vec<PhysicalSortExpr>, bool)> {
676 // First, standardize the given ordering:
677 let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
678 // If the ordering vanishes after normalization, it is satisfied:
679 return Ok((vec![], true));
680 };
681 let prefix_len = self.common_sort_prefix_length(&normal_ordering)?;
682 let flag = prefix_len == normal_ordering.len();
683 let mut sort_exprs: Vec<_> = normal_ordering.into();
684 if !flag {
685 sort_exprs.truncate(prefix_len);
686 }
687 Ok((sort_exprs, flag))
688 }
689
    /// Checks if the sort expressions are satisfied by any of the table
    /// constraints (primary key or unique). Returns true if, for some
    /// constraint, there is an existing ordering such that:
    /// - the ordering is at least as long as the constraint and no longer
    ///   than `normal_exprs`,
    /// - every constraint index appears as a plain column of the ordering,
    /// - for unique constraints, no such column is nullable while sitting in
    ///   the first or last position of the ordering, and
    /// - each sort expression of the ordering satisfies the given expression
    ///   at the same position.
    fn satisfied_by_constraints_ordering(
        &self,
        normal_exprs: &[PhysicalSortExpr],
    ) -> bool {
        self.constraints.iter().any(|constraint| match constraint {
            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
                // Nullability only matters for unique constraints:
                let check_null = matches!(constraint, Constraint::Unique(_));
                let normalized_size = normal_exprs.len();
                // The constraint cannot cover more columns than were given:
                indices.len() <= normalized_size
                    && self.oeq_class.iter().any(|ordering| {
                        let length = ordering.len();
                        // Reject orderings that are shorter than the constraint
                        // or longer than the given expressions:
                        if indices.len() > length || normalized_size < length {
                            return false;
                        }
                        // Build a map of column positions in the ordering
                        // (column index -> (position, nullable)); sort keys that
                        // are not plain columns are ignored:
                        let mut col_positions = HashMap::with_capacity(length);
                        for (pos, req) in ordering.iter().enumerate() {
                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
                            {
                                // Unknown nullability is treated as nullable:
                                let nullable = col.nullable(&self.schema).unwrap_or(true);
                                col_positions.insert(col.index(), (pos, nullable));
                            }
                        }
                        // Check if all constraint indices appear in valid positions:
                        if !indices.iter().all(|idx| {
                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
                                // For unique constraints, verify column is not nullable if it's first/last:
                                !check_null
                                    || !nullable
                                    || (pos != 0 && pos != length - 1)
                            })
                        }) {
                            return false;
                        }
                        // Check if this ordering matches the prefix; `zip` stops
                        // at the end of the ordering, which is never longer than
                        // the given expressions here:
                        normal_exprs.iter().zip(ordering).all(|(given, existing)| {
                            existing.satisfy_expr(given, &self.schema)
                        })
                    })
            }
        })
    }
737
    /// Checks if the sort requirements are satisfied by any of the table
    /// constraints (primary key or unique). Returns true if, for some
    /// constraint, there is an existing ordering such that:
    /// - the ordering is at least as long as the constraint and no longer
    ///   than `normal_reqs`,
    /// - every constraint index appears as a plain column of the ordering,
    /// - for unique constraints, no such column is nullable while sitting in
    ///   the first or last position of the ordering, and
    /// - each sort expression of the ordering satisfies the requirement at
    ///   the same position.
    fn satisfied_by_constraints(&self, normal_reqs: &[PhysicalSortRequirement]) -> bool {
        self.constraints.iter().any(|constraint| match constraint {
            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
                // Nullability only matters for unique constraints:
                let check_null = matches!(constraint, Constraint::Unique(_));
                let normalized_size = normal_reqs.len();
                // The constraint cannot cover more columns than were required:
                indices.len() <= normalized_size
                    && self.oeq_class.iter().any(|ordering| {
                        let length = ordering.len();
                        // Reject orderings that are shorter than the constraint
                        // or longer than the given requirements:
                        if indices.len() > length || normalized_size < length {
                            return false;
                        }
                        // Build a map of column positions in the ordering
                        // (column index -> (position, nullable)); sort keys that
                        // are not plain columns are ignored:
                        let mut col_positions = HashMap::with_capacity(length);
                        for (pos, req) in ordering.iter().enumerate() {
                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
                            {
                                // Unknown nullability is treated as nullable:
                                let nullable = col.nullable(&self.schema).unwrap_or(true);
                                col_positions.insert(col.index(), (pos, nullable));
                            }
                        }
                        // Check if all constraint indices appear in valid positions:
                        if !indices.iter().all(|idx| {
                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
                                // For unique constraints, verify column is not nullable if it's first/last:
                                !check_null
                                    || !nullable
                                    || (pos != 0 && pos != length - 1)
                            })
                        }) {
                            return false;
                        }
                        // Check if this ordering matches the prefix; `zip` stops
                        // at the end of the ordering, which is never longer than
                        // the given requirements here:
                        normal_reqs.iter().zip(ordering).all(|(given, existing)| {
                            existing.satisfy(given, &self.schema)
                        })
                    })
            }
        })
    }
782
783 /// Checks whether the `given` sort requirements are equal or more specific
784 /// than the `reference` sort requirements.
785 pub fn requirements_compatible(
786 &self,
787 given: LexRequirement,
788 reference: LexRequirement,
789 ) -> bool {
790 let Some(normal_given) = self.normalize_sort_requirements(given) else {
791 return true;
792 };
793 let Some(normal_reference) = self.normalize_sort_requirements(reference) else {
794 return true;
795 };
796
797 (normal_reference.len() <= normal_given.len())
798 && normal_reference
799 .into_iter()
800 .zip(normal_given)
801 .all(|(reference, given)| given.compatible(&reference))
802 }
803
    /// Modify existing orderings by substituting sort expressions with
    /// appropriate expressions from the projection mapping. We substitute a
    /// sort expression when its physical expression has a one-to-one
    /// functional relationship with an expression in the mapping. Candidates
    /// are drawn from the *source* side of the mapping.
    ///
    /// After substitution, we may generate more than one `LexOrdering` for each
    /// existing equivalent ordering. For example, `[a ASC, b ASC]` will turn
    /// into `[CAST(a) ASC, b ASC]` and `[a ASC, b ASC]` when applying projection
    /// expressions `a, b, CAST(a)`.
    ///
    /// TODO: Handle all scenarios that allow substitution; e.g. when `x` is
    ///       sorted, `atan(x + 1000)` should also be substituted. For now, we
    ///       only consider single-column `CAST` expressions.
    ///
    /// # Panics
    ///
    /// Panics if the data type of a sort expression cannot be resolved
    /// against `schema`.
    fn substitute_oeq_class(
        schema: &SchemaRef,
        mapping: &ProjectionMapping,
        oeq_class: OrderingEquivalenceClass,
    ) -> OrderingEquivalenceClass {
        let new_orderings = oeq_class.into_iter().flat_map(|order| {
            // Modify/expand existing orderings by substituting sort
            // expressions with appropriate targets from the mapping:
            order
                .into_iter()
                .map(|sort_expr| {
                    // Source expressions of the mapping that refer to this
                    // sort expression:
                    let referring_exprs = mapping
                        .iter()
                        .map(|(source, _target)| source)
                        .filter(|source| expr_refers(source, &sort_expr.expr))
                        .cloned();
                    // Alternatives for this position of the ordering:
                    let mut result = vec![];
                    // The sort expression comes from this schema, so the
                    // following call to `unwrap` is safe.
                    let expr_type = sort_expr.expr.data_type(schema).unwrap();
                    // TODO: Add one-to-one analysis for ScalarFunctions.
                    for r_expr in referring_exprs {
                        // We check whether this expression is substitutable.
                        if let Some(cast_expr) =
                            r_expr.as_any().downcast_ref::<CastExpr>()
                        {
                            // For casts, we need to know whether the cast
                            // expression matches:
                            if cast_expr.expr.eq(&sort_expr.expr)
                                && cast_expr.is_bigger_cast(&expr_type)
                            {
                                // The cast keeps the sort options of the
                                // expression it wraps:
                                result.push(PhysicalSortExpr::new(
                                    r_expr,
                                    sort_expr.options,
                                ));
                            }
                        }
                    }
                    // The original sort expression is always an alternative:
                    result.push(sort_expr);
                    result
                })
                // Generate all valid orderings given substituted expressions:
                .multi_cartesian_product()
        });
        OrderingEquivalenceClass::new(new_orderings)
    }
863
864 /// Projects argument `expr` according to the projection described by
865 /// `mapping`, taking equivalences into account.
866 ///
867 /// For example, assume that columns `a` and `c` are always equal, and that
868 /// the projection described by `mapping` encodes the following:
869 ///
870 /// ```text
871 /// a -> a1
872 /// b -> b1
873 /// ```
874 ///
875 /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to
876 /// `Some(a1 + b1)` and `d` to `None`, meaning that it is not projectable.
877 pub fn project_expr(
878 &self,
879 expr: &Arc<dyn PhysicalExpr>,
880 mapping: &ProjectionMapping,
881 ) -> Option<Arc<dyn PhysicalExpr>> {
882 self.eq_group.project_expr(mapping, expr)
883 }
884
885 /// Projects the given `expressions` according to the projection described
886 /// by `mapping`, taking equivalences into account. This function is similar
887 /// to [`Self::project_expr`], but projects multiple expressions at once
888 /// more efficiently than calling `project_expr` for each expression.
889 pub fn project_expressions<'a>(
890 &'a self,
891 expressions: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>> + 'a,
892 mapping: &'a ProjectionMapping,
893 ) -> impl Iterator<Item = Option<Arc<dyn PhysicalExpr>>> + 'a {
894 self.eq_group.project_expressions(mapping, expressions)
895 }
896
897 /// Constructs a dependency map based on existing orderings referred to in
898 /// the projection.
899 ///
900 /// This function analyzes the orderings in the normalized order-equivalence
901 /// class and builds a dependency map. The dependency map captures relationships
902 /// between expressions within the orderings, helping to identify dependencies
903 /// and construct valid projected orderings during projection operations.
904 ///
905 /// # Parameters
906 ///
907 /// - `mapping`: A reference to the `ProjectionMapping` that defines the
908 /// relationship between source and target expressions.
909 ///
910 /// # Returns
911 ///
912 /// A [`DependencyMap`] representing the dependency map, where each
913 /// \[`DependencyNode`\] contains dependencies for the key [`PhysicalSortExpr`].
914 ///
915 /// # Example
916 ///
917 /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`,
918 /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`.
919 /// Then, the dependency map will be:
920 ///
921 /// ```text
922 /// a ASC: Node {Some(a_new ASC), HashSet{}}
923 /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
924 /// c ASC: Node {None, HashSet{a ASC}}
925 /// ```
926 fn construct_dependency_map(
927 &self,
928 oeq_class: OrderingEquivalenceClass,
929 mapping: &ProjectionMapping,
930 ) -> DependencyMap {
931 let mut map = DependencyMap::default();
932 for ordering in oeq_class.into_iter() {
933 // Previous expression is a dependency. Note that there is no
934 // dependency for the leading expression.
935 if !self.insert_to_dependency_map(
936 mapping,
937 ordering[0].clone(),
938 None,
939 &mut map,
940 ) {
941 continue;
942 }
943 for (dependency, sort_expr) in ordering.into_iter().tuple_windows() {
944 if !self.insert_to_dependency_map(
945 mapping,
946 sort_expr,
947 Some(dependency),
948 &mut map,
949 ) {
950 // If we can't project, stop constructing the dependency map
951 // as remaining dependencies will be invalid post projection.
952 break;
953 }
954 }
955 }
956 map
957 }
958
959 /// Projects the sort expression according to the projection mapping and
960 /// inserts it into the dependency map with the given dependency. Returns
961 /// a boolean flag indicating whether the given expression is projectable.
962 fn insert_to_dependency_map(
963 &self,
964 mapping: &ProjectionMapping,
965 sort_expr: PhysicalSortExpr,
966 dependency: Option<PhysicalSortExpr>,
967 map: &mut DependencyMap,
968 ) -> bool {
969 let target_sort_expr = self
970 .project_expr(&sort_expr.expr, mapping)
971 .map(|expr| PhysicalSortExpr::new(expr, sort_expr.options));
972 let projectable = target_sort_expr.is_some();
973 if projectable
974 || mapping
975 .iter()
976 .any(|(source, _)| expr_refers(source, &sort_expr.expr))
977 {
978 // Add sort expressions that can be projected or referred to
979 // by any of the projection expressions to the dependency map:
980 map.insert(sort_expr, target_sort_expr, dependency);
981 }
982 projectable
983 }
984
985 /// Returns a new `ProjectionMapping` where source expressions are in normal
986 /// form. Normalization ensures that source expressions are transformed into
987 /// a consistent representation, which is beneficial for algorithms that rely
988 /// on exact equalities, as it allows for more precise and reliable comparisons.
989 ///
990 /// # Parameters
991 ///
992 /// - `mapping`: A reference to the original `ProjectionMapping` to normalize.
993 ///
994 /// # Returns
995 ///
996 /// A new `ProjectionMapping` with source expressions in normal form.
997 fn normalize_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
998 mapping
999 .iter()
1000 .map(|(source, target)| {
1001 let normal_source = self.eq_group.normalize_expr(Arc::clone(source));
1002 (normal_source, target.clone())
1003 })
1004 .collect()
1005 }
1006
1007 /// Computes projected orderings based on a given projection mapping.
1008 ///
1009 /// This function takes a `ProjectionMapping` and computes the possible
1010 /// orderings for the projected expressions. It considers dependencies
1011 /// between expressions and generates valid orderings according to the
1012 /// specified sort properties.
1013 ///
1014 /// # Parameters
1015 ///
1016 /// - `mapping`: A reference to the `ProjectionMapping` that defines the
1017 /// relationship between source and target expressions.
1018 /// - `oeq_class`: The `OrderingEquivalenceClass` containing the orderings
1019 /// to project.
1020 ///
1021 /// # Returns
1022 ///
1023 /// A vector of all valid (but not in normal form) orderings after projection.
1024 fn projected_orderings(
1025 &self,
1026 mapping: &ProjectionMapping,
1027 mut oeq_class: OrderingEquivalenceClass,
1028 ) -> Vec<LexOrdering> {
1029 // Normalize source expressions in the mapping:
1030 let mapping = self.normalize_mapping(mapping);
1031 // Get dependency map for existing orderings:
1032 oeq_class = Self::substitute_oeq_class(&self.schema, &mapping, oeq_class);
1033 let dependency_map = self.construct_dependency_map(oeq_class, &mapping);
1034 let orderings = mapping.iter().flat_map(|(source, targets)| {
1035 referred_dependencies(&dependency_map, source)
1036 .into_iter()
1037 .filter_map(|deps| {
1038 let ep = get_expr_properties(source, &deps, &self.schema);
1039 let sort_properties = ep.map(|prop| prop.sort_properties);
1040 if let Ok(SortProperties::Ordered(options)) = sort_properties {
1041 Some((options, deps))
1042 } else {
1043 // Do not consider unordered cases.
1044 None
1045 }
1046 })
1047 .flat_map(|(options, relevant_deps)| {
1048 // Generate dependent orderings (i.e. prefixes for targets):
1049 let dependency_orderings =
1050 generate_dependency_orderings(&relevant_deps, &dependency_map);
1051 let sort_exprs = targets.iter().map(|(target, _)| {
1052 PhysicalSortExpr::new(Arc::clone(target), options)
1053 });
1054 if dependency_orderings.is_empty() {
1055 sort_exprs.map(|sort_expr| [sort_expr].into()).collect()
1056 } else {
1057 sort_exprs
1058 .flat_map(|sort_expr| {
1059 let mut result = dependency_orderings.clone();
1060 for ordering in result.iter_mut() {
1061 ordering.push(sort_expr.clone());
1062 }
1063 result
1064 })
1065 .collect::<Vec<_>>()
1066 }
1067 })
1068 });
1069
1070 // Add valid projected orderings. For example, if existing ordering is
1071 // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
1072 // preserve `a_new + b_new` as ordered. Please note that `a_new` and
1073 // `b_new` themselves need not be ordered. Such dependencies cannot be
1074 // deduced via the pass above.
1075 let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
1076 let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
1077 if prefixes.is_empty() {
1078 // If prefix is empty, there is no dependency. Insert
1079 // empty ordering:
1080 if let Some(target) = &node.target {
1081 prefixes.push([target.clone()].into());
1082 }
1083 } else {
1084 // Append current ordering on top its dependencies:
1085 for ordering in prefixes.iter_mut() {
1086 if let Some(target) = &node.target {
1087 ordering.push(target.clone());
1088 }
1089 }
1090 }
1091 prefixes
1092 });
1093
1094 // Simplify each ordering by removing redundant sections:
1095 orderings.chain(projected_orderings).collect()
1096 }
1097
1098 /// Projects constraints according to the given projection mapping.
1099 ///
1100 /// This function takes a projection mapping and extracts column indices of
1101 /// target columns. It then projects the constraints to only include
1102 /// relationships between columns that exist in the projected output.
1103 ///
1104 /// # Parameters
1105 ///
1106 /// * `mapping` - A reference to the `ProjectionMapping` that defines the
1107 /// projection operation.
1108 ///
1109 /// # Returns
1110 ///
1111 /// Returns an optional `Constraints` object containing only the constraints
1112 /// that are valid for the projected columns (if any exists).
1113 fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
1114 let indices = mapping
1115 .iter()
1116 .flat_map(|(_, targets)| {
1117 targets.iter().flat_map(|(target, _)| {
1118 target.as_any().downcast_ref::<Column>().map(|c| c.index())
1119 })
1120 })
1121 .collect::<Vec<_>>();
1122 self.constraints.project(&indices)
1123 }
1124
1125 /// Projects the equivalences within according to `mapping` and
1126 /// `output_schema`.
1127 pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
1128 let eq_group = self.eq_group.project(mapping);
1129 let orderings =
1130 self.projected_orderings(mapping, self.oeq_cache.normal_cls.clone());
1131 let normal_orderings = orderings
1132 .iter()
1133 .cloned()
1134 .map(|o| eq_group.normalize_sort_exprs(o));
1135 Self {
1136 oeq_cache: OrderingEquivalenceCache::new(normal_orderings),
1137 oeq_class: OrderingEquivalenceClass::new(orderings),
1138 constraints: self.projected_constraints(mapping).unwrap_or_default(),
1139 schema: output_schema,
1140 eq_group,
1141 }
1142 }
1143
1144 /// Returns the longest (potentially partial) permutation satisfying the
1145 /// existing ordering. For example, if we have the equivalent orderings
1146 /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`,
1147 /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`.
1148 /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied
1149 /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0`
1150 /// inside the argument `exprs` (respectively). For the mathematical
1151 /// definition of "partial permutation", see:
1152 ///
1153 /// <https://en.wikipedia.org/wiki/Permutation#k-permutations_of_n>
1154 pub fn find_longest_permutation(
1155 &self,
1156 exprs: &[Arc<dyn PhysicalExpr>],
1157 ) -> Result<(Vec<PhysicalSortExpr>, Vec<usize>)> {
1158 let mut eq_properties = self.clone();
1159 let mut result = vec![];
1160 // The algorithm is as follows:
1161 // - Iterate over all the expressions and insert ordered expressions
1162 // into the result.
1163 // - Treat inserted expressions as constants (i.e. add them as constants
1164 // to the state).
1165 // - Continue the above procedure until no expression is inserted; i.e.
1166 // the algorithm reaches a fixed point.
1167 // This algorithm should reach a fixed point in at most `exprs.len()`
1168 // iterations.
1169 let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
1170 for _ in 0..exprs.len() {
1171 // Get ordered expressions with their indices.
1172 let ordered_exprs = search_indices
1173 .iter()
1174 .filter_map(|&idx| {
1175 let ExprProperties {
1176 sort_properties, ..
1177 } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
1178 match sort_properties {
1179 SortProperties::Ordered(options) => {
1180 let expr = Arc::clone(&exprs[idx]);
1181 Some((PhysicalSortExpr::new(expr, options), idx))
1182 }
1183 SortProperties::Singleton => {
1184 // Assign default ordering to constant expressions:
1185 let expr = Arc::clone(&exprs[idx]);
1186 Some((PhysicalSortExpr::new_default(expr), idx))
1187 }
1188 SortProperties::Unordered => None,
1189 }
1190 })
1191 .collect::<Vec<_>>();
1192 // We reached a fixed point, exit.
1193 if ordered_exprs.is_empty() {
1194 break;
1195 }
1196 // Remove indices that have an ordering from `search_indices`, and
1197 // treat ordered expressions as constants in subsequent iterations.
1198 // We can do this because the "next" key only matters in a lexicographical
1199 // ordering when the keys to its left have the same values.
1200 //
1201 // Note that these expressions are not properly "constants". This is just
1202 // an implementation strategy confined to this function.
1203 for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
1204 let const_expr = ConstExpr::from(Arc::clone(expr));
1205 eq_properties.add_constants(std::iter::once(const_expr))?;
1206 search_indices.shift_remove(idx);
1207 }
1208 // Add new ordered section to the state.
1209 result.extend(ordered_exprs);
1210 }
1211 Ok(result.into_iter().unzip())
1212 }
1213
1214 /// This function determines whether the provided expression is constant
1215 /// based on the known constants. For example, if columns `a` and `b` are
1216 /// constant, then expressions `a`, `b` and `a + b` will all return `true`
1217 /// whereas expression `c` will return `false`.
1218 ///
1219 /// # Parameters
1220 ///
1221 /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1222 /// expression to be checked.
1223 ///
1224 /// # Returns
1225 ///
1226 /// Returns a `Some` value if the expression is constant according to
1227 /// equivalence group, and `None` otherwise. The `Some` variant contains
1228 /// an `AcrossPartitions` value indicating whether the expression is
1229 /// constant across partitions, and its actual value (if available).
1230 pub fn is_expr_constant(
1231 &self,
1232 expr: &Arc<dyn PhysicalExpr>,
1233 ) -> Option<AcrossPartitions> {
1234 self.eq_group.is_expr_constant(expr)
1235 }
1236
1237 /// Retrieves the properties for a given physical expression.
1238 ///
1239 /// This function constructs an [`ExprProperties`] object for the given
1240 /// expression, which encapsulates information about the expression's
1241 /// properties, including its [`SortProperties`] and [`Interval`].
1242 ///
1243 /// # Parameters
1244 ///
1245 /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical expression
1246 /// for which ordering information is sought.
1247 ///
1248 /// # Returns
1249 ///
1250 /// Returns an [`ExprProperties`] object containing the ordering and range
1251 /// information for the given expression.
1252 pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
1253 ExprPropertiesNode::new_unknown(expr)
1254 .transform_up(|expr| update_properties(expr, self))
1255 .data()
1256 .map(|node| node.data)
1257 .unwrap_or_else(|_| ExprProperties::new_unknown())
1258 }
1259
1260 /// Transforms this `EquivalenceProperties` by mapping columns in the
1261 /// original schema to columns in the new schema by index.
1262 pub fn with_new_schema(mut self, schema: SchemaRef) -> Result<Self> {
1263 // The new schema and the original schema is aligned when they have the
1264 // same number of columns, and fields at the same index have the same
1265 // type in both schemas.
1266 let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
1267 && self
1268 .schema
1269 .fields
1270 .iter()
1271 .zip(schema.fields.iter())
1272 .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
1273 if !schemas_aligned {
1274 // Rewriting equivalence properties in terms of new schema is not
1275 // safe when schemas are not aligned:
1276 return plan_err!(
1277 "Schemas have to be aligned to rewrite equivalences:\n Old schema: {:?}\n New schema: {:?}",
1278 self.schema,
1279 schema
1280 );
1281 }
1282
1283 // Rewrite equivalence classes according to the new schema:
1284 let mut eq_classes = vec![];
1285 for mut eq_class in self.eq_group {
1286 // Rewrite the expressions in the equivalence class:
1287 eq_class.exprs = eq_class
1288 .exprs
1289 .into_iter()
1290 .map(|expr| with_new_schema(expr, &schema))
1291 .collect::<Result<_>>()?;
1292 // Rewrite the constant value (if available and known):
1293 let data_type = eq_class
1294 .canonical_expr()
1295 .map(|e| e.data_type(&schema))
1296 .transpose()?;
1297 if let (Some(data_type), Some(AcrossPartitions::Uniform(Some(value)))) =
1298 (data_type, &mut eq_class.constant)
1299 {
1300 *value = value.cast_to(&data_type)?;
1301 }
1302 eq_classes.push(eq_class);
1303 }
1304 self.eq_group = eq_classes.into();
1305
1306 // Rewrite orderings according to new schema:
1307 self.oeq_class = self.oeq_class.with_new_schema(&schema)?;
1308 self.oeq_cache.normal_cls = self.oeq_cache.normal_cls.with_new_schema(&schema)?;
1309
1310 // Update the schema:
1311 self.schema = schema;
1312
1313 Ok(self)
1314 }
1315}
1316
1317impl From<EquivalenceProperties> for OrderingEquivalenceClass {
1318 fn from(eq_properties: EquivalenceProperties) -> Self {
1319 eq_properties.oeq_class
1320 }
1321}
1322
1323/// More readable display version of the `EquivalenceProperties`.
1324///
1325/// Format:
1326/// ```text
1327/// order: [[b@1 ASC NULLS LAST]], eq: [{members: [a@0], constant: (heterogeneous)}]
1328/// ```
1329impl Display for EquivalenceProperties {
1330 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1331 let empty_eq_group = self.eq_group.is_empty();
1332 let empty_oeq_class = self.oeq_class.is_empty();
1333 if empty_oeq_class && empty_eq_group {
1334 write!(f, "No properties")?;
1335 } else if !empty_oeq_class {
1336 write!(f, "order: {}", self.oeq_class)?;
1337 if !empty_eq_group {
1338 write!(f, ", eq: {}", self.eq_group)?;
1339 }
1340 } else {
1341 write!(f, "eq: {}", self.eq_group)?;
1342 }
1343 Ok(())
1344 }
1345}
1346
1347/// Calculates the properties of a given [`ExprPropertiesNode`].
1348///
1349/// Order information can be retrieved as:
1350/// - If it is a leaf node, we directly find the order of the node by looking
1351/// at the given sort expression and equivalence properties if it is a `Column`
1352/// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
1353/// it as singleton so that it can cooperate with all ordered columns.
1354/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1355/// and operator has its own rules on how to propagate the children orderings.
1356/// However, before we engage in recursion, we check whether this intermediate
1357/// node directly matches with the sort expression. If there is a match, the
1358/// sort expression emerges at that node immediately, discarding the recursive
1359/// result coming from its children.
1360///
1361/// Range information is calculated as:
1362/// - If it is a `Literal` node, we set the range as a point value. If it is a
1363/// `Column` node, we set the datatype of the range, but cannot give an interval
1364/// for the range, yet.
1365/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1366/// and operator has its own rules on how to propagate the children range.
1367fn update_properties(
1368 mut node: ExprPropertiesNode,
1369 eq_properties: &EquivalenceProperties,
1370) -> Result<Transformed<ExprPropertiesNode>> {
1371 // First, try to gather the information from the children:
1372 if !node.expr.children().is_empty() {
1373 // We have an intermediate (non-leaf) node, account for its children:
1374 let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
1375 node.data = node.expr.get_properties(&children_props)?;
1376 } else if node.expr.as_any().is::<Literal>() {
1377 // We have a Literal, which is one of the two possible leaf node types:
1378 node.data = node.expr.get_properties(&[])?;
1379 } else if node.expr.as_any().is::<Column>() {
1380 // We have a Column, which is the other possible leaf node type:
1381 node.data.range =
1382 Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
1383 }
1384 // Now, check what we know about orderings:
1385 let normal_expr = eq_properties
1386 .eq_group
1387 .normalize_expr(Arc::clone(&node.expr));
1388 let oeq_class = &eq_properties.oeq_cache.normal_cls;
1389 if eq_properties.is_expr_constant(&normal_expr).is_some()
1390 || oeq_class.is_expr_partial_const(&normal_expr)
1391 {
1392 node.data.sort_properties = SortProperties::Singleton;
1393 } else if let Some(options) = oeq_class.get_options(&normal_expr) {
1394 node.data.sort_properties = SortProperties::Ordered(options);
1395 }
1396 Ok(Transformed::yes(node))
1397}
1398
1399/// This function examines whether a referring expression directly refers to a
1400/// given referred expression or if any of its children in the expression tree
1401/// refer to the specified expression.
1402///
1403/// # Parameters
1404///
1405/// - `referring_expr`: A reference to the referring expression (`Arc<dyn PhysicalExpr>`).
1406/// - `referred_expr`: A reference to the referred expression (`Arc<dyn PhysicalExpr>`)
1407///
1408/// # Returns
1409///
1410/// A boolean value indicating whether `referring_expr` refers (needs it to evaluate its result)
1411/// `referred_expr` or not.
1412fn expr_refers(
1413 referring_expr: &Arc<dyn PhysicalExpr>,
1414 referred_expr: &Arc<dyn PhysicalExpr>,
1415) -> bool {
1416 referring_expr.eq(referred_expr)
1417 || referring_expr
1418 .children()
1419 .iter()
1420 .any(|child| expr_refers(child, referred_expr))
1421}
1422
1423/// This function examines the given expression and its properties to determine
1424/// the ordering properties of the expression. The range knowledge is not utilized
1425/// yet in the scope of this function.
1426///
1427/// # Parameters
1428///
1429/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) for
1430/// which ordering properties need to be determined.
1431/// - `dependencies`: A reference to `Dependencies`, containing sort expressions
1432/// referred to by `expr`.
1433/// - `schema``: A reference to the schema which the `expr` columns refer.
1434///
1435/// # Returns
1436///
1437/// A `SortProperties` indicating the ordering information of the given expression.
1438fn get_expr_properties(
1439 expr: &Arc<dyn PhysicalExpr>,
1440 dependencies: &Dependencies,
1441 schema: &SchemaRef,
1442) -> Result<ExprProperties> {
1443 if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
1444 // If exact match is found, return its ordering.
1445 Ok(ExprProperties {
1446 sort_properties: SortProperties::Ordered(column_order.options),
1447 range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1448 preserves_lex_ordering: false,
1449 })
1450 } else if expr.as_any().downcast_ref::<Column>().is_some() {
1451 Ok(ExprProperties {
1452 sort_properties: SortProperties::Unordered,
1453 range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1454 preserves_lex_ordering: false,
1455 })
1456 } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
1457 Ok(ExprProperties {
1458 sort_properties: SortProperties::Singleton,
1459 range: literal.value().into(),
1460 preserves_lex_ordering: true,
1461 })
1462 } else {
1463 // Find orderings of its children
1464 let child_states = expr
1465 .children()
1466 .iter()
1467 .map(|child| get_expr_properties(child, dependencies, schema))
1468 .collect::<Result<Vec<_>>>()?;
1469 // Calculate expression ordering using ordering of its children.
1470 expr.get_properties(&child_states)
1471 }
1472}