use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::compute::kernels::partition::lexicographical_partition_ranges;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
/// Common interface for physical window expressions.
///
/// Implementors supply the argument expressions together with the
/// partition-by and order-by expressions; the provided methods turn those
/// into evaluated arrays, sort columns and partition ranges for a given
/// [`RecordBatch`].
pub trait WindowExpr: Send + Sync + Debug {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete
    /// implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the [`Field`] describing this expression's output.
    fn field(&self) -> Result<Field>;

    /// Returns a human-readable name for this expression.
    ///
    /// The default is a fixed placeholder string; implementations can
    /// override it.
    fn name(&self) -> &str {
        "WindowExpr: default name"
    }

    /// Returns the argument expressions that [`WindowExpr::evaluate_args`]
    /// evaluates.
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

    /// Evaluates every expression from [`WindowExpr::expressions`] against
    /// `batch`, converting each result into an array with `batch.num_rows()`
    /// rows.
    ///
    /// # Errors
    ///
    /// Returns the first evaluation error encountered; later expressions are
    /// not evaluated.
    fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
        let row_count = batch.num_rows();
        let arg_exprs = self.expressions();
        let mut arrays = Vec::with_capacity(arg_exprs.len());
        for arg in &arg_exprs {
            let value = arg.evaluate(batch)?;
            arrays.push(value.into_array(row_count));
        }
        Ok(arrays)
    }

    /// Evaluates this window expression against `batch`.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;

    /// Computes the row ranges of the partitions described by
    /// `partition_columns`.
    ///
    /// With no partition columns, the result is a single range covering
    /// `0..num_rows`. Otherwise the ranges come from
    /// `lexicographical_partition_ranges`, and `num_rows` is not consulted.
    ///
    /// # Errors
    ///
    /// Arrow errors raised while computing the ranges are wrapped in
    /// [`DataFusionError::ArrowError`].
    fn evaluate_partition_points(
        &self,
        num_rows: usize,
        partition_columns: &[SortColumn],
    ) -> Result<Vec<Range<usize>>> {
        // No partitioning requested: the whole input is one partition.
        if partition_columns.is_empty() {
            return Ok(vec![0..num_rows]);
        }

        let ranges = lexicographical_partition_ranges(partition_columns)
            .map_err(DataFusionError::ArrowError)?;
        let partition_points: Vec<Range<usize>> = ranges.collect();
        Ok(partition_points)
    }

    /// Returns the expressions this window expression partitions by.
    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];

    /// Returns the sort expressions this window expression orders by.
    fn order_by(&self) -> &[PhysicalSortExpr];

    /// Evaluates each partition-by expression against `batch` and returns the
    /// results as [`SortColumn`]s, using the default [`SortOptions`].
    ///
    /// # Errors
    ///
    /// Returns the first evaluation error encountered.
    fn partition_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
        let partition_exprs = self.partition_by();
        let mut columns = Vec::with_capacity(partition_exprs.len());
        for partition_expr in partition_exprs {
            // Wrap the bare expression so it can be evaluated as a sort column.
            let as_sort_expr = PhysicalSortExpr {
                expr: Arc::clone(partition_expr),
                options: SortOptions::default(),
            };
            columns.push(as_sort_expr.evaluate_to_sort_column(batch)?);
        }
        Ok(columns)
    }

    /// Returns the partition columns followed by the order-by columns, all
    /// evaluated against `batch`.
    ///
    /// # Errors
    ///
    /// Returns the first evaluation error encountered; partition columns are
    /// evaluated before order-by columns.
    fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
        let mut columns = self.partition_columns(batch)?;
        for sort_expr in self.order_by() {
            columns.push(sort_expr.evaluate_to_sort_column(batch)?);
        }
        Ok(columns)
    }
}