use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::PhysicalExpr;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;
/// A physical expression paired with the sort options to use when ordering by it.
#[derive(Clone, Debug)]
pub struct PhysicalSortExpr {
/// Physical expression that is evaluated to produce the values to sort.
pub expr: Arc<dyn PhysicalExpr>,
/// Sort options (the `descending` and `nulls_first` flags) applied to `expr`.
pub options: SortOptions,
}
impl PartialEq for PhysicalSortExpr {
fn eq(&self, other: &PhysicalSortExpr) -> bool {
self.options == other.options && self.expr.eq(&other.expr)
}
}
// Marker impl: declares that the `PartialEq` implementation for
// `PhysicalSortExpr` is to be treated as a full equivalence relation.
impl Eq for PhysicalSortExpr {}
impl Hash for PhysicalSortExpr {
    /// Feeds the expression and then the sort options into `state`.
    fn hash<S: Hasher>(&self, state: &mut S) {
        let Self { expr, options } = self;
        // Order matters for the resulting hash: expression first, options second.
        expr.hash(state);
        options.hash(state);
    }
}
impl std::fmt::Display for PhysicalSortExpr {
    /// Writes the expression, a single space, and the textual form of the
    /// sort options as produced by `to_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let direction = to_str(&self.options);
        write!(f, "{} {direction}", self.expr)
    }
}
impl PhysicalSortExpr {
    /// Evaluates the sort expression against `batch` and packages the result
    /// as a [`SortColumn`] carrying this expression's sort options.
    ///
    /// # Errors
    ///
    /// Propagates any error from evaluating the expression, and returns an
    /// error (built with `exec_err!`) when the expression evaluates to a
    /// scalar instead of an array.
    pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
        match self.expr.evaluate(batch)? {
            ColumnarValue::Array(values) => Ok(SortColumn {
                values,
                options: Some(self.options),
            }),
            ColumnarValue::Scalar(scalar) => {
                exec_err!("Sort operation is not applicable to scalar value {scalar}")
            }
        }
    }

    /// Returns `true` when this sort expression fulfils `requirement`.
    ///
    /// The expressions must be equal. A requirement without options accepts
    /// any options. Otherwise the options must match exactly, except that
    /// for an expression that is not nullable under `schema` only the
    /// `descending` flag is compared.
    pub fn satisfy(
        &self,
        requirement: &PhysicalSortRequirement,
        schema: &Schema,
    ) -> bool {
        // If nullability cannot be determined, fall back to treating the
        // expression as nullable, which demands the full options match.
        let nullable = self.expr.nullable(schema).unwrap_or(true);
        if !self.expr.eq(&requirement.expr) {
            return false;
        }
        match requirement.options {
            None => true,
            Some(required) if nullable => self.options == required,
            // Null placement is ignored for non-nullable expressions.
            Some(required) => self.options.descending == required.descending,
        }
    }

    /// Returns a value whose `Display` output is every sort expression in
    /// `input`, in order, separated by `,` with no surrounding whitespace.
    pub fn format_list(input: &[PhysicalSortExpr]) -> impl Display + '_ {
        struct CommaSeparated<'a>(&'a [PhysicalSortExpr]);

        impl Display for CommaSeparated<'_> {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                for (position, sort_expr) in self.0.iter().enumerate() {
                    // A separator goes before every element except the first.
                    if position > 0 {
                        f.write_str(",")?;
                    }
                    write!(f, "{sort_expr}")?;
                }
                Ok(())
            }
        }

        CommaSeparated(input)
    }
}
/// A sort requirement on a physical expression, where the sort options may be
/// left unspecified.
#[derive(Clone, Debug)]
pub struct PhysicalSortRequirement {
/// Physical expression the requirement applies to.
pub expr: Arc<dyn PhysicalExpr>,
/// Required sort options. `None` places no constraint on the options:
/// `PhysicalSortExpr::satisfy` and `compatible` accept any options for it.
pub options: Option<SortOptions>,
}
impl From<PhysicalSortRequirement> for PhysicalSortExpr {
fn from(value: PhysicalSortRequirement) -> Self {
let options = value.options.unwrap_or(SortOptions {
descending: false,
nulls_first: false,
});
PhysicalSortExpr {
expr: value.expr,
options,
}
}
}
impl From<PhysicalSortExpr> for PhysicalSortRequirement {
fn from(value: PhysicalSortExpr) -> Self {
PhysicalSortRequirement::new(value.expr, Some(value.options))
}
}
impl PartialEq for PhysicalSortRequirement {
fn eq(&self, other: &PhysicalSortRequirement) -> bool {
self.options == other.options && self.expr.eq(&other.expr)
}
}
impl std::fmt::Display for PhysicalSortRequirement {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let opts_string = self.options.as_ref().map_or("NA", to_str);
write!(f, "{} {}", self.expr, opts_string)
}
}
impl PhysicalSortRequirement {
pub fn new(expr: Arc<dyn PhysicalExpr>, options: Option<SortOptions>) -> Self {
Self { expr, options }
}
pub fn with_expr(mut self, expr: Arc<dyn PhysicalExpr>) -> Self {
self.expr = expr;
self
}
pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
self.expr.eq(&other.expr)
&& other.options.map_or(true, |other_opts| {
self.options.map_or(false, |opts| opts == other_opts)
})
}
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> Vec<PhysicalSortRequirement> {
ordering
.into_iter()
.cloned()
.map(PhysicalSortRequirement::from)
.collect()
}
pub fn to_sort_exprs(
requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> Vec<PhysicalSortExpr> {
requirements
.into_iter()
.map(PhysicalSortExpr::from)
.collect()
}
}
/// Returns the textual form of `options` used when displaying sort
/// expressions and requirements.
///
/// Nulls-first is rendered without a suffix (`ASC` / `DESC`); nulls-last is
/// rendered with an explicit `NULLS LAST` suffix.
///
/// The result is always a string literal, so it is returned as
/// `&'static str` rather than being tied to the borrow of `options`.
#[inline]
fn to_str(options: &SortOptions) -> &'static str {
    match (options.descending, options.nulls_first) {
        (true, true) => "DESC",
        (true, false) => "DESC NULLS LAST",
        (false, true) => "ASC",
        (false, false) => "ASC NULLS LAST",
    }
}
/// Owned list of sort expressions.
/// NOTE(review): presumably a lexicographic ordering in which earlier entries
/// take precedence — confirm against callers.
pub type LexOrdering = Vec<PhysicalSortExpr>;
/// Borrowed counterpart of [`LexOrdering`]: a slice of sort expressions.
pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
/// Owned list of sort requirements.
pub type LexRequirement = Vec<PhysicalSortRequirement>;
/// Borrowed counterpart of [`LexRequirement`]: a slice of sort requirements.
pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];