datafusion_physical_expr_common/
sort_expr.rs1use crate::physical_expr::{fmt_sql, PhysicalExpr};
21use std::fmt;
22use std::fmt::{Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::ops::{Deref, Index, Range, RangeFrom, RangeTo};
25use std::sync::{Arc, LazyLock};
26use std::vec::IntoIter;
27
28use arrow::compute::kernels::sort::{SortColumn, SortOptions};
29use arrow::datatypes::Schema;
30use arrow::record_batch::RecordBatch;
31use datafusion_common::Result;
32use datafusion_expr_common::columnar_value::ColumnarValue;
33use itertools::Itertools;
34
35#[derive(Clone, Debug)]
81pub struct PhysicalSortExpr {
82 pub expr: Arc<dyn PhysicalExpr>,
84 pub options: SortOptions,
86}
87
88impl PhysicalSortExpr {
89 pub fn new(expr: Arc<dyn PhysicalExpr>, options: SortOptions) -> Self {
91 Self { expr, options }
92 }
93
94 pub fn new_default(expr: Arc<dyn PhysicalExpr>) -> Self {
96 Self::new(expr, SortOptions::default())
97 }
98
99 pub fn asc(mut self) -> Self {
101 self.options.descending = false;
102 self
103 }
104
105 pub fn desc(mut self) -> Self {
107 self.options.descending = true;
108 self
109 }
110
111 pub fn nulls_first(mut self) -> Self {
113 self.options.nulls_first = true;
114 self
115 }
116
117 pub fn nulls_last(mut self) -> Self {
119 self.options.nulls_first = false;
120 self
121 }
122
123 pub fn fmt_sql(&self, f: &mut Formatter) -> fmt::Result {
125 write!(
126 f,
127 "{} {}",
128 fmt_sql(self.expr.as_ref()),
129 to_str(&self.options)
130 )
131 }
132}
133
134impl AsRef<dyn PhysicalExpr> for PhysicalSortExpr {
136 fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) {
137 self.expr.as_ref()
138 }
139}
140
141impl PartialEq for PhysicalSortExpr {
142 fn eq(&self, other: &PhysicalSortExpr) -> bool {
143 self.options == other.options && self.expr.eq(&other.expr)
144 }
145}
146
147impl Eq for PhysicalSortExpr {}
148
149impl Hash for PhysicalSortExpr {
150 fn hash<H: Hasher>(&self, state: &mut H) {
151 self.expr.hash(state);
152 self.options.hash(state);
153 }
154}
155
156impl Display for PhysicalSortExpr {
157 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
158 write!(f, "{} {}", self.expr, to_str(&self.options))
159 }
160}
161
162impl PhysicalSortExpr {
163 pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
165 let value_to_sort = self.expr.evaluate(batch)?;
166 let array_to_sort = match value_to_sort {
167 ColumnarValue::Array(array) => array,
168 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
169 };
170 Ok(SortColumn {
171 values: array_to_sort,
172 options: Some(self.options),
173 })
174 }
175
176 pub fn satisfy(
180 &self,
181 requirement: &PhysicalSortRequirement,
182 schema: &Schema,
183 ) -> bool {
184 let nullable = self.expr.nullable(schema).unwrap_or(true);
186 self.expr.eq(&requirement.expr)
187 && if nullable {
188 requirement.options.is_none_or(|opts| self.options == opts)
189 } else {
190 requirement
191 .options
192 .is_none_or(|opts| self.options.descending == opts.descending)
193 }
194 }
195}
196
197#[derive(Clone, Debug)]
217pub struct PhysicalSortRequirement {
218 pub expr: Arc<dyn PhysicalExpr>,
220 pub options: Option<SortOptions>,
223}
224
225impl From<PhysicalSortRequirement> for PhysicalSortExpr {
226 fn from(value: PhysicalSortRequirement) -> Self {
231 let options = value.options.unwrap_or(SortOptions {
232 descending: false,
233 nulls_first: false,
234 });
235 PhysicalSortExpr::new(value.expr, options)
236 }
237}
238
239impl From<PhysicalSortExpr> for PhysicalSortRequirement {
240 fn from(value: PhysicalSortExpr) -> Self {
241 PhysicalSortRequirement::new(value.expr, Some(value.options))
242 }
243}
244
245impl PartialEq for PhysicalSortRequirement {
246 fn eq(&self, other: &PhysicalSortRequirement) -> bool {
247 self.options == other.options && self.expr.eq(&other.expr)
248 }
249}
250
251impl Display for PhysicalSortRequirement {
252 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
253 let opts_string = self.options.as_ref().map_or("NA", to_str);
254 write!(f, "{} {}", self.expr, opts_string)
255 }
256}
257
258pub fn format_physical_sort_requirement_list(
262 exprs: &[PhysicalSortRequirement],
263) -> impl Display + '_ {
264 struct DisplayWrapper<'a>(&'a [PhysicalSortRequirement]);
265 impl Display for DisplayWrapper<'_> {
266 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
267 let mut iter = self.0.iter();
268 write!(f, "[")?;
269 if let Some(expr) = iter.next() {
270 write!(f, "{expr}")?;
271 }
272 for expr in iter {
273 write!(f, ", {expr}")?;
274 }
275 write!(f, "]")?;
276 Ok(())
277 }
278 }
279 DisplayWrapper(exprs)
280}
281
282impl PhysicalSortRequirement {
283 pub fn new(expr: Arc<dyn PhysicalExpr>, options: Option<SortOptions>) -> Self {
293 Self { expr, options }
294 }
295
296 pub fn with_expr(mut self, expr: Arc<dyn PhysicalExpr>) -> Self {
298 self.expr = expr;
299 self
300 }
301
302 pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
304 self.expr.eq(&other.expr)
305 && other
306 .options
307 .is_none_or(|other_opts| self.options == Some(other_opts))
308 }
309
310 #[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]
311 pub fn from_sort_exprs<'a>(
312 ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
313 ) -> LexRequirement {
314 let ordering = ordering.into_iter().cloned().collect();
315 LexRequirement::from_lex_ordering(ordering)
316 }
317 #[deprecated(since = "43.0.0", note = "use LexOrdering::from_lex_requirement")]
318 pub fn to_sort_exprs(
319 requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
320 ) -> LexOrdering {
321 let requirements = requirements.into_iter().collect();
322 LexOrdering::from_lex_requirement(requirements)
323 }
324}
325
326#[inline]
328fn to_str(options: &SortOptions) -> &str {
329 match (options.descending, options.nulls_first) {
330 (true, true) => "DESC",
331 (true, false) => "DESC NULLS LAST",
332 (false, true) => "ASC",
333 (false, false) => "ASC NULLS LAST",
334 }
335}
336
337#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
344pub struct LexOrdering {
345 inner: Vec<PhysicalSortExpr>,
346}
347
348impl AsRef<LexOrdering> for LexOrdering {
349 fn as_ref(&self) -> &LexOrdering {
350 self
351 }
352}
353
354impl LexOrdering {
355 pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
357 Self { inner }
358 }
359
360 pub fn empty() -> &'static LexOrdering {
362 static EMPTY_ORDER: LazyLock<LexOrdering> = LazyLock::new(LexOrdering::default);
363 &EMPTY_ORDER
364 }
365
366 pub fn capacity(&self) -> usize {
369 self.inner.capacity()
370 }
371
372 pub fn clear(&mut self) {
374 self.inner.clear()
375 }
376
377 pub fn take_exprs(self) -> Vec<PhysicalSortExpr> {
379 self.inner
380 }
381
382 pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
384 self.inner.contains(expr)
385 }
386
387 pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter: I) {
389 self.inner.extend(iter)
390 }
391
392 pub fn retain<F>(&mut self, f: F)
394 where
395 F: FnMut(&PhysicalSortExpr) -> bool,
396 {
397 self.inner.retain(f)
398 }
399
400 pub fn is_empty(&self) -> bool {
402 self.inner.is_empty()
403 }
404
405 pub fn iter(&self) -> core::slice::Iter<PhysicalSortExpr> {
407 self.inner.iter()
408 }
409
410 pub fn len(&self) -> usize {
412 self.inner.len()
413 }
414
415 pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
417 self.inner.pop()
418 }
419
420 pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
422 self.inner.push(physical_sort_expr)
423 }
424
425 pub fn truncate(&mut self, len: usize) {
427 self.inner.truncate(len)
428 }
429
430 pub fn merge(mut self, other: LexOrdering) -> Self {
432 self.inner = self.inner.into_iter().chain(other).unique().collect();
433 self
434 }
435
436 pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
445 requirement
446 .into_iter()
447 .map(PhysicalSortExpr::from)
448 .collect()
449 }
450
451 pub fn collapse(self) -> Self {
458 let mut output = LexOrdering::default();
459 for item in self {
460 if !output.iter().any(|req| req.expr.eq(&item.expr)) {
461 output.push(item);
462 }
463 }
464 output
465 }
466
467 pub fn transform<F>(&mut self, f: F)
470 where
471 F: FnMut(&mut PhysicalSortExpr),
472 {
473 self.inner.iter_mut().for_each(f);
474 }
475}
476
477impl From<Vec<PhysicalSortExpr>> for LexOrdering {
478 fn from(value: Vec<PhysicalSortExpr>) -> Self {
479 Self::new(value)
480 }
481}
482
483impl From<LexRequirement> for LexOrdering {
484 fn from(value: LexRequirement) -> Self {
485 Self::from_lex_requirement(value)
486 }
487}
488
489impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
491 fn from(value: LexOrdering) -> Self {
492 value.inner.into()
493 }
494}
495
496impl Deref for LexOrdering {
497 type Target = [PhysicalSortExpr];
498
499 fn deref(&self) -> &Self::Target {
500 self.inner.as_slice()
501 }
502}
503
504impl Display for LexOrdering {
505 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
506 let mut first = true;
507 for sort_expr in &self.inner {
508 if first {
509 first = false;
510 } else {
511 write!(f, ", ")?;
512 }
513 write!(f, "{sort_expr}")?;
514 }
515 Ok(())
516 }
517}
518
519impl FromIterator<PhysicalSortExpr> for LexOrdering {
520 fn from_iter<T: IntoIterator<Item = PhysicalSortExpr>>(iter: T) -> Self {
521 let mut lex_ordering = LexOrdering::default();
522
523 for i in iter {
524 lex_ordering.push(i);
525 }
526
527 lex_ordering
528 }
529}
530
531impl Index<usize> for LexOrdering {
532 type Output = PhysicalSortExpr;
533
534 fn index(&self, index: usize) -> &Self::Output {
535 &self.inner[index]
536 }
537}
538
539impl Index<Range<usize>> for LexOrdering {
540 type Output = [PhysicalSortExpr];
541
542 fn index(&self, range: Range<usize>) -> &Self::Output {
543 &self.inner[range]
544 }
545}
546
547impl Index<RangeFrom<usize>> for LexOrdering {
548 type Output = [PhysicalSortExpr];
549
550 fn index(&self, range_from: RangeFrom<usize>) -> &Self::Output {
551 &self.inner[range_from]
552 }
553}
554
555impl Index<RangeTo<usize>> for LexOrdering {
556 type Output = [PhysicalSortExpr];
557
558 fn index(&self, range_to: RangeTo<usize>) -> &Self::Output {
559 &self.inner[range_to]
560 }
561}
562
563impl IntoIterator for LexOrdering {
564 type Item = PhysicalSortExpr;
565 type IntoIter = IntoIter<PhysicalSortExpr>;
566
567 fn into_iter(self) -> Self::IntoIter {
568 self.inner.into_iter()
569 }
570}
571
572#[deprecated(since = "43.0.0", note = "use &LexOrdering instead")]
575pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
576
577#[derive(Debug, Default, Clone, PartialEq)]
580pub struct LexRequirement {
581 pub inner: Vec<PhysicalSortRequirement>,
582}
583
584impl LexRequirement {
585 pub fn new(inner: Vec<PhysicalSortRequirement>) -> Self {
586 Self { inner }
587 }
588
589 pub fn is_empty(&self) -> bool {
590 self.inner.is_empty()
591 }
592
593 pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortRequirement> {
594 self.inner.iter()
595 }
596
597 pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) {
598 self.inner.push(physical_sort_requirement)
599 }
600
601 pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
606 Self::new(
607 ordering
608 .into_iter()
609 .map(PhysicalSortRequirement::from)
610 .collect(),
611 )
612 }
613
614 pub fn collapse(self) -> Self {
620 let mut output = Vec::<PhysicalSortRequirement>::new();
621 for item in self {
622 if !output.iter().any(|req| req.expr.eq(&item.expr)) {
623 output.push(item);
624 }
625 }
626 LexRequirement::new(output)
627 }
628}
629
630impl From<LexOrdering> for LexRequirement {
631 fn from(value: LexOrdering) -> Self {
632 Self::from_lex_ordering(value)
633 }
634}
635
636impl Deref for LexRequirement {
637 type Target = [PhysicalSortRequirement];
638
639 fn deref(&self) -> &Self::Target {
640 self.inner.as_slice()
641 }
642}
643
644impl FromIterator<PhysicalSortRequirement> for LexRequirement {
645 fn from_iter<T: IntoIterator<Item = PhysicalSortRequirement>>(iter: T) -> Self {
646 let mut lex_requirement = LexRequirement::new(vec![]);
647
648 for i in iter {
649 lex_requirement.inner.push(i);
650 }
651
652 lex_requirement
653 }
654}
655
656impl IntoIterator for LexRequirement {
657 type Item = PhysicalSortRequirement;
658 type IntoIter = IntoIter<Self::Item>;
659
660 fn into_iter(self) -> Self::IntoIter {
661 self.inner.into_iter()
662 }
663}
664
665impl<'a> IntoIterator for &'a LexOrdering {
666 type Item = &'a PhysicalSortExpr;
667 type IntoIter = std::slice::Iter<'a, PhysicalSortExpr>;
668
669 fn into_iter(self) -> Self::IntoIter {
670 self.inner.iter()
671 }
672}
673
674pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];