datafusion_physical_expr_common/
sort_expr.rs1use crate::physical_expr::{fmt_sql, PhysicalExpr};
21use std::fmt;
22use std::fmt::{Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::ops::{Deref, Index, Range, RangeFrom, RangeTo};
25use std::sync::{Arc, LazyLock};
26use std::vec::IntoIter;
27
28use arrow::compute::kernels::sort::{SortColumn, SortOptions};
29use arrow::datatypes::Schema;
30use arrow::record_batch::RecordBatch;
31use datafusion_common::Result;
32use datafusion_expr_common::columnar_value::ColumnarValue;
33use itertools::Itertools;
34
35#[derive(Clone, Debug)]
79pub struct PhysicalSortExpr {
80 pub expr: Arc<dyn PhysicalExpr>,
82 pub options: SortOptions,
84}
85
86impl PhysicalSortExpr {
87 pub fn new(expr: Arc<dyn PhysicalExpr>, options: SortOptions) -> Self {
89 Self { expr, options }
90 }
91
92 pub fn new_default(expr: Arc<dyn PhysicalExpr>) -> Self {
94 Self::new(expr, SortOptions::default())
95 }
96
97 pub fn asc(mut self) -> Self {
99 self.options.descending = false;
100 self
101 }
102
103 pub fn desc(mut self) -> Self {
105 self.options.descending = true;
106 self
107 }
108
109 pub fn nulls_first(mut self) -> Self {
111 self.options.nulls_first = true;
112 self
113 }
114
115 pub fn nulls_last(mut self) -> Self {
117 self.options.nulls_first = false;
118 self
119 }
120
121 pub fn fmt_sql(&self, f: &mut Formatter) -> fmt::Result {
123 write!(
124 f,
125 "{} {}",
126 fmt_sql(self.expr.as_ref()),
127 to_str(&self.options)
128 )
129 }
130}
131
132impl AsRef<dyn PhysicalExpr> for PhysicalSortExpr {
134 fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) {
135 self.expr.as_ref()
136 }
137}
138
139impl PartialEq for PhysicalSortExpr {
140 fn eq(&self, other: &PhysicalSortExpr) -> bool {
141 self.options == other.options && self.expr.eq(&other.expr)
142 }
143}
144
145impl Eq for PhysicalSortExpr {}
146
147impl Hash for PhysicalSortExpr {
148 fn hash<H: Hasher>(&self, state: &mut H) {
149 self.expr.hash(state);
150 self.options.hash(state);
151 }
152}
153
154impl Display for PhysicalSortExpr {
155 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
156 write!(f, "{} {}", self.expr, to_str(&self.options))
157 }
158}
159
160impl PhysicalSortExpr {
161 pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
163 let value_to_sort = self.expr.evaluate(batch)?;
164 let array_to_sort = match value_to_sort {
165 ColumnarValue::Array(array) => array,
166 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
167 };
168 Ok(SortColumn {
169 values: array_to_sort,
170 options: Some(self.options),
171 })
172 }
173
174 pub fn satisfy(
178 &self,
179 requirement: &PhysicalSortRequirement,
180 schema: &Schema,
181 ) -> bool {
182 let nullable = self.expr.nullable(schema).unwrap_or(true);
184 self.expr.eq(&requirement.expr)
185 && if nullable {
186 requirement.options.is_none_or(|opts| self.options == opts)
187 } else {
188 requirement
189 .options
190 .is_none_or(|opts| self.options.descending == opts.descending)
191 }
192 }
193}
194
195#[derive(Clone, Debug)]
215pub struct PhysicalSortRequirement {
216 pub expr: Arc<dyn PhysicalExpr>,
218 pub options: Option<SortOptions>,
221}
222
223impl From<PhysicalSortRequirement> for PhysicalSortExpr {
224 fn from(value: PhysicalSortRequirement) -> Self {
229 let options = value.options.unwrap_or(SortOptions {
230 descending: false,
231 nulls_first: false,
232 });
233 PhysicalSortExpr::new(value.expr, options)
234 }
235}
236
237impl From<PhysicalSortExpr> for PhysicalSortRequirement {
238 fn from(value: PhysicalSortExpr) -> Self {
239 PhysicalSortRequirement::new(value.expr, Some(value.options))
240 }
241}
242
243impl PartialEq for PhysicalSortRequirement {
244 fn eq(&self, other: &PhysicalSortRequirement) -> bool {
245 self.options == other.options && self.expr.eq(&other.expr)
246 }
247}
248
249impl Display for PhysicalSortRequirement {
250 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
251 let opts_string = self.options.as_ref().map_or("NA", to_str);
252 write!(f, "{} {}", self.expr, opts_string)
253 }
254}
255
256pub fn format_physical_sort_requirement_list(
260 exprs: &[PhysicalSortRequirement],
261) -> impl Display + '_ {
262 struct DisplayWrapper<'a>(&'a [PhysicalSortRequirement]);
263 impl Display for DisplayWrapper<'_> {
264 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
265 let mut iter = self.0.iter();
266 write!(f, "[")?;
267 if let Some(expr) = iter.next() {
268 write!(f, "{}", expr)?;
269 }
270 for expr in iter {
271 write!(f, ", {}", expr)?;
272 }
273 write!(f, "]")?;
274 Ok(())
275 }
276 }
277 DisplayWrapper(exprs)
278}
279
280impl PhysicalSortRequirement {
281 pub fn new(expr: Arc<dyn PhysicalExpr>, options: Option<SortOptions>) -> Self {
291 Self { expr, options }
292 }
293
294 pub fn with_expr(mut self, expr: Arc<dyn PhysicalExpr>) -> Self {
296 self.expr = expr;
297 self
298 }
299
300 pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
302 self.expr.eq(&other.expr)
303 && other
304 .options
305 .is_none_or(|other_opts| self.options == Some(other_opts))
306 }
307
308 #[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]
309 pub fn from_sort_exprs<'a>(
310 ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
311 ) -> LexRequirement {
312 let ordering = ordering.into_iter().cloned().collect();
313 LexRequirement::from_lex_ordering(ordering)
314 }
315 #[deprecated(since = "43.0.0", note = "use LexOrdering::from_lex_requirement")]
316 pub fn to_sort_exprs(
317 requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
318 ) -> LexOrdering {
319 let requirements = requirements.into_iter().collect();
320 LexOrdering::from_lex_requirement(requirements)
321 }
322}
323
324#[inline]
326fn to_str(options: &SortOptions) -> &str {
327 match (options.descending, options.nulls_first) {
328 (true, true) => "DESC",
329 (true, false) => "DESC NULLS LAST",
330 (false, true) => "ASC",
331 (false, false) => "ASC NULLS LAST",
332 }
333}
334
335#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
342pub struct LexOrdering {
343 inner: Vec<PhysicalSortExpr>,
344}
345
346impl AsRef<LexOrdering> for LexOrdering {
347 fn as_ref(&self) -> &LexOrdering {
348 self
349 }
350}
351
352impl LexOrdering {
353 pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
355 Self { inner }
356 }
357
358 pub fn empty() -> &'static LexOrdering {
360 static EMPTY_ORDER: LazyLock<LexOrdering> = LazyLock::new(LexOrdering::default);
361 &EMPTY_ORDER
362 }
363
364 pub fn capacity(&self) -> usize {
367 self.inner.capacity()
368 }
369
370 pub fn clear(&mut self) {
372 self.inner.clear()
373 }
374
375 pub fn take_exprs(self) -> Vec<PhysicalSortExpr> {
377 self.inner
378 }
379
380 pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
382 self.inner.contains(expr)
383 }
384
385 pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter: I) {
387 self.inner.extend(iter)
388 }
389
390 pub fn retain<F>(&mut self, f: F)
392 where
393 F: FnMut(&PhysicalSortExpr) -> bool,
394 {
395 self.inner.retain(f)
396 }
397
398 pub fn is_empty(&self) -> bool {
400 self.inner.is_empty()
401 }
402
403 pub fn iter(&self) -> core::slice::Iter<PhysicalSortExpr> {
405 self.inner.iter()
406 }
407
408 pub fn len(&self) -> usize {
410 self.inner.len()
411 }
412
413 pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
415 self.inner.pop()
416 }
417
418 pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
420 self.inner.push(physical_sort_expr)
421 }
422
423 pub fn truncate(&mut self, len: usize) {
425 self.inner.truncate(len)
426 }
427
428 pub fn merge(mut self, other: LexOrdering) -> Self {
430 self.inner = self.inner.into_iter().chain(other).unique().collect();
431 self
432 }
433
434 pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
443 requirement
444 .into_iter()
445 .map(PhysicalSortExpr::from)
446 .collect()
447 }
448
449 pub fn collapse(self) -> Self {
456 let mut output = LexOrdering::default();
457 for item in self {
458 if !output.iter().any(|req| req.expr.eq(&item.expr)) {
459 output.push(item);
460 }
461 }
462 output
463 }
464
465 pub fn transform<F>(&mut self, f: F)
468 where
469 F: FnMut(&mut PhysicalSortExpr),
470 {
471 self.inner.iter_mut().for_each(f);
472 }
473}
474
475impl From<Vec<PhysicalSortExpr>> for LexOrdering {
476 fn from(value: Vec<PhysicalSortExpr>) -> Self {
477 Self::new(value)
478 }
479}
480
481impl From<LexRequirement> for LexOrdering {
482 fn from(value: LexRequirement) -> Self {
483 Self::from_lex_requirement(value)
484 }
485}
486
487impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
489 fn from(value: LexOrdering) -> Self {
490 value.inner.into()
491 }
492}
493
494impl Deref for LexOrdering {
495 type Target = [PhysicalSortExpr];
496
497 fn deref(&self) -> &Self::Target {
498 self.inner.as_slice()
499 }
500}
501
502impl Display for LexOrdering {
503 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
504 let mut first = true;
505 for sort_expr in &self.inner {
506 if first {
507 first = false;
508 } else {
509 write!(f, ", ")?;
510 }
511 write!(f, "{}", sort_expr)?;
512 }
513 Ok(())
514 }
515}
516
517impl FromIterator<PhysicalSortExpr> for LexOrdering {
518 fn from_iter<T: IntoIterator<Item = PhysicalSortExpr>>(iter: T) -> Self {
519 let mut lex_ordering = LexOrdering::default();
520
521 for i in iter {
522 lex_ordering.push(i);
523 }
524
525 lex_ordering
526 }
527}
528
529impl Index<usize> for LexOrdering {
530 type Output = PhysicalSortExpr;
531
532 fn index(&self, index: usize) -> &Self::Output {
533 &self.inner[index]
534 }
535}
536
537impl Index<Range<usize>> for LexOrdering {
538 type Output = [PhysicalSortExpr];
539
540 fn index(&self, range: Range<usize>) -> &Self::Output {
541 &self.inner[range]
542 }
543}
544
545impl Index<RangeFrom<usize>> for LexOrdering {
546 type Output = [PhysicalSortExpr];
547
548 fn index(&self, range_from: RangeFrom<usize>) -> &Self::Output {
549 &self.inner[range_from]
550 }
551}
552
553impl Index<RangeTo<usize>> for LexOrdering {
554 type Output = [PhysicalSortExpr];
555
556 fn index(&self, range_to: RangeTo<usize>) -> &Self::Output {
557 &self.inner[range_to]
558 }
559}
560
561impl IntoIterator for LexOrdering {
562 type Item = PhysicalSortExpr;
563 type IntoIter = IntoIter<PhysicalSortExpr>;
564
565 fn into_iter(self) -> Self::IntoIter {
566 self.inner.into_iter()
567 }
568}
569
570#[deprecated(since = "43.0.0", note = "use &LexOrdering instead")]
573pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
574
575#[derive(Debug, Default, Clone, PartialEq)]
578pub struct LexRequirement {
579 pub inner: Vec<PhysicalSortRequirement>,
580}
581
582impl LexRequirement {
583 pub fn new(inner: Vec<PhysicalSortRequirement>) -> Self {
584 Self { inner }
585 }
586
587 pub fn is_empty(&self) -> bool {
588 self.inner.is_empty()
589 }
590
591 pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortRequirement> {
592 self.inner.iter()
593 }
594
595 pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) {
596 self.inner.push(physical_sort_requirement)
597 }
598
599 pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
604 Self::new(
605 ordering
606 .into_iter()
607 .map(PhysicalSortRequirement::from)
608 .collect(),
609 )
610 }
611
612 pub fn collapse(self) -> Self {
618 let mut output = Vec::<PhysicalSortRequirement>::new();
619 for item in self {
620 if !output.iter().any(|req| req.expr.eq(&item.expr)) {
621 output.push(item);
622 }
623 }
624 LexRequirement::new(output)
625 }
626}
627
628impl From<LexOrdering> for LexRequirement {
629 fn from(value: LexOrdering) -> Self {
630 Self::from_lex_ordering(value)
631 }
632}
633
634impl Deref for LexRequirement {
635 type Target = [PhysicalSortRequirement];
636
637 fn deref(&self) -> &Self::Target {
638 self.inner.as_slice()
639 }
640}
641
642impl FromIterator<PhysicalSortRequirement> for LexRequirement {
643 fn from_iter<T: IntoIterator<Item = PhysicalSortRequirement>>(iter: T) -> Self {
644 let mut lex_requirement = LexRequirement::new(vec![]);
645
646 for i in iter {
647 lex_requirement.inner.push(i);
648 }
649
650 lex_requirement
651 }
652}
653
654impl IntoIterator for LexRequirement {
655 type Item = PhysicalSortRequirement;
656 type IntoIter = IntoIter<Self::Item>;
657
658 fn into_iter(self) -> Self::IntoIter {
659 self.inner.into_iter()
660 }
661}
662
663impl<'a> IntoIterator for &'a LexOrdering {
664 type Item = &'a PhysicalSortExpr;
665 type IntoIter = std::slice::Iter<'a, PhysicalSortExpr>;
666
667 fn into_iter(self) -> Self::IntoIter {
668 self.inner.iter()
669 }
670}
671
672pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];