use crate::physical_plan::common::transpose;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::windows::{
calc_requirements, get_ordered_partition_by_indices, window_ordering_equivalence,
};
use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics, WindowExpr,
};
use arrow::compute::{concat, concat_batches};
use arrow::datatypes::SchemaBuilder;
use arrow::error::ArrowError;
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement};
use futures::stream::Stream;
use futures::{ready, StreamExt};
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[derive(Debug)]
pub struct WindowAggExec {
pub(crate) input: Arc<dyn ExecutionPlan>,
window_expr: Vec<Arc<dyn WindowExpr>>,
schema: SchemaRef,
input_schema: SchemaRef,
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
metrics: ExecutionPlanMetricsSet,
ordered_partition_by_indices: Vec<usize>,
}
impl WindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
let ordered_partition_by_indices =
get_ordered_partition_by_indices(window_expr[0].partition_by(), &input);
Ok(Self {
input,
window_expr,
schema,
input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
ordered_partition_by_indices,
})
}
pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
&self.window_expr
}
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}
pub fn input_schema(&self) -> SchemaRef {
self.input_schema.clone()
}
pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
let sort_keys = self.input.output_ordering().unwrap_or(&[]);
get_at_indices(sort_keys, &self.ordered_partition_by_indices)
}
}
impl ExecutionPlan for WindowAggExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
}
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] {
Err(DataFusionError::Plan(
"Window Error: Windowing is not currently support for unbounded inputs."
.to_string(),
))
} else {
Ok(false)
}
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.input().output_ordering()
}
fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
}
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
let partition_bys = self.window_expr()[0].partition_by();
let order_keys = self.window_expr()[0].order_by();
if self.ordered_partition_by_indices.len() < partition_bys.len() {
vec![calc_requirements(partition_bys, order_keys)]
} else {
let partition_bys = self
.ordered_partition_by_indices
.iter()
.map(|idx| &partition_bys[*idx]);
vec![calc_requirements(partition_bys, order_keys)]
}
}
fn required_input_distribution(&self) -> Vec<Distribution> {
if self.partition_keys.is_empty() {
vec![Distribution::SinglePartition]
} else {
vec![Distribution::HashPartitioned(self.partition_keys.clone())]
}
}
fn equivalence_properties(&self) -> EquivalenceProperties {
self.input().equivalence_properties()
}
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
window_ordering_equivalence(&self.schema, &self.input, &self.window_expr)
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(WindowAggExec::try_new(
self.window_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
)?))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
let stream = Box::pin(WindowAggStream::new(
self.schema.clone(),
self.window_expr.clone(),
input,
BaselineMetrics::new(&self.metrics, partition),
self.partition_by_sort_keys()?,
self.ordered_partition_by_indices.clone(),
)?);
Ok(stream)
}
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "WindowAggExec: ")?;
let g: Vec<String> = self
.window_expr
.iter()
.map(|e| {
format!(
"{}: {:?}, frame: {:?}",
e.name().to_owned(),
e.field(),
e.get_window_frame()
)
})
.collect();
write!(f, "wdw=[{}]", g.join(", "))?;
}
}
Ok(())
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
let input_cols = self.input_schema.fields().len();
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
if let Some(input_col_stats) = input_stat.column_statistics {
column_statistics.extend(input_col_stats);
} else {
column_statistics.extend(vec![ColumnStatistics::default(); input_cols]);
}
column_statistics.extend(vec![ColumnStatistics::default(); win_cols]);
Statistics {
is_exact: input_stat.is_exact,
num_rows: input_stat.num_rows,
column_statistics: Some(column_statistics),
total_byte_size: None,
}
}
}
fn create_schema(
input_schema: &Schema,
window_expr: &[Arc<dyn WindowExpr>],
) -> Result<Schema> {
let capacity = input_schema.fields().len() + window_expr.len();
let mut builder = SchemaBuilder::with_capacity(capacity);
builder.extend(input_schema.fields().iter().cloned());
for expr in window_expr {
builder.push(expr.field()?);
}
Ok(builder.finish())
}
fn compute_window_aggregates(
window_expr: &[Arc<dyn WindowExpr>],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
window_expr
.iter()
.map(|window_expr| window_expr.evaluate(batch))
.collect()
}
pub struct WindowAggStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
batches: Vec<RecordBatch>,
finished: bool,
window_expr: Vec<Arc<dyn WindowExpr>>,
partition_by_sort_keys: Vec<PhysicalSortExpr>,
baseline_metrics: BaselineMetrics,
ordered_partition_by_indices: Vec<usize>,
}
impl WindowAggStream {
pub fn new(
schema: SchemaRef,
window_expr: Vec<Arc<dyn WindowExpr>>,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
partition_by_sort_keys: Vec<PhysicalSortExpr>,
ordered_partition_by_indices: Vec<usize>,
) -> Result<Self> {
if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() {
return Err(DataFusionError::Internal(
"All partition by columns should have an ordering".to_string(),
));
}
Ok(Self {
schema,
input,
batches: vec![],
finished: false,
window_expr,
baseline_metrics,
partition_by_sort_keys,
ordered_partition_by_indices,
})
}
fn compute_aggregates(&self) -> Result<RecordBatch> {
let _timer = self.baseline_metrics.elapsed_compute().timer();
let batch = concat_batches(&self.input.schema(), &self.batches)?;
if batch.num_rows() == 0 {
return Ok(RecordBatch::new_empty(self.schema.clone()));
}
let partition_by_sort_keys = self
.ordered_partition_by_indices
.iter()
.map(|idx| self.partition_by_sort_keys[*idx].evaluate_to_sort_column(&batch))
.collect::<Result<Vec<_>>>()?;
let partition_points =
evaluate_partition_ranges(batch.num_rows(), &partition_by_sort_keys)?;
let mut partition_results = vec![];
for partition_point in partition_points {
let length = partition_point.end - partition_point.start;
partition_results.push(compute_window_aggregates(
&self.window_expr,
&batch.slice(partition_point.start, length),
)?)
}
let columns = transpose(partition_results)
.iter()
.map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
let mut batch_columns = batch.columns().to_vec();
batch_columns.extend_from_slice(&columns);
Ok(RecordBatch::try_new(self.schema.clone(), batch_columns)?)
}
}
impl Stream for WindowAggStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.poll_next_inner(cx);
self.baseline_metrics.record_poll(poll)
}
}
impl WindowAggStream {
#[inline]
fn poll_next_inner(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
if self.finished {
return Poll::Ready(None);
}
loop {
let result = match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
self.batches.push(batch);
continue;
}
Some(Err(e)) => Err(e),
None => self.compute_aggregates(),
};
self.finished = true;
return Poll::Ready(Some(result));
}
}
}
impl RecordBatchStream for WindowAggStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}