datafusion_physical_expr_common/
sort_expr.rs1use crate::physical_expr::PhysicalExpr;
21use std::fmt;
22use std::fmt::{Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::ops::{Deref, Index, Range, RangeFrom, RangeTo};
25use std::sync::{Arc, LazyLock};
26use std::vec::IntoIter;
27
28use arrow::compute::kernels::sort::{SortColumn, SortOptions};
29use arrow::datatypes::Schema;
30use arrow::record_batch::RecordBatch;
31use datafusion_common::Result;
32use datafusion_expr_common::columnar_value::ColumnarValue;
33use itertools::Itertools;
34
35#[derive(Clone, Debug)]
78pub struct PhysicalSortExpr {
79 pub expr: Arc<dyn PhysicalExpr>,
81 pub options: SortOptions,
83}
84
85impl PhysicalSortExpr {
86 pub fn new(expr: Arc<dyn PhysicalExpr>, options: SortOptions) -> Self {
88 Self { expr, options }
89 }
90
91 pub fn new_default(expr: Arc<dyn PhysicalExpr>) -> Self {
93 Self::new(expr, SortOptions::default())
94 }
95
96 pub fn asc(mut self) -> Self {
98 self.options.descending = false;
99 self
100 }
101
102 pub fn desc(mut self) -> Self {
104 self.options.descending = true;
105 self
106 }
107
108 pub fn nulls_first(mut self) -> Self {
110 self.options.nulls_first = true;
111 self
112 }
113
114 pub fn nulls_last(mut self) -> Self {
116 self.options.nulls_first = false;
117 self
118 }
119}
120
121impl AsRef<dyn PhysicalExpr> for PhysicalSortExpr {
123 fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) {
124 self.expr.as_ref()
125 }
126}
127
128impl PartialEq for PhysicalSortExpr {
129 fn eq(&self, other: &PhysicalSortExpr) -> bool {
130 self.options == other.options && self.expr.eq(&other.expr)
131 }
132}
133
134impl Eq for PhysicalSortExpr {}
135
136impl Hash for PhysicalSortExpr {
137 fn hash<H: Hasher>(&self, state: &mut H) {
138 self.expr.hash(state);
139 self.options.hash(state);
140 }
141}
142
143impl Display for PhysicalSortExpr {
144 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
145 write!(f, "{} {}", self.expr, to_str(&self.options))
146 }
147}
148
149impl PhysicalSortExpr {
150 pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
152 let value_to_sort = self.expr.evaluate(batch)?;
153 let array_to_sort = match value_to_sort {
154 ColumnarValue::Array(array) => array,
155 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
156 };
157 Ok(SortColumn {
158 values: array_to_sort,
159 options: Some(self.options),
160 })
161 }
162
163 pub fn satisfy(
167 &self,
168 requirement: &PhysicalSortRequirement,
169 schema: &Schema,
170 ) -> bool {
171 let nullable = self.expr.nullable(schema).unwrap_or(true);
173 self.expr.eq(&requirement.expr)
174 && if nullable {
175 requirement.options.is_none_or(|opts| self.options == opts)
176 } else {
177 requirement
178 .options
179 .is_none_or(|opts| self.options.descending == opts.descending)
180 }
181 }
182}
183
184#[derive(Clone, Debug)]
204pub struct PhysicalSortRequirement {
205 pub expr: Arc<dyn PhysicalExpr>,
207 pub options: Option<SortOptions>,
210}
211
212impl From<PhysicalSortRequirement> for PhysicalSortExpr {
213 fn from(value: PhysicalSortRequirement) -> Self {
218 let options = value.options.unwrap_or(SortOptions {
219 descending: false,
220 nulls_first: false,
221 });
222 PhysicalSortExpr::new(value.expr, options)
223 }
224}
225
226impl From<PhysicalSortExpr> for PhysicalSortRequirement {
227 fn from(value: PhysicalSortExpr) -> Self {
228 PhysicalSortRequirement::new(value.expr, Some(value.options))
229 }
230}
231
232impl PartialEq for PhysicalSortRequirement {
233 fn eq(&self, other: &PhysicalSortRequirement) -> bool {
234 self.options == other.options && self.expr.eq(&other.expr)
235 }
236}
237
238impl Display for PhysicalSortRequirement {
239 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
240 let opts_string = self.options.as_ref().map_or("NA", to_str);
241 write!(f, "{} {}", self.expr, opts_string)
242 }
243}
244
245pub fn format_physical_sort_requirement_list(
249 exprs: &[PhysicalSortRequirement],
250) -> impl Display + '_ {
251 struct DisplayWrapper<'a>(&'a [PhysicalSortRequirement]);
252 impl Display for DisplayWrapper<'_> {
253 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
254 let mut iter = self.0.iter();
255 write!(f, "[")?;
256 if let Some(expr) = iter.next() {
257 write!(f, "{}", expr)?;
258 }
259 for expr in iter {
260 write!(f, ", {}", expr)?;
261 }
262 write!(f, "]")?;
263 Ok(())
264 }
265 }
266 DisplayWrapper(exprs)
267}
268
269impl PhysicalSortRequirement {
270 pub fn new(expr: Arc<dyn PhysicalExpr>, options: Option<SortOptions>) -> Self {
280 Self { expr, options }
281 }
282
283 pub fn with_expr(mut self, expr: Arc<dyn PhysicalExpr>) -> Self {
285 self.expr = expr;
286 self
287 }
288
289 pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
291 self.expr.eq(&other.expr)
292 && other
293 .options
294 .is_none_or(|other_opts| self.options == Some(other_opts))
295 }
296
297 #[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]
298 pub fn from_sort_exprs<'a>(
299 ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
300 ) -> LexRequirement {
301 let ordering = ordering.into_iter().cloned().collect();
302 LexRequirement::from_lex_ordering(ordering)
303 }
304 #[deprecated(since = "43.0.0", note = "use LexOrdering::from_lex_requirement")]
305 pub fn to_sort_exprs(
306 requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
307 ) -> LexOrdering {
308 let requirements = requirements.into_iter().collect();
309 LexOrdering::from_lex_requirement(requirements)
310 }
311}
312
313#[inline]
315fn to_str(options: &SortOptions) -> &str {
316 match (options.descending, options.nulls_first) {
317 (true, true) => "DESC",
318 (true, false) => "DESC NULLS LAST",
319 (false, true) => "ASC",
320 (false, false) => "ASC NULLS LAST",
321 }
322}
323
324#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
331pub struct LexOrdering {
332 inner: Vec<PhysicalSortExpr>,
333}
334
335impl AsRef<LexOrdering> for LexOrdering {
336 fn as_ref(&self) -> &LexOrdering {
337 self
338 }
339}
340
341impl LexOrdering {
342 pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
344 Self { inner }
345 }
346
347 pub fn empty() -> &'static LexOrdering {
349 static EMPTY_ORDER: LazyLock<LexOrdering> = LazyLock::new(LexOrdering::default);
350 &EMPTY_ORDER
351 }
352
353 pub fn capacity(&self) -> usize {
356 self.inner.capacity()
357 }
358
359 pub fn clear(&mut self) {
361 self.inner.clear()
362 }
363
364 pub fn take_exprs(self) -> Vec<PhysicalSortExpr> {
366 self.inner
367 }
368
369 pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
371 self.inner.contains(expr)
372 }
373
374 pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter: I) {
376 self.inner.extend(iter)
377 }
378
379 pub fn retain<F>(&mut self, f: F)
381 where
382 F: FnMut(&PhysicalSortExpr) -> bool,
383 {
384 self.inner.retain(f)
385 }
386
387 pub fn is_empty(&self) -> bool {
389 self.inner.is_empty()
390 }
391
392 pub fn iter(&self) -> core::slice::Iter<PhysicalSortExpr> {
394 self.inner.iter()
395 }
396
397 pub fn len(&self) -> usize {
399 self.inner.len()
400 }
401
402 pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
404 self.inner.pop()
405 }
406
407 pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
409 self.inner.push(physical_sort_expr)
410 }
411
412 pub fn truncate(&mut self, len: usize) {
414 self.inner.truncate(len)
415 }
416
417 pub fn merge(mut self, other: LexOrdering) -> Self {
419 self.inner = self.inner.into_iter().chain(other).unique().collect();
420 self
421 }
422
423 pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
432 requirement
433 .into_iter()
434 .map(PhysicalSortExpr::from)
435 .collect()
436 }
437
438 pub fn collapse(self) -> Self {
445 let mut output = LexOrdering::default();
446 for item in self {
447 if !output.iter().any(|req| req.expr.eq(&item.expr)) {
448 output.push(item);
449 }
450 }
451 output
452 }
453
454 pub fn transform<F>(&mut self, f: F)
457 where
458 F: FnMut(&mut PhysicalSortExpr),
459 {
460 self.inner.iter_mut().for_each(f);
461 }
462}
463
464impl From<Vec<PhysicalSortExpr>> for LexOrdering {
465 fn from(value: Vec<PhysicalSortExpr>) -> Self {
466 Self::new(value)
467 }
468}
469
470impl From<LexRequirement> for LexOrdering {
471 fn from(value: LexRequirement) -> Self {
472 Self::from_lex_requirement(value)
473 }
474}
475
476impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
478 fn from(value: LexOrdering) -> Self {
479 value.inner.into()
480 }
481}
482
483impl Deref for LexOrdering {
484 type Target = [PhysicalSortExpr];
485
486 fn deref(&self) -> &Self::Target {
487 self.inner.as_slice()
488 }
489}
490
491impl Display for LexOrdering {
492 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
493 let mut first = true;
494 for sort_expr in &self.inner {
495 if first {
496 first = false;
497 } else {
498 write!(f, ", ")?;
499 }
500 write!(f, "{}", sort_expr)?;
501 }
502 Ok(())
503 }
504}
505
506impl FromIterator<PhysicalSortExpr> for LexOrdering {
507 fn from_iter<T: IntoIterator<Item = PhysicalSortExpr>>(iter: T) -> Self {
508 let mut lex_ordering = LexOrdering::default();
509
510 for i in iter {
511 lex_ordering.push(i);
512 }
513
514 lex_ordering
515 }
516}
517
518impl Index<usize> for LexOrdering {
519 type Output = PhysicalSortExpr;
520
521 fn index(&self, index: usize) -> &Self::Output {
522 &self.inner[index]
523 }
524}
525
526impl Index<Range<usize>> for LexOrdering {
527 type Output = [PhysicalSortExpr];
528
529 fn index(&self, range: Range<usize>) -> &Self::Output {
530 &self.inner[range]
531 }
532}
533
534impl Index<RangeFrom<usize>> for LexOrdering {
535 type Output = [PhysicalSortExpr];
536
537 fn index(&self, range_from: RangeFrom<usize>) -> &Self::Output {
538 &self.inner[range_from]
539 }
540}
541
542impl Index<RangeTo<usize>> for LexOrdering {
543 type Output = [PhysicalSortExpr];
544
545 fn index(&self, range_to: RangeTo<usize>) -> &Self::Output {
546 &self.inner[range_to]
547 }
548}
549
550impl IntoIterator for LexOrdering {
551 type Item = PhysicalSortExpr;
552 type IntoIter = IntoIter<PhysicalSortExpr>;
553
554 fn into_iter(self) -> Self::IntoIter {
555 self.inner.into_iter()
556 }
557}
558
559#[deprecated(since = "43.0.0", note = "use &LexOrdering instead")]
562pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
563
564#[derive(Debug, Default, Clone, PartialEq)]
567pub struct LexRequirement {
568 pub inner: Vec<PhysicalSortRequirement>,
569}
570
571impl LexRequirement {
572 pub fn new(inner: Vec<PhysicalSortRequirement>) -> Self {
573 Self { inner }
574 }
575
576 pub fn is_empty(&self) -> bool {
577 self.inner.is_empty()
578 }
579
580 pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortRequirement> {
581 self.inner.iter()
582 }
583
584 pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) {
585 self.inner.push(physical_sort_requirement)
586 }
587
588 pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
593 Self::new(
594 ordering
595 .into_iter()
596 .map(PhysicalSortRequirement::from)
597 .collect(),
598 )
599 }
600
601 pub fn collapse(self) -> Self {
607 let mut output = Vec::<PhysicalSortRequirement>::new();
608 for item in self {
609 if !output.iter().any(|req| req.expr.eq(&item.expr)) {
610 output.push(item);
611 }
612 }
613 LexRequirement::new(output)
614 }
615}
616
617impl From<LexOrdering> for LexRequirement {
618 fn from(value: LexOrdering) -> Self {
619 Self::from_lex_ordering(value)
620 }
621}
622
623impl Deref for LexRequirement {
624 type Target = [PhysicalSortRequirement];
625
626 fn deref(&self) -> &Self::Target {
627 self.inner.as_slice()
628 }
629}
630
631impl FromIterator<PhysicalSortRequirement> for LexRequirement {
632 fn from_iter<T: IntoIterator<Item = PhysicalSortRequirement>>(iter: T) -> Self {
633 let mut lex_requirement = LexRequirement::new(vec![]);
634
635 for i in iter {
636 lex_requirement.inner.push(i);
637 }
638
639 lex_requirement
640 }
641}
642
643impl IntoIterator for LexRequirement {
644 type Item = PhysicalSortRequirement;
645 type IntoIter = IntoIter<Self::Item>;
646
647 fn into_iter(self) -> Self::IntoIter {
648 self.inner.into_iter()
649 }
650}
651
652impl<'a> IntoIterator for &'a LexOrdering {
653 type Item = &'a PhysicalSortExpr;
654 type IntoIter = std::slice::Iter<'a, PhysicalSortExpr>;
655
656 fn into_iter(self) -> Self::IntoIter {
657 self.inner.iter()
658 }
659}
660
661pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];