use std::sync::Arc;
use polars_core::chunked_array::cast::CastOptions;
use polars_core::prelude::*;
use polars_core::schema::Schema;
use polars_core::series::IsSorted;
use polars_utils::arena::{Arena, Node};
use polars_utils::itertools::Itertools;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::unique_id::UniqueId;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(all(feature = "strings", feature = "concat_str"))]
use crate::plans::IRStringFunction;
use crate::plans::{
AExpr, ExprIR, FunctionIR, HintIR, IR, IRFunctionExpr, Sorted, ToFieldContext,
constant_evaluate, into_column,
};
/// Sortedness resolved for the nodes of an IR plan.
///
/// Holds one [`IRSorted`] entry per IR node for which a sort order could be derived;
/// nodes without a known order have no entry.
#[derive(Debug)]
pub struct IRPlanSorted(PlHashMap<Node, IRSorted>);

impl IRPlanSorted {
    /// Walks the plan below `root` (visiting every input of every node) and records the
    /// sortedness of each node for which one can be established.
    pub fn resolve(root: Node, ir_arena: &Arena<IR>, expr_arena: &Arena<AExpr>) -> Self {
        let mut resolved = PlHashMap::default();
        // Only the per-node map is kept; the remaining state is scratch space for the walk,
        // and the root's own result is already stored in `resolved` when it is known.
        is_sorted_rec(
            root,
            ir_arena,
            expr_arena,
            &mut PlHashSet::default(),
            &mut resolved,
            &mut PlHashMap::default(),
            &mut PlHashSet::default(),
            true,
        );
        Self(resolved)
    }

    /// Returns the recorded sortedness of `node`, if any.
    pub fn get(&self, node: Node) -> Option<&IRSorted> {
        let Self(by_node) = self;
        by_node.get(&node)
    }

    /// Returns the sortedness of `expr` when evaluated on the output of the IR node `at`.
    ///
    /// `input_schema` is the schema `expr` is resolved against.
    pub fn is_expr_sorted(
        &self,
        at: Node,
        expr: &ExprIR,
        expr_arena: &Arena<AExpr>,
        input_schema: &Schema,
    ) -> Option<AExprSorted> {
        let known = self.get(at);
        expr_is_sorted(known, expr, expr_arena, input_schema)
    }

    /// Returns the sortedness of every expression in `keys` when evaluated on the output of
    /// the IR node `at`, or `None` as soon as one of them is not known to be sorted.
    pub fn are_keys_sorted_any(
        &self,
        at: Node,
        keys: &[ExprIR],
        expr_arena: &Arena<AExpr>,
        input_schema: &Schema,
    ) -> Option<Vec<AExprSorted>> {
        let known = self.get(at);
        are_keys_sorted_any(known, keys, expr_arena, input_schema)
    }
}
/// Sortedness of the values produced by a single expression.
///
/// Each field is `None` when that property is unknown.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Debug, Default, PartialEq, Clone, Copy, Hash)]
pub struct AExprSorted {
    /// `Some(true)` for descending order, `Some(false)` for ascending order.
    pub descending: Option<bool>,
    /// `Some(true)` when nulls are placed last, `Some(false)` when they are placed first.
    pub nulls_last: Option<bool>,
}

impl AExprSorted {
    /// Returns the sortedness obtained by reversing the values: every known property is
    /// flipped, unknown properties stay unknown.
    pub fn reverse(self) -> Self {
        let descending = self.descending.map(std::ops::Not::not);
        let nulls_last = self.nulls_last.map(std::ops::Not::not);
        Self {
            descending,
            nulls_last,
        }
    }

    /// Returns a copy with `descending` replaced by `desc`.
    pub fn with_desc(self, desc: Option<bool>) -> Self {
        Self {
            descending: desc,
            ..self
        }
    }

    /// Returns a copy with `nulls_last` replaced by the given value.
    pub fn with_nulls_last(self, nulls_last: Option<bool>) -> Self {
        Self { nulls_last, ..self }
    }

    /// `true` only when the order is known to be ascending.
    pub fn is_asc(&self) -> bool {
        self.descending == Some(false)
    }

    /// `true` only when the order is known to be descending.
    pub fn is_desc(&self) -> bool {
        self.descending == Some(true)
    }

    /// `true` only when nulls are known to be placed first.
    pub fn is_nulls_first(&self) -> bool {
        self.nulls_last == Some(false)
    }

    /// `true` only when nulls are known to be placed last.
    pub fn is_nulls_last(&self) -> bool {
        self.nulls_last == Some(true)
    }
}
impl From<AExprSorted> for IsSorted {
    /// Converts the direction of `val` into a sorted flag; the null placement is not used.
    fn from(val: AExprSorted) -> Self {
        // An unknown direction maps to "not sorted".
        let Some(descending) = val.descending else {
            return IsSorted::Not;
        };
        if descending {
            IsSorted::Descending
        } else {
            IsSorted::Ascending
        }
    }
}
/// The known sortedness of an IR node's output, as a shared list of [`Sorted`] entries.
///
/// Code in this file treats the first entry as the leading sort key and only ever
/// shortens the list from the back (keeping a prefix) when keys are lost.
#[derive(Debug, Clone)]
pub struct IRSorted(pub Arc<[Sorted]>);
pub fn are_keys_sorted_any(
ir_sorted: Option<&IRSorted>,
keys: &[ExprIR],
expr_arena: &Arena<AExpr>,
input_schema: &Schema,
) -> Option<Vec<AExprSorted>> {
let mut sortedness = Vec::with_capacity(keys.len());
for (idx, key) in keys.iter().enumerate() {
let s = aexpr_sortedness(
expr_arena.get(key.node()),
expr_arena,
input_schema,
Some(&ir_sorted?.0[idx..]),
)?;
sortedness.push(s);
}
Some(sortedness)
}
pub fn expr_is_sorted(
ir_sorted: Option<&IRSorted>,
expr: &ExprIR,
expr_arena: &Arena<AExpr>,
input_schema: &Schema,
) -> Option<AExprSorted> {
aexpr_sortedness(
expr_arena.get(expr.node()),
expr_arena,
input_schema,
ir_sorted.map(|s| s.0.as_ref()),
)
}
/// Derives the sortedness of the output of the IR node `root`.
///
/// Only the nodes needed to answer the question for `root` are visited. Returns `None`
/// when no sort order can be established.
pub fn is_sorted(root: Node, ir_arena: &Arena<IR>, expr_arena: &Arena<AExpr>) -> Option<IRSorted> {
    // All bookkeeping is local to this single query and discarded afterwards.
    is_sorted_rec(
        root,
        ir_arena,
        expr_arena,
        &mut PlHashSet::default(),
        &mut PlHashMap::default(),
        &mut PlHashMap::default(),
        &mut PlHashSet::default(),
        false,
    )
}
#[expect(clippy::too_many_arguments)]
#[recursive::recursive]
fn is_sorted_rec(
root: Node,
ir_arena: &Arena<IR>,
expr_arena: &Arena<AExpr>,
seen: &mut PlHashSet<Node>,
sortedness: &mut PlHashMap<Node, IRSorted>,
cache_proxy: &mut PlHashMap<UniqueId, Option<IRSorted>>,
amort_passed_columns: &mut PlHashSet<PlSmallStr>,
create_full_map: bool,
) -> Option<IRSorted> {
if let Some(s) = sortedness.get(&root) {
return Some(s.clone());
}
if !seen.insert(root) {
return None;
}
macro_rules! rec {
($node:expr) => {{
is_sorted_rec(
$node,
ir_arena,
expr_arena,
seen,
sortedness,
cache_proxy,
amort_passed_columns,
create_full_map,
)
}};
}
if create_full_map {
for input in ir_arena.get(root).inputs() {
rec!(input);
}
}
let sorted = match ir_arena.get(root) {
#[cfg(feature = "python")]
IR::PythonScan { .. } => None,
IR::Slice {
input,
offset: _,
len: _,
} => rec!(*input),
IR::Filter {
input,
predicate: _,
} => rec!(*input),
IR::Scan { .. } => None,
IR::DataFrameScan { df, .. } => {
let last_is_null = |c: &Column| Some(c.get(c.len().checked_sub(1)?).ok()?.is_null());
let sorted_cols = df
.columns()
.iter()
.filter_map(|c| match c.is_sorted_flag() {
IsSorted::Not => None,
IsSorted::Ascending => Some(Sorted {
column: c.name().clone(),
descending: Some(false),
nulls_last: Some(last_is_null(c).unwrap_or(false)),
}),
IsSorted::Descending => Some(Sorted {
column: c.name().clone(),
descending: Some(true),
nulls_last: Some(last_is_null(c).unwrap_or(false)),
}),
})
.collect_vec();
(!sorted_cols.is_empty()).then(|| IRSorted(sorted_cols.into()))
},
IR::SimpleProjection { input, columns } => {
let (input, columns) = (*input, columns.clone());
match rec!(input) {
None => None,
Some(v) => {
let first_unsorted_key = v.0.iter().position(|v| !columns.contains(&v.column));
match first_unsorted_key {
None => Some(v),
Some(0) => None,
Some(i) => Some(IRSorted(v.0.iter().take(i).cloned().collect())),
}
},
}
},
IR::Select { input, expr, .. } => {
let input = *input;
let input_sorted = rec!(input);
if let Some(input_sorted) = &input_sorted {
amort_passed_columns.clear();
amort_passed_columns.extend(expr.iter().filter_map(|e| {
let column = into_column(e.node(), expr_arena)?;
(column == e.output_name()).then(|| column.clone())
}));
let first_unkept_key = input_sorted
.0
.iter()
.position(|v| !amort_passed_columns.contains(&v.column));
match first_unkept_key {
None => Some(input_sorted.clone()),
Some(0) => {
let input_schema = ir_arena.get(input).schema(ir_arena);
first_expr_ir_sorted(
expr,
expr_arena,
input_schema.as_ref(),
Some(&input_sorted.0),
)
.map(|s| IRSorted([s].into()))
},
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
}
} else {
let input_schema = ir_arena.get(input).schema(ir_arena);
first_expr_ir_sorted(expr, expr_arena, input_schema.as_ref(), None)
.map(|s| IRSorted([s].into()))
}
},
IR::HStack { input, exprs, .. } => {
let input = *input;
let input_sorted = rec!(input);
if let Some(input_sorted) = &input_sorted {
amort_passed_columns.clear();
amort_passed_columns.extend(exprs.iter().filter_map(|e| {
match into_column(e.node(), expr_arena) {
None => Some(e.output_name().clone()),
Some(c) if c == e.output_name() => None,
Some(_) => Some(e.output_name().clone()),
}
}));
let first_overwritten_key = input_sorted
.0
.iter()
.position(|v| amort_passed_columns.contains(&v.column));
match first_overwritten_key {
None => Some(input_sorted.clone()),
Some(0) => {
let input_schema = ir_arena.get(input).schema(ir_arena);
first_expr_ir_sorted(
exprs,
expr_arena,
input_schema.as_ref(),
Some(&input_sorted.0),
)
.map(|s| IRSorted([s].into()))
},
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
}
} else {
let input_schema = ir_arena.get(input).schema(ir_arena);
first_expr_ir_sorted(exprs, expr_arena, input_schema.as_ref(), None)
.map(|s| IRSorted([s].into()))
}
},
IR::Sort {
input: _,
by_column,
slice: _,
sort_options,
} => {
let mut s = by_column
.iter()
.map_while(|e| {
into_column(e.node(), expr_arena).map(|c| Sorted {
column: c.clone(),
descending: Some(false),
nulls_last: Some(false),
})
})
.collect::<Vec<_>>();
if sort_options.descending.len() != 1 {
s.iter_mut()
.zip(sort_options.descending.iter())
.for_each(|(s, &d)| s.descending = Some(d));
} else if sort_options.descending[0] {
s.iter_mut().for_each(|s| s.descending = Some(true));
}
if sort_options.nulls_last.len() != 1 {
s.iter_mut()
.zip(sort_options.nulls_last.iter())
.for_each(|(s, &d)| s.nulls_last = Some(d));
} else if sort_options.nulls_last[0] {
s.iter_mut().for_each(|s| s.nulls_last = Some(true));
}
Some(IRSorted(s.into()))
},
IR::Cache { input, id } => {
let (input, id) = (*input, *id);
if let Some(s) = cache_proxy.get(&id) {
s.clone()
} else {
let s = rec!(input);
cache_proxy.insert(id, s.clone());
s
}
},
IR::GroupBy {
input,
keys,
options,
maintain_order: true,
..
} if !options.is_rolling() && !options.is_dynamic() => {
let input = *input;
let input_sorted = rec!(input)?;
amort_passed_columns.clear();
amort_passed_columns.extend(keys.iter().filter_map(|e| {
let column = into_column(e.node(), expr_arena)?;
(column == e.output_name()).then(|| column.clone())
}));
let first_unkept_key = input_sorted
.0
.iter()
.position(|v| !amort_passed_columns.contains(&v.column));
match first_unkept_key {
None => Some(input_sorted.clone()),
Some(0) => {
let input_schema = ir_arena.get(input).schema(ir_arena);
first_expr_ir_sorted(keys, expr_arena, input_schema.as_ref(), None)
.map(|s| IRSorted([s].into()))
},
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
}
},
#[cfg(feature = "dynamic_group_by")]
IR::GroupBy { options, .. } if options.is_rolling() => {
let Some(rolling_options) = &options.rolling else {
unreachable!()
};
Some(IRSorted(
[Sorted {
column: rolling_options.index_column.clone(),
descending: None,
nulls_last: None,
}]
.into(),
))
},
#[cfg(feature = "dynamic_group_by")]
IR::GroupBy { keys, options, .. } if options.is_dynamic() => {
let Some(dynamic_options) = &options.dynamic else {
unreachable!()
};
keys.is_empty().then(|| {
IRSorted(
[Sorted {
column: dynamic_options.index_column.clone(),
descending: None,
nulls_last: None,
}]
.into(),
)
})
},
IR::GroupBy { .. } => None,
IR::Join { .. } => None,
IR::Gather {
input,
idxs,
null_on_oob,
} => {
let input = *input;
let idxs = *idxs;
let null_on_oob = *null_on_oob;
let input_sorted = rec!(input)?;
let idxs_sorted = rec!(idxs)?;
if idxs_sorted.0.len() != 1 {
return None;
}
let idxs_sorted = &idxs_sorted.0[0];
for s in input_sorted.0.iter() {
if s.nulls_last.is_none() || s.nulls_last != idxs_sorted.nulls_last {
return None;
}
}
if null_on_oob {
if idxs_sorted.nulls_last.is_none()
|| idxs_sorted.nulls_last != idxs_sorted.descending.map(|b| !b)
{
return None;
}
}
let mut out_sorted = input_sorted.0.iter().cloned().collect_vec();
match idxs_sorted.descending {
Some(false) => {},
Some(true) => {
for s in &mut out_sorted {
s.descending = s.descending.map(|b| !b);
s.nulls_last = s.nulls_last.map(|b| !b);
}
},
None => {
for s in &mut out_sorted {
s.descending = None;
s.nulls_last = None;
}
},
}
Some(input_sorted)
},
IR::MapFunction { input, function } => match function {
FunctionIR::Hint(hint) => match hint {
HintIR::Sorted(v) => Some(IRSorted(v.clone())),
#[expect(unreachable_patterns)]
_ => rec!(*input),
},
_ => None,
},
IR::Union { .. } => None,
IR::HConcat { .. } => None,
IR::ExtContext { .. } => None,
IR::Sink { .. } => None,
IR::SinkMultiple { .. } => None,
#[cfg(feature = "merge_sorted")]
IR::MergeSorted { key, .. } => Some(IRSorted(
[Sorted {
column: key.clone(),
descending: None,
nulls_last: None,
}]
.into(),
)),
IR::Distinct { input, options } => {
if !options.maintain_order {
return None;
}
let input = *input;
rec!(input)
},
IR::UnoptimizedDispatch { .. } => None,
IR::Invalid => unreachable!(),
};
if let Some(sorted) = sorted.clone() {
sortedness.insert(root, sorted);
}
sorted
}
/// Returns a [`Sorted`] entry for the first expression in `exprs` that is known to be
/// sorted, named after that expression's output column; `None` if there is none.
fn first_expr_ir_sorted(
    exprs: &[ExprIR],
    arena: &Arena<AExpr>,
    schema: &Schema,
    input_sorted: Option<&[Sorted]>,
) -> Option<Sorted> {
    for expr_ir in exprs {
        let root_expr = arena.get(expr_ir.node());
        if let Some(found) = aexpr_sortedness(root_expr, arena, schema, input_sorted) {
            return Some(Sorted {
                column: expr_ir.output_name().clone(),
                descending: found.descending,
                nulls_last: found.nulls_last,
            });
        }
    }
    None
}
/// Determines whether the values produced by `aexpr` are known to be sorted.
///
/// * `arena` - arena holding `aexpr`'s sub-expressions.
/// * `schema` - schema used to resolve expression dtypes.
/// * `input_sorted` - known sort keys of the data the expression is evaluated on; only the
///   first entry is consulted for column references.
///
/// Returns `None` when nothing is known.
#[recursive::recursive]
pub fn aexpr_sortedness(
    aexpr: &AExpr,
    arena: &Arena<AExpr>,
    schema: &Schema,
    input_sorted: Option<&[Sorted]>,
) -> Option<AExprSorted> {
    // Result reported for scalar literals and `len`.
    let asc_nulls_first = AExprSorted {
        descending: Some(false),
        nulls_last: Some(false),
    };

    match aexpr {
        // A column is sorted only if it is the leading sort key of the input.
        AExpr::Column(name) => {
            let leading = input_sorted?.first()?;
            if leading.column == name {
                Some(AExprSorted {
                    descending: leading.descending,
                    nulls_last: leading.nulls_last,
                })
            } else {
                None
            }
        },
        #[cfg(feature = "dtype-struct")]
        AExpr::StructField(_) => None,
        AExpr::Literal(lv) if lv.is_scalar() => Some(asc_nulls_first),
        AExpr::Len => Some(asc_nulls_first),
        // A strict integer-to-integer cast forwards the sortedness of its input.
        AExpr::Cast {
            expr,
            dtype,
            options: CastOptions::Strict,
        } if dtype.is_integer() => {
            let inner = arena.get(*expr);
            let inner_sorted = aexpr_sortedness(inner, arena, schema, input_sorted)?;
            let source_dtype = inner.to_dtype(&ToFieldContext::new(arena, schema)).ok()?;
            source_dtype.is_integer().then_some(inner_sorted)
        },
        AExpr::Sort { expr: _, options } => Some(AExprSorted {
            descending: Some(options.descending),
            nulls_last: Some(options.nulls_last),
        }),
        AExpr::Function {
            input,
            function,
            options: _,
        } => function_expr_sortedness(function, input, arena, schema, input_sorted),
        // Filtering and slicing forward the sortedness of their input.
        AExpr::Filter { input, by: _ }
        | AExpr::Slice {
            input,
            offset: _,
            length: _,
        } => aexpr_sortedness(arena.get(*input), arena, schema, input_sorted),
        // Nothing is derived for the remaining expressions (including non-scalar literals
        // and all other casts, which fall through from the guarded arms above).
        AExpr::Element
        | AExpr::Explode { .. }
        | AExpr::Literal(_)
        | AExpr::Cast { .. }
        | AExpr::BinaryExpr { .. }
        | AExpr::Gather { .. }
        | AExpr::SortBy { .. }
        | AExpr::Agg(_)
        | AExpr::Ternary { .. }
        | AExpr::AnonymousAgg { .. }
        | AExpr::AnonymousFunction { .. }
        | AExpr::Eval { .. }
        | AExpr::Over { .. } => None,
        #[cfg(feature = "dtype-struct")]
        AExpr::StructEval { .. } => None,
        #[cfg(feature = "dynamic_group_by")]
        AExpr::Rolling { .. } => None,
    }
}
pub fn function_expr_sortedness(
function: &IRFunctionExpr,
inputs: &[ExprIR],
arena: &Arena<AExpr>,
schema: &Schema,
input_sorted: Option<&[Sorted]>,
) -> Option<AExprSorted> {
macro_rules! rec_ae {
($node:expr) => {{ aexpr_sortedness(arena.get($node), arena, schema, input_sorted) }};
}
match function {
#[cfg(feature = "rle")]
IRFunctionExpr::RLEID => Some(AExprSorted {
descending: Some(false),
nulls_last: Some(false),
}),
IRFunctionExpr::SetSortedFlag(sortedness) => match sortedness.descending {
Some(false) => Some(AExprSorted {
descending: Some(false),
nulls_last: None,
}),
Some(true) => Some(AExprSorted {
descending: Some(true),
nulls_last: None,
}),
None => None,
},
IRFunctionExpr::Unique(true)
| IRFunctionExpr::DropNulls
| IRFunctionExpr::DropNans
| IRFunctionExpr::FillNullWithStrategy(
FillNullStrategy::Forward(None) | FillNullStrategy::Backward(None),
) => {
let [e] = inputs else {
return None;
};
rec_ae!(e.node())
},
#[cfg(feature = "mode")]
IRFunctionExpr::Mode {
maintain_order: true,
} => {
let [e] = inputs else {
return None;
};
rec_ae!(e.node())
},
#[cfg(feature = "range")]
IRFunctionExpr::Range(range) => {
use crate::plans::IRRangeFunction as R;
match range {
R::IntRange { step: 1, dtype }
if dtype.is_unsigned_integer()
&& constant_evaluate(inputs[0].node(), arena, schema, 0)??
.extract_i64()
.is_ok_and(|v| v == 0) =>
{
Some(AExprSorted {
descending: Some(false),
nulls_last: Some(false),
})
},
_ => None,
}
},
IRFunctionExpr::Reverse => {
let [e] = inputs else {
return None;
};
let mut sortedness = rec_ae!(e.node())?;
if let Some(d) = &mut sortedness.descending {
*d = !*d;
}
if let Some(n) = &mut sortedness.nulls_last {
*n ^= !*n;
}
Some(sortedness)
},
#[cfg(all(feature = "strings", feature = "concat_str"))]
IRFunctionExpr::StringExpr(IRStringFunction::ConcatHorizontal {
ignore_nulls: false,
delimiter: _,
}) => {
let [e] = inputs else {
return None;
};
rec_ae!(e.node())
},
_ => None,
}
}