datafusion_physical_expr_common/
sort_expr.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Sort expressions
19
20use crate::physical_expr::PhysicalExpr;
21use std::fmt;
22use std::fmt::{Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::ops::{Deref, Index, Range, RangeFrom, RangeTo};
25use std::sync::{Arc, LazyLock};
26use std::vec::IntoIter;
27
28use arrow::compute::kernels::sort::{SortColumn, SortOptions};
29use arrow::datatypes::Schema;
30use arrow::record_batch::RecordBatch;
31use datafusion_common::Result;
32use datafusion_expr_common::columnar_value::ColumnarValue;
33use itertools::Itertools;
34
/// Represents a sort operation on a single expression (typically a column)
/// of a [`RecordBatch`]: the expression to sort by plus its [`SortOptions`].
///
/// Example:
/// ```
/// # use std::any::Any;
/// # use std::fmt::Display;
/// # use std::hash::Hasher;
/// # use std::sync::Arc;
/// # use arrow::array::RecordBatch;
/// # use datafusion_common::Result;
/// # use arrow::compute::SortOptions;
/// # use arrow::datatypes::{DataType, Schema};
/// # use datafusion_expr_common::columnar_value::ColumnarValue;
/// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// # use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
/// # // this crate doesn't have a physical expression implementation
/// # // so make a really simple one
/// # #[derive(Clone, Debug, PartialEq, Eq, Hash)]
/// # struct MyPhysicalExpr;
/// # impl PhysicalExpr for MyPhysicalExpr {
/// #  fn as_any(&self) -> &dyn Any {todo!() }
/// #  fn data_type(&self, input_schema: &Schema) -> Result<DataType> {todo!()}
/// #  fn nullable(&self, input_schema: &Schema) -> Result<bool> {todo!() }
/// #  fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {todo!() }
/// #  fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {todo!()}
/// #  fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn PhysicalExpr>> {todo!()}
/// # }
/// # impl Display for MyPhysicalExpr {
/// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "a") }
/// # }
/// # fn col(name: &str) -> Arc<dyn PhysicalExpr> { Arc::new(MyPhysicalExpr) }
/// // Sort by a ASC
/// let options = SortOptions::default();
/// let sort_expr = PhysicalSortExpr::new(col("a"), options);
/// assert_eq!(sort_expr.to_string(), "a ASC");
///
/// // Sort by a DESC NULLS LAST
/// let sort_expr = PhysicalSortExpr::new_default(col("a"))
///   .desc()
///   .nulls_last();
/// assert_eq!(sort_expr.to_string(), "a DESC NULLS LAST");
/// ```
#[derive(Clone, Debug)]
pub struct PhysicalSortExpr {
    /// Physical expression representing the column to sort
    pub expr: Arc<dyn PhysicalExpr>,
    /// Options specifying how the given column should be sorted
    /// (direction and placement of nulls)
    pub options: SortOptions,
}
84
85impl PhysicalSortExpr {
86    /// Create a new PhysicalSortExpr
87    pub fn new(expr: Arc<dyn PhysicalExpr>, options: SortOptions) -> Self {
88        Self { expr, options }
89    }
90
91    /// Create a new PhysicalSortExpr with default [`SortOptions`]
92    pub fn new_default(expr: Arc<dyn PhysicalExpr>) -> Self {
93        Self::new(expr, SortOptions::default())
94    }
95
96    /// Set the sort sort options to ASC
97    pub fn asc(mut self) -> Self {
98        self.options.descending = false;
99        self
100    }
101
102    /// Set the sort sort options to DESC
103    pub fn desc(mut self) -> Self {
104        self.options.descending = true;
105        self
106    }
107
108    /// Set the sort sort options to NULLS FIRST
109    pub fn nulls_first(mut self) -> Self {
110        self.options.nulls_first = true;
111        self
112    }
113
114    /// Set the sort sort options to NULLS LAST
115    pub fn nulls_last(mut self) -> Self {
116        self.options.nulls_first = false;
117        self
118    }
119}
120
121/// Access the PhysicalSortExpr as a PhysicalExpr
122impl AsRef<dyn PhysicalExpr> for PhysicalSortExpr {
123    fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) {
124        self.expr.as_ref()
125    }
126}
127
128impl PartialEq for PhysicalSortExpr {
129    fn eq(&self, other: &PhysicalSortExpr) -> bool {
130        self.options == other.options && self.expr.eq(&other.expr)
131    }
132}
133
// Marker impl: the comparison itself is the `PartialEq` impl for
// `PhysicalSortExpr`.
// NOTE(review): assumes expression equality is a full equivalence relation;
// confirm against the `PhysicalExpr` equality contract.
impl Eq for PhysicalSortExpr {}
135
136impl Hash for PhysicalSortExpr {
137    fn hash<H: Hasher>(&self, state: &mut H) {
138        self.expr.hash(state);
139        self.options.hash(state);
140    }
141}
142
143impl Display for PhysicalSortExpr {
144    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
145        write!(f, "{} {}", self.expr, to_str(&self.options))
146    }
147}
148
149impl PhysicalSortExpr {
150    /// evaluate the sort expression into SortColumn that can be passed into arrow sort kernel
151    pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
152        let value_to_sort = self.expr.evaluate(batch)?;
153        let array_to_sort = match value_to_sort {
154            ColumnarValue::Array(array) => array,
155            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
156        };
157        Ok(SortColumn {
158            values: array_to_sort,
159            options: Some(self.options),
160        })
161    }
162
163    /// Checks whether this sort expression satisfies the given `requirement`.
164    /// If sort options are unspecified in `requirement`, only expressions are
165    /// compared for inequality.
166    pub fn satisfy(
167        &self,
168        requirement: &PhysicalSortRequirement,
169        schema: &Schema,
170    ) -> bool {
171        // If the column is not nullable, NULLS FIRST/LAST is not important.
172        let nullable = self.expr.nullable(schema).unwrap_or(true);
173        self.expr.eq(&requirement.expr)
174            && if nullable {
175                requirement.options.is_none_or(|opts| self.options == opts)
176            } else {
177                requirement
178                    .options
179                    .is_none_or(|opts| self.options.descending == opts.descending)
180            }
181    }
182}
183
/// Represents sort requirement associated with a plan
///
/// If the requirement includes [`SortOptions`] then both the
/// expression *and* the sort options must match.
///
/// If the requirement does not include [`SortOptions`] then only the
/// expressions must match.
///
/// # Examples
///
/// With sort options (`A`, `DESC NULLS FIRST`):
/// * `ORDER BY A DESC NULLS FIRST` matches
/// * `ORDER BY A ASC  NULLS FIRST` does not match (`ASC` vs `DESC`)
/// * `ORDER BY B DESC NULLS FIRST` does not match (different expr)
///
/// Without sort options (`A`, None):
/// * `ORDER BY A DESC NULLS FIRST` matches
/// * `ORDER BY A ASC  NULLS FIRST` matches (`ASC` and `NULL` options ignored)
/// * `ORDER BY B DESC NULLS FIRST` does not match  (different expr)
#[derive(Clone, Debug)]
pub struct PhysicalSortRequirement {
    /// Physical expression representing the column to sort
    pub expr: Arc<dyn PhysicalExpr>,
    /// Option to specify how the given column should be sorted.
    /// If unspecified (`None`), there are no constraints on sort options.
    pub options: Option<SortOptions>,
}
211
212impl From<PhysicalSortRequirement> for PhysicalSortExpr {
213    /// If options is `None`, the default sort options `ASC, NULLS LAST` is used.
214    ///
215    /// The default is picked to be consistent with
216    /// PostgreSQL: <https://www.postgresql.org/docs/current/queries-order.html>
217    fn from(value: PhysicalSortRequirement) -> Self {
218        let options = value.options.unwrap_or(SortOptions {
219            descending: false,
220            nulls_first: false,
221        });
222        PhysicalSortExpr::new(value.expr, options)
223    }
224}
225
226impl From<PhysicalSortExpr> for PhysicalSortRequirement {
227    fn from(value: PhysicalSortExpr) -> Self {
228        PhysicalSortRequirement::new(value.expr, Some(value.options))
229    }
230}
231
232impl PartialEq for PhysicalSortRequirement {
233    fn eq(&self, other: &PhysicalSortRequirement) -> bool {
234        self.options == other.options && self.expr.eq(&other.expr)
235    }
236}
237
238impl Display for PhysicalSortRequirement {
239    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
240        let opts_string = self.options.as_ref().map_or("NA", to_str);
241        write!(f, "{} {}", self.expr, opts_string)
242    }
243}
244
245/// Writes a list of [`PhysicalSortRequirement`]s to a `std::fmt::Formatter`.
246///
247/// Example output: `[a + 1, b]`
248pub fn format_physical_sort_requirement_list(
249    exprs: &[PhysicalSortRequirement],
250) -> impl Display + '_ {
251    struct DisplayWrapper<'a>(&'a [PhysicalSortRequirement]);
252    impl Display for DisplayWrapper<'_> {
253        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
254            let mut iter = self.0.iter();
255            write!(f, "[")?;
256            if let Some(expr) = iter.next() {
257                write!(f, "{}", expr)?;
258            }
259            for expr in iter {
260                write!(f, ", {}", expr)?;
261            }
262            write!(f, "]")?;
263            Ok(())
264        }
265    }
266    DisplayWrapper(exprs)
267}
268
269impl PhysicalSortRequirement {
270    /// Creates a new requirement.
271    ///
272    /// If `options` is `Some(..)`, creates an `exact` requirement,
273    /// which must match both `options` and `expr`.
274    ///
275    /// If `options` is `None`, Creates a new `expr_only` requirement,
276    /// which must match only `expr`.
277    ///
278    /// See [`PhysicalSortRequirement`] for examples.
279    pub fn new(expr: Arc<dyn PhysicalExpr>, options: Option<SortOptions>) -> Self {
280        Self { expr, options }
281    }
282
283    /// Replace the required expression for this requirement with the new one
284    pub fn with_expr(mut self, expr: Arc<dyn PhysicalExpr>) -> Self {
285        self.expr = expr;
286        self
287    }
288
289    /// Returns whether this requirement is equal or more specific than `other`.
290    pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
291        self.expr.eq(&other.expr)
292            && other
293                .options
294                .is_none_or(|other_opts| self.options == Some(other_opts))
295    }
296
297    #[deprecated(since = "43.0.0", note = "use  LexRequirement::from_lex_ordering")]
298    pub fn from_sort_exprs<'a>(
299        ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
300    ) -> LexRequirement {
301        let ordering = ordering.into_iter().cloned().collect();
302        LexRequirement::from_lex_ordering(ordering)
303    }
304    #[deprecated(since = "43.0.0", note = "use  LexOrdering::from_lex_requirement")]
305    pub fn to_sort_exprs(
306        requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
307    ) -> LexOrdering {
308        let requirements = requirements.into_iter().collect();
309        LexOrdering::from_lex_requirement(requirements)
310    }
311}
312
313/// Returns the SQL string representation of the given [SortOptions] object.
314#[inline]
315fn to_str(options: &SortOptions) -> &str {
316    match (options.descending, options.nulls_first) {
317        (true, true) => "DESC",
318        (true, false) => "DESC NULLS LAST",
319        (false, true) => "ASC",
320        (false, false) => "ASC NULLS LAST",
321    }
322}
323
/// `LexOrdering` contains a `Vec<PhysicalSortExpr>`, which represents
/// a lexicographical ordering.
///
/// For example, `vec![a ASC, b DESC]` represents a lexicographical ordering
/// that first sorts by column `a` in ascending order, then by column `b` in
/// descending order.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct LexOrdering {
    /// Sort expressions, most significant first.
    inner: Vec<PhysicalSortExpr>,
}
334
/// Identity conversion: lets generic code that accepts
/// `impl AsRef<LexOrdering>` take a `LexOrdering` directly.
impl AsRef<LexOrdering> for LexOrdering {
    fn as_ref(&self) -> &LexOrdering {
        self
    }
}
340
341impl LexOrdering {
342    /// Creates a new [`LexOrdering`] from a vector
343    pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
344        Self { inner }
345    }
346
347    /// Return an empty LexOrdering (no expressions)
348    pub fn empty() -> &'static LexOrdering {
349        static EMPTY_ORDER: LazyLock<LexOrdering> = LazyLock::new(LexOrdering::default);
350        &EMPTY_ORDER
351    }
352
353    /// Returns the number of elements that can be stored in the LexOrdering
354    /// without reallocating.
355    pub fn capacity(&self) -> usize {
356        self.inner.capacity()
357    }
358
359    /// Clears the LexOrdering, removing all elements.
360    pub fn clear(&mut self) {
361        self.inner.clear()
362    }
363
364    /// Takes ownership of the actual vector of `PhysicalSortExpr`s in the LexOrdering.
365    pub fn take_exprs(self) -> Vec<PhysicalSortExpr> {
366        self.inner
367    }
368
369    /// Returns `true` if the LexOrdering contains `expr`
370    pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
371        self.inner.contains(expr)
372    }
373
374    /// Add all elements from `iter` to the LexOrdering.
375    pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter: I) {
376        self.inner.extend(iter)
377    }
378
379    /// Remove all elements from the LexOrdering where `f` evaluates to `false`.
380    pub fn retain<F>(&mut self, f: F)
381    where
382        F: FnMut(&PhysicalSortExpr) -> bool,
383    {
384        self.inner.retain(f)
385    }
386
387    /// Returns `true` if the LexOrdering contains no elements.
388    pub fn is_empty(&self) -> bool {
389        self.inner.is_empty()
390    }
391
392    /// Returns an iterator over each `&PhysicalSortExpr` in the LexOrdering.
393    pub fn iter(&self) -> core::slice::Iter<PhysicalSortExpr> {
394        self.inner.iter()
395    }
396
397    /// Returns the number of elements in the LexOrdering.
398    pub fn len(&self) -> usize {
399        self.inner.len()
400    }
401
402    /// Removes the last element from the LexOrdering and returns it, or `None` if it is empty.
403    pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
404        self.inner.pop()
405    }
406
407    /// Appends an element to the back of the LexOrdering.
408    pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
409        self.inner.push(physical_sort_expr)
410    }
411
412    /// Truncates the LexOrdering, keeping only the first `len` elements.
413    pub fn truncate(&mut self, len: usize) {
414        self.inner.truncate(len)
415    }
416
417    /// Merge the contents of `other` into `self`, removing duplicates.
418    pub fn merge(mut self, other: LexOrdering) -> Self {
419        self.inner = self.inner.into_iter().chain(other).unique().collect();
420        self
421    }
422
423    /// Converts a `LexRequirement` into a `LexOrdering`.
424    ///
425    /// This function converts [`PhysicalSortRequirement`] to [`PhysicalSortExpr`]
426    /// for each entry in the input.
427    ///
428    /// If the required ordering is `None` for an entry in `requirement`, the
429    /// default ordering `ASC, NULLS LAST` is used (see
430    /// [`PhysicalSortExpr::from`]).
431    pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
432        requirement
433            .into_iter()
434            .map(PhysicalSortExpr::from)
435            .collect()
436    }
437
438    /// Collapse a `LexOrdering` into a new duplicate-free `LexOrdering` based on expression.
439    ///
440    /// This function filters  duplicate entries that have same physical
441    /// expression inside, ignoring [`SortOptions`]. For example:
442    ///
443    /// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
444    pub fn collapse(self) -> Self {
445        let mut output = LexOrdering::default();
446        for item in self {
447            if !output.iter().any(|req| req.expr.eq(&item.expr)) {
448                output.push(item);
449            }
450        }
451        output
452    }
453
454    /// Transforms each `PhysicalSortExpr` in the `LexOrdering`
455    /// in place using the provided closure `f`.
456    pub fn transform<F>(&mut self, f: F)
457    where
458        F: FnMut(&mut PhysicalSortExpr),
459    {
460        self.inner.iter_mut().for_each(f);
461    }
462}
463
464impl From<Vec<PhysicalSortExpr>> for LexOrdering {
465    fn from(value: Vec<PhysicalSortExpr>) -> Self {
466        Self::new(value)
467    }
468}
469
470impl From<LexRequirement> for LexOrdering {
471    fn from(value: LexRequirement) -> Self {
472        Self::from_lex_requirement(value)
473    }
474}
475
476/// Convert a `LexOrdering` into a `Arc[<PhysicalSortExpr>]` for fast copies
477impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
478    fn from(value: LexOrdering) -> Self {
479        value.inner.into()
480    }
481}
482
483impl Deref for LexOrdering {
484    type Target = [PhysicalSortExpr];
485
486    fn deref(&self) -> &Self::Target {
487        self.inner.as_slice()
488    }
489}
490
491impl Display for LexOrdering {
492    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
493        let mut first = true;
494        for sort_expr in &self.inner {
495            if first {
496                first = false;
497            } else {
498                write!(f, ", ")?;
499            }
500            write!(f, "{}", sort_expr)?;
501        }
502        Ok(())
503    }
504}
505
506impl FromIterator<PhysicalSortExpr> for LexOrdering {
507    fn from_iter<T: IntoIterator<Item = PhysicalSortExpr>>(iter: T) -> Self {
508        let mut lex_ordering = LexOrdering::default();
509
510        for i in iter {
511            lex_ordering.push(i);
512        }
513
514        lex_ordering
515    }
516}
517
518impl Index<usize> for LexOrdering {
519    type Output = PhysicalSortExpr;
520
521    fn index(&self, index: usize) -> &Self::Output {
522        &self.inner[index]
523    }
524}
525
526impl Index<Range<usize>> for LexOrdering {
527    type Output = [PhysicalSortExpr];
528
529    fn index(&self, range: Range<usize>) -> &Self::Output {
530        &self.inner[range]
531    }
532}
533
534impl Index<RangeFrom<usize>> for LexOrdering {
535    type Output = [PhysicalSortExpr];
536
537    fn index(&self, range_from: RangeFrom<usize>) -> &Self::Output {
538        &self.inner[range_from]
539    }
540}
541
542impl Index<RangeTo<usize>> for LexOrdering {
543    type Output = [PhysicalSortExpr];
544
545    fn index(&self, range_to: RangeTo<usize>) -> &Self::Output {
546        &self.inner[range_to]
547    }
548}
549
550impl IntoIterator for LexOrdering {
551    type Item = PhysicalSortExpr;
552    type IntoIter = IntoIter<PhysicalSortExpr>;
553
554    fn into_iter(self) -> Self::IntoIter {
555        self.inner.into_iter()
556    }
557}
558
/// `LexOrderingRef` is an alias for the type `&[PhysicalSortExpr]`, which represents
/// a reference to a lexicographical ordering.
#[deprecated(since = "43.0.0", note = "use &LexOrdering instead")]
pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
563
/// `LexRequirement` is a struct containing a `Vec<PhysicalSortRequirement>`, which
/// represents a lexicographical ordering requirement.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct LexRequirement {
    /// Requirements, most significant first.
    pub inner: Vec<PhysicalSortRequirement>,
}
570
571impl LexRequirement {
572    pub fn new(inner: Vec<PhysicalSortRequirement>) -> Self {
573        Self { inner }
574    }
575
576    pub fn is_empty(&self) -> bool {
577        self.inner.is_empty()
578    }
579
580    pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortRequirement> {
581        self.inner.iter()
582    }
583
584    pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) {
585        self.inner.push(physical_sort_requirement)
586    }
587
588    /// Create a new [`LexRequirement`] from a [`LexOrdering`]
589    ///
590    /// Returns [`LexRequirement`] that requires the exact
591    /// sort of the [`PhysicalSortExpr`]s in `ordering`
592    pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
593        Self::new(
594            ordering
595                .into_iter()
596                .map(PhysicalSortRequirement::from)
597                .collect(),
598        )
599    }
600
601    /// Constructs a duplicate-free `LexOrderingReq` by filtering out
602    /// duplicate entries that have same physical expression inside.
603    ///
604    /// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a
605    /// Some(ASC)]`.
606    pub fn collapse(self) -> Self {
607        let mut output = Vec::<PhysicalSortRequirement>::new();
608        for item in self {
609            if !output.iter().any(|req| req.expr.eq(&item.expr)) {
610                output.push(item);
611            }
612        }
613        LexRequirement::new(output)
614    }
615}
616
617impl From<LexOrdering> for LexRequirement {
618    fn from(value: LexOrdering) -> Self {
619        Self::from_lex_ordering(value)
620    }
621}
622
623impl Deref for LexRequirement {
624    type Target = [PhysicalSortRequirement];
625
626    fn deref(&self) -> &Self::Target {
627        self.inner.as_slice()
628    }
629}
630
631impl FromIterator<PhysicalSortRequirement> for LexRequirement {
632    fn from_iter<T: IntoIterator<Item = PhysicalSortRequirement>>(iter: T) -> Self {
633        let mut lex_requirement = LexRequirement::new(vec![]);
634
635        for i in iter {
636            lex_requirement.inner.push(i);
637        }
638
639        lex_requirement
640    }
641}
642
643impl IntoIterator for LexRequirement {
644    type Item = PhysicalSortRequirement;
645    type IntoIter = IntoIter<Self::Item>;
646
647    fn into_iter(self) -> Self::IntoIter {
648        self.inner.into_iter()
649    }
650}
651
652impl<'a> IntoIterator for &'a LexOrdering {
653    type Item = &'a PhysicalSortExpr;
654    type IntoIter = std::slice::Iter<'a, PhysicalSortExpr>;
655
656    fn into_iter(self) -> Self::IntoIter {
657        self.inner.iter()
658    }
659}
660
/// `LexRequirementRef` is an alias for the type `&[PhysicalSortRequirement]`, which
/// represents a reference to a lexicographical ordering requirement.
// NOTE(review): a `#[deprecated(since = "43.0.0", note = "use &LexRequirement instead")]`
// attribute was previously written here as doc-comment text, so it never took
// effect. Confirm whether this alias should really be deprecated before
// turning it into a live attribute.
pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];