use crate::physical_expr::PhysicalExpr;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::{Deref, Index, Range, RangeFrom, RangeTo};
use std::sync::{Arc, LazyLock};
use std::vec::IntoIter;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr_common::columnar_value::ColumnarValue;
use itertools::Itertools;
/// A sort expression: a physical expression together with the [`SortOptions`]
/// (direction and null placement) used when ordering by its values.
#[derive(Clone, Debug)]
pub struct PhysicalSortExpr {
/// The expression whose evaluated values are sorted.
pub expr: Arc<dyn PhysicalExpr>,
/// Sort direction (`descending`) and null placement (`nulls_first`).
pub options: SortOptions,
}
impl PhysicalSortExpr {
pub fn new(expr: Arc<dyn PhysicalExpr>, options: SortOptions) -> Self {
Self { expr, options }
}
pub fn new_default(expr: Arc<dyn PhysicalExpr>) -> Self {
Self::new(expr, SortOptions::default())
}
pub fn asc(mut self) -> Self {
self.options.descending = false;
self
}
pub fn desc(mut self) -> Self {
self.options.descending = true;
self
}
pub fn nulls_first(mut self) -> Self {
self.options.nulls_first = true;
self
}
pub fn nulls_last(mut self) -> Self {
self.options.nulls_first = false;
self
}
}
impl AsRef<dyn PhysicalExpr> for PhysicalSortExpr {
    /// Borrows the physical expression being sorted on.
    fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) {
        &*self.expr
    }
}
impl PartialEq for PhysicalSortExpr {
    /// Two sort expressions are equal when both their options and expressions match.
    fn eq(&self, other: &PhysicalSortExpr) -> bool {
        // Options are cheap to compare, so check them before the expressions.
        if self.options != other.options {
            return false;
        }
        self.expr.eq(&other.expr)
    }
}

/// Equality above is treated as a full equivalence relation (required by `Hash` users).
impl Eq for PhysicalSortExpr {}
impl Hash for PhysicalSortExpr {
    /// Hashes the expression, then the options; both participate in `eq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let Self { expr, options } = self;
        expr.hash(state);
        options.hash(state);
    }
}
impl Display for PhysicalSortExpr {
    /// Formats as `"<expr> <options>"`, e.g. `a DESC NULLS LAST`.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let direction = to_str(&self.options);
        write!(f, "{} {direction}", self.expr)
    }
}
impl PhysicalSortExpr {
    /// Evaluates the expression against `batch` and wraps the result as a [`SortColumn`]
    /// carrying this expression's sort options.
    ///
    /// A scalar result is expanded to an array with `batch.num_rows()` rows.
    ///
    /// # Errors
    /// Propagates errors from evaluating the expression or from expanding a scalar.
    pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
        let values = match self.expr.evaluate(batch)? {
            ColumnarValue::Array(array) => array,
            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
        };
        let options = Some(self.options);
        Ok(SortColumn { values, options })
    }

    /// Returns `true` if this sort expression satisfies `requirement` under `schema`.
    ///
    /// The expressions must be equal. A requirement without options accepts any
    /// options. If the expression may be null (or its nullability cannot be
    /// determined), the options must match exactly. Otherwise only the sort
    /// direction has to match, because null placement is irrelevant.
    pub fn satisfy(&self, requirement: &PhysicalSortRequirement, schema: &Schema) -> bool {
        // Nullability errors are treated conservatively as "nullable".
        let nullable = self.expr.nullable(schema).unwrap_or(true);
        if !self.expr.eq(&requirement.expr) {
            return false;
        }
        match requirement.options {
            None => true,
            Some(required) if nullable => self.options == required,
            Some(required) => self.options.descending == required.descending,
        }
    }
}
/// A requirement that data be ordered by `expr`, optionally with specific sort options.
///
/// When `options` is `None`, `compatible` and `PhysicalSortExpr::satisfy` accept any
/// sort options for the expression.
#[derive(Clone, Debug)]
pub struct PhysicalSortRequirement {
/// The expression the data must be ordered by.
pub expr: Arc<dyn PhysicalExpr>,
/// Required sort options, or `None` if any options are acceptable.
pub options: Option<SortOptions>,
}
impl From<PhysicalSortRequirement> for PhysicalSortExpr {
fn from(value: PhysicalSortRequirement) -> Self {
let options = value.options.unwrap_or(SortOptions {
descending: false,
nulls_first: false,
});
PhysicalSortExpr::new(value.expr, options)
}
}
impl From<PhysicalSortExpr> for PhysicalSortRequirement {
fn from(value: PhysicalSortExpr) -> Self {
PhysicalSortRequirement::new(value.expr, Some(value.options))
}
}
impl PartialEq for PhysicalSortRequirement {
    /// Requirements are equal when both their (optional) options and expressions match.
    fn eq(&self, other: &PhysicalSortRequirement) -> bool {
        let same_options = self.options == other.options;
        same_options && self.expr.eq(&other.expr)
    }
}
impl Display for PhysicalSortRequirement {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let opts_string = self.options.as_ref().map_or("NA", to_str);
write!(f, "{} {}", self.expr, opts_string)
}
}
/// Returns a lazily-formatted view of `exprs` rendered as `[req1, req2, ...]`.
///
/// No string is built up front; formatting happens when the result is displayed.
pub fn format_physical_sort_requirement_list(
    exprs: &[PhysicalSortRequirement],
) -> impl Display + '_ {
    struct DisplayWrapper<'a>(&'a [PhysicalSortRequirement]);

    impl Display for DisplayWrapper<'_> {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "[")?;
            for (position, requirement) in self.0.iter().enumerate() {
                // Separator goes before every element except the first.
                if position > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "{}", requirement)?;
            }
            write!(f, "]")?;
            Ok(())
        }
    }

    DisplayWrapper(exprs)
}
impl PhysicalSortRequirement {
    /// Creates a requirement on `expr`; `options: None` accepts any sort options.
    pub fn new(expr: Arc<dyn PhysicalExpr>, options: Option<SortOptions>) -> Self {
        Self { expr, options }
    }

    /// Returns `self` with its expression replaced by `expr`; options are kept.
    pub fn with_expr(self, expr: Arc<dyn PhysicalExpr>) -> Self {
        Self { expr, ..self }
    }

    /// Returns `true` if `self` is at least as specific as `other`.
    ///
    /// The expressions must be equal. If `other` specifies options, `self` must
    /// specify the same ones. If `other` has no options, any options on `self` are
    /// accepted.
    pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
        if !self.expr.eq(&other.expr) {
            return false;
        }
        match other.options {
            None => true,
            Some(other_opts) => self.options == Some(other_opts),
        }
    }

    /// Converts sort expressions into a [`LexRequirement`] (deprecated).
    #[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]
    pub fn from_sort_exprs<'a>(
        ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
    ) -> LexRequirement {
        let owned: LexOrdering = ordering.into_iter().cloned().collect();
        LexRequirement::from_lex_ordering(owned)
    }

    /// Converts requirements into a [`LexOrdering`] (deprecated).
    #[deprecated(since = "43.0.0", note = "use LexOrdering::from_lex_requirement")]
    pub fn to_sort_exprs(
        requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
    ) -> LexOrdering {
        let collected: LexRequirement = requirements.into_iter().collect();
        LexOrdering::from_lex_requirement(collected)
    }
}
/// Renders sort options as the SQL-like suffix used by the `Display` impls here.
///
/// `nulls_first == true` is rendered without a NULLS clause (`"ASC"` / `"DESC"`).
/// `nulls_first == false` is spelled out as `"... NULLS LAST"`.
///
/// Every result is a string literal, so the return type is `&'static str` rather
/// than a string tied to the lifetime of the `options` borrow.
fn to_str(options: &SortOptions) -> &'static str {
    match (options.descending, options.nulls_first) {
        (true, true) => "DESC",
        (true, false) => "DESC NULLS LAST",
        (false, true) => "ASC",
        (false, false) => "ASC NULLS LAST",
    }
}
/// An ordered sequence of [`PhysicalSortExpr`]s.
///
/// NOTE(review): the `Lex` prefix presumably means lexicographic ordering (earlier
/// entries take precedence); this type does not enforce that itself.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct LexOrdering {
/// The sort expressions, in order.
inner: Vec<PhysicalSortExpr>,
}
impl AsRef<LexOrdering> for LexOrdering {
    /// Identity conversion, so APIs taking `impl AsRef<LexOrdering>` accept `&LexOrdering`.
    fn as_ref(&self) -> &Self {
        self
    }
}
impl LexOrdering {
    /// Creates a [`LexOrdering`] wrapping `inner`, keeping the given order.
    pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
        Self { inner }
    }

    /// Returns a `'static` reference to a shared, empty [`LexOrdering`].
    ///
    /// The instance is built lazily (via `LazyLock`) on the first call and reused
    /// afterwards.
    pub fn empty() -> &'static LexOrdering {
        static EMPTY_ORDER: LazyLock<LexOrdering> = LazyLock::new(LexOrdering::default);
        &EMPTY_ORDER
    }

    /// Returns the capacity of the backing vector.
    pub fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Removes all sort expressions.
    pub fn clear(&mut self) {
        self.inner.clear()
    }

    /// Consumes the ordering and returns the underlying sort expressions.
    pub fn take_exprs(self) -> Vec<PhysicalSortExpr> {
        self.inner
    }

    /// Returns `true` if `expr` (expression and options) is part of this ordering.
    pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
        self.inner.contains(expr)
    }

    /// Appends every sort expression yielded by `iter`, in order.
    pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter: I) {
        self.inner.extend(iter)
    }

    /// Keeps only the sort expressions for which `f` returns `true`, preserving order.
    pub fn retain<F>(&mut self, f: F)
    where
        F: FnMut(&PhysicalSortExpr) -> bool,
    {
        self.inner.retain(f)
    }

    /// Returns `true` if the ordering contains no sort expressions.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns an iterator over the sort expressions, in order.
    ///
    /// The lifetime is written as `'_` instead of being hidden, so the signature
    /// shows that the iterator borrows `self`.
    pub fn iter(&self) -> core::slice::Iter<'_, PhysicalSortExpr> {
        self.inner.iter()
    }

    /// Returns the number of sort expressions.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Removes and returns the last sort expression, or `None` if the ordering is empty.
    pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
        self.inner.pop()
    }

    /// Appends `physical_sort_expr` at the end of the ordering.
    pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
        self.inner.push(physical_sort_expr)
    }

    /// Keeps only the first `len` sort expressions; does nothing if `len >= self.len()`.
    pub fn truncate(&mut self, len: usize) {
        self.inner.truncate(len)
    }

    /// Appends `other` to `self` and removes duplicates.
    ///
    /// Duplicates are detected with `PhysicalSortExpr`'s `Eq`/`Hash`, which cover both
    /// the expression and the options. Only the first occurrence is kept, so
    /// duplicates already present in `self` are dropped as well.
    pub fn merge(mut self, other: LexOrdering) -> Self {
        self.inner = self.inner.into_iter().chain(other).unique().collect();
        self
    }

    /// Converts a [`LexRequirement`] into an ordering, mapping each requirement
    /// through `PhysicalSortExpr::from`.
    pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
        requirement
            .into_iter()
            .map(PhysicalSortExpr::from)
            .collect()
    }

    /// Drops redundant entries: keeps the first sort expression for each distinct
    /// expression and removes later entries on the same expression, even when their
    /// options differ.
    pub fn collapse(self) -> Self {
        let mut output = LexOrdering::default();
        for item in self {
            if !output.iter().any(|req| req.expr.eq(&item.expr)) {
                output.push(item);
            }
        }
        output
    }

    /// Applies `f` to every sort expression in place, in order.
    pub fn transform<F>(&mut self, f: F)
    where
        F: FnMut(&mut PhysicalSortExpr),
    {
        self.inner.iter_mut().for_each(f);
    }
}
impl From<Vec<PhysicalSortExpr>> for LexOrdering {
    /// Wraps a vector of sort expressions without copying it.
    fn from(value: Vec<PhysicalSortExpr>) -> Self {
        Self { inner: value }
    }
}
impl From<LexRequirement> for LexOrdering {
fn from(value: LexRequirement) -> Self {
Self::from_lex_requirement(value)
}
}
impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
fn from(value: LexOrdering) -> Self {
value.inner.into()
}
}
impl Deref for LexOrdering {
    type Target = [PhysicalSortExpr];

    /// Exposes the sort expressions as a slice.
    fn deref(&self) -> &Self::Target {
        &self.inner[..]
    }
}
impl Display for LexOrdering {
    /// Formats the sort expressions separated by `", "` (no surrounding brackets).
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        for (position, sort_expr) in self.inner.iter().enumerate() {
            if position > 0 {
                write!(f, ", ")?;
            }
            write!(f, "{sort_expr}")?;
        }
        Ok(())
    }
}
impl FromIterator<PhysicalSortExpr> for LexOrdering {
fn from_iter<T: IntoIterator<Item = PhysicalSortExpr>>(iter: T) -> Self {
let mut lex_ordering = LexOrdering::default();
for i in iter {
lex_ordering.push(i);
}
lex_ordering
}
}
impl Index<usize> for LexOrdering {
    type Output = PhysicalSortExpr;

    /// Returns the sort expression at `index`; panics if out of bounds.
    fn index(&self, index: usize) -> &Self::Output {
        &self.inner.as_slice()[index]
    }
}

impl Index<Range<usize>> for LexOrdering {
    type Output = [PhysicalSortExpr];

    /// Returns the sort expressions in `range`; panics if out of bounds.
    fn index(&self, range: Range<usize>) -> &Self::Output {
        &self.inner.as_slice()[range]
    }
}

impl Index<RangeFrom<usize>> for LexOrdering {
    type Output = [PhysicalSortExpr];

    /// Returns the sort expressions from `range_from.start` to the end.
    fn index(&self, range_from: RangeFrom<usize>) -> &Self::Output {
        &self.inner.as_slice()[range_from]
    }
}

impl Index<RangeTo<usize>> for LexOrdering {
    type Output = [PhysicalSortExpr];

    /// Returns the sort expressions before `range_to.end`.
    fn index(&self, range_to: RangeTo<usize>) -> &Self::Output {
        &self.inner.as_slice()[range_to]
    }
}
impl IntoIterator for LexOrdering {
type Item = PhysicalSortExpr;
type IntoIter = IntoIter<PhysicalSortExpr>;
fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}
/// Borrowed view of an ordering as a plain slice of sort expressions.
///
/// Deprecated: use `&LexOrdering` instead.
#[deprecated(since = "43.0.0", note = "use &LexOrdering instead")]
pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
/// An ordered sequence of [`PhysicalSortRequirement`]s.
///
/// Unlike `LexOrdering`, the backing vector is a public field.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct LexRequirement {
/// The requirements, in order.
pub inner: Vec<PhysicalSortRequirement>,
}
impl LexRequirement {
    /// Creates a [`LexRequirement`] wrapping `inner`, keeping the given order.
    pub fn new(inner: Vec<PhysicalSortRequirement>) -> Self {
        Self { inner }
    }

    /// Returns `true` if there are no requirements.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns an iterator over the requirements, in order.
    ///
    /// Returns the concrete slice iterator, matching `LexOrdering::iter`, rather than
    /// an opaque `impl Iterator`. Callers therefore also get `ExactSizeIterator`,
    /// `DoubleEndedIterator` and `Clone`.
    pub fn iter(&self) -> std::slice::Iter<'_, PhysicalSortRequirement> {
        self.inner.iter()
    }

    /// Appends `physical_sort_requirement` at the end.
    pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) {
        self.inner.push(physical_sort_requirement)
    }

    /// Converts an ordering into requirements. Each requirement demands exactly the
    /// options of its sort expression.
    pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
        Self::new(
            ordering
                .into_iter()
                .map(PhysicalSortRequirement::from)
                .collect(),
        )
    }

    /// Drops redundant entries: keeps the first requirement for each distinct
    /// expression and removes later ones on the same expression, even when their
    /// options differ.
    pub fn collapse(self) -> Self {
        let mut output = Vec::<PhysicalSortRequirement>::new();
        for item in self {
            if !output.iter().any(|req| req.expr.eq(&item.expr)) {
                output.push(item);
            }
        }
        LexRequirement::new(output)
    }
}
impl From<LexOrdering> for LexRequirement {
fn from(value: LexOrdering) -> Self {
Self::from_lex_ordering(value)
}
}
impl Deref for LexRequirement {
    type Target = [PhysicalSortRequirement];

    /// Exposes the requirements as a slice.
    fn deref(&self) -> &Self::Target {
        &self.inner[..]
    }
}
impl FromIterator<PhysicalSortRequirement> for LexRequirement {
fn from_iter<T: IntoIterator<Item = PhysicalSortRequirement>>(iter: T) -> Self {
let mut lex_requirement = LexRequirement::new(vec![]);
for i in iter {
lex_requirement.inner.push(i);
}
lex_requirement
}
}
impl IntoIterator for LexRequirement {
    type Item = PhysicalSortRequirement;
    type IntoIter = IntoIter<Self::Item>;

    /// Consumes the requirement list, yielding each requirement in order.
    fn into_iter(self) -> Self::IntoIter {
        let Self { inner } = self;
        inner.into_iter()
    }
}
impl<'a> IntoIterator for &'a LexOrdering {
    type Item = &'a PhysicalSortExpr;
    type IntoIter = std::slice::Iter<'a, PhysicalSortExpr>;

    /// Iterates over borrowed sort expressions, enabling `for e in &ordering`.
    fn into_iter(self) -> Self::IntoIter {
        self.inner.as_slice().iter()
    }
}
/// Borrowed view of a requirement list as a plain slice of [`PhysicalSortRequirement`]s.
pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];