use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::{
common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
};
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
use futures::stream::Stream;
use futures::{ready, StreamExt};
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// Execution plan node that evaluates a set of window expressions over the
/// batches produced by a single input plan.
#[derive(Debug)]
pub struct WindowAggExec {
    /// Child plan whose output is fed to the window expressions.
    input: Arc<dyn ExecutionPlan>,
    /// Window expressions to evaluate; each one contributes one output column.
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Output schema: one field per window expression, followed by the fields
    /// of `input_schema`.
    schema: SchemaRef,
    /// Schema supplied at construction time, from which `schema` is derived.
    input_schema: SchemaRef,
    /// Metrics registry from which per-partition baseline metrics are created.
    metrics: ExecutionPlanMetricsSet,
}
impl WindowAggExec {
    /// Builds a window aggregate operator on top of `input`.
    ///
    /// The output schema is derived from `window_expr` and `input_schema`:
    /// one field per window expression, followed by every input field.
    ///
    /// # Errors
    ///
    /// Fails if the output field of any window expression cannot be obtained.
    pub fn try_new(
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: Arc<dyn ExecutionPlan>,
        input_schema: SchemaRef,
    ) -> Result<Self> {
        let schema = Arc::new(create_schema(&input_schema, &window_expr)?);
        let metrics = ExecutionPlanMetricsSet::new();
        Ok(Self {
            input,
            window_expr,
            schema,
            input_schema,
            metrics,
        })
    }

    /// The window expressions evaluated by this operator.
    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
        self.window_expr.as_slice()
    }

    /// The child plan that feeds this operator.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// A shared handle to the input schema given at construction time.
    pub fn input_schema(&self) -> SchemaRef {
        Arc::clone(&self.input_schema)
    }
}
impl ExecutionPlan for WindowAggExec {
    /// Exposes this node as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output schema: window expression fields followed by the input fields.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// The single child of this node is its input plan.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![Arc::clone(&self.input)]
    }

    /// Partitioning is passed through unchanged from the input.
    fn output_partitioning(&self) -> Partitioning {
        self.input.output_partitioning()
    }

    /// Ordering is passed through unchanged from the input.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        self.input.output_ordering()
    }

    /// Always reports that input order is maintained.
    fn maintains_input_order(&self) -> bool {
        true
    }

    /// Always reports that this operator relies on the order of its input.
    fn relies_on_input_order(&self) -> bool {
        true
    }

    /// Requires a single partition when no window expression declares
    /// partition-by keys; otherwise leaves the distribution unspecified.
    fn required_child_distribution(&self) -> Distribution {
        let any_partitioned = self
            .window_expr
            .iter()
            .any(|expr| !expr.partition_by().is_empty());
        if any_partitioned {
            Distribution::UnspecifiedDistribution
        } else {
            Distribution::SinglePartition
        }
    }

    /// Rebuilds this node with `children[0]` as the new input, reusing the
    /// existing window expressions and input schema.
    ///
    /// Indexes `children` directly, so an empty vector panics.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let new_input = Arc::clone(&children[0]);
        let rebuilt = WindowAggExec::try_new(
            self.window_expr.clone(),
            new_input,
            Arc::clone(&self.input_schema),
        )?;
        Ok(Arc::new(rebuilt))
    }

    /// Starts executing `partition` of the input and wraps the resulting
    /// stream in a [`WindowAggStream`] carrying this node's output schema,
    /// window expressions and per-partition baseline metrics.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        Ok(Box::pin(WindowAggStream::new(
            Arc::clone(&self.schema),
            self.window_expr.clone(),
            input_stream,
            BaselineMetrics::new(&self.metrics, partition),
        )))
    }

    /// Writes a one-line description listing every window expression as
    /// `name: field`, where the field is shown with its `Debug` rendering.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                write!(f, "WindowAggExec: ")?;
                let described = self
                    .window_expr
                    .iter()
                    .map(|e| format!("{}: {:?}", e.name(), e.field()))
                    .collect::<Vec<String>>();
                write!(f, "wdw=[{}]", described.join(", "))?;
            }
        }
        Ok(())
    }

    /// Snapshot of the metrics recorded so far.
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Derives output statistics from the input statistics.
    ///
    /// Row count and exactness are taken from the input. Column statistics
    /// start with one default entry per window expression, followed by the
    /// input's column statistics (or defaults, one per input schema field,
    /// when the input reports none). Total byte size is reported as unknown.
    fn statistics(&self) -> Statistics {
        let input_stats = self.input.statistics();
        let input_col_count = self.input_schema.fields().len();
        let inherited = input_stats
            .column_statistics
            .unwrap_or_else(|| vec![ColumnStatistics::default(); input_col_count]);

        // Window columns come first, mirroring the layout of the output schema.
        let mut column_statistics =
            vec![ColumnStatistics::default(); self.window_expr.len()];
        column_statistics.extend(inherited);

        Statistics {
            num_rows: input_stats.num_rows,
            total_byte_size: None,
            column_statistics: Some(column_statistics),
            is_exact: input_stats.is_exact,
        }
    }
}
/// Builds the output schema of a window aggregate: the field of every window
/// expression, in order, followed by all fields of `input_schema`.
///
/// # Errors
///
/// Returns the first error raised while obtaining a window expression's field.
fn create_schema(
    input_schema: &Schema,
    window_expr: &[Arc<dyn WindowExpr>],
) -> Result<Schema> {
    let input_fields = input_schema.fields();
    let mut fields = window_expr
        .iter()
        .map(|expr| expr.field())
        .collect::<Result<Vec<_>>>()?;
    fields.reserve(input_fields.len());
    fields.extend(input_fields.iter().cloned());
    Ok(Schema::new(fields))
}
/// Evaluates every window expression against `batch`, returning one array per
/// expression in the same order as `window_expr`.
///
/// # Errors
///
/// Stops at, and returns, the first evaluation error.
fn compute_window_aggregates(
    window_expr: &[Arc<dyn WindowExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    let mut columns = Vec::with_capacity(window_expr.len());
    for expr in window_expr {
        columns.push(expr.evaluate(batch)?);
    }
    Ok(columns)
}
/// Stream that buffers every batch of its input and, once the input is
/// exhausted, yields a single batch holding the window expression results
/// followed by the input columns.
pub struct WindowAggStream {
    /// Schema of the batch this stream produces.
    schema: SchemaRef,
    /// Upstream stream whose batches are buffered.
    input: SendableRecordBatchStream,
    /// Input batches collected so far.
    batches: Vec<RecordBatch>,
    /// Set once the result (or an input error) has been emitted; later polls
    /// yield `None`.
    finished: bool,
    /// Window expressions evaluated over the combined input.
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Metrics used to time the computation and record poll results.
    baseline_metrics: BaselineMetrics,
}
impl WindowAggStream {
    /// Creates a stream that has not yet buffered any batch and is not finished.
    pub fn new(
        schema: SchemaRef,
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: SendableRecordBatchStream,
        baseline_metrics: BaselineMetrics,
    ) -> Self {
        Self {
            schema,
            input,
            batches: Vec::new(),
            finished: false,
            window_expr,
            baseline_metrics,
        }
    }

    /// Combines the buffered batches and evaluates the window expressions
    /// over the result.
    ///
    /// The output places the window columns first, followed by the columns of
    /// the combined input. When combining yields no batch, an empty batch
    /// with the output schema is returned instead.
    ///
    /// # Errors
    ///
    /// Propagates failures from combining the batches or assembling the
    /// output batch; window evaluation errors are wrapped in
    /// `ArrowError::ExternalError`.
    fn compute_aggregates(&self) -> ArrowResult<RecordBatch> {
        // The timer guard lives until the end of the function.
        let _timer = self.baseline_metrics.elapsed_compute().timer();

        match common::combine_batches(&self.batches, self.input.schema())? {
            None => Ok(RecordBatch::new_empty(Arc::clone(&self.schema))),
            Some(combined) => {
                let mut columns =
                    compute_window_aggregates(&self.window_expr, &combined)
                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
                // Window results are prepended to the original columns.
                columns.extend(combined.columns().iter().cloned());
                RecordBatch::try_new(Arc::clone(&self.schema), columns)
            }
        }
    }
}
impl Stream for WindowAggStream {
    type Item = ArrowResult<RecordBatch>;

    /// Polls the inner state machine and hands the outcome to the baseline
    /// metrics, returning whatever `record_poll` gives back.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Two separate statements: the mutable borrow taken by the inner poll
        // must end before the metrics are borrowed.
        let outcome = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(outcome)
    }
}
impl WindowAggStream {
    /// Drains the input, buffering every batch, then emits exactly one item:
    /// the computed result once the input ends, or the first input error.
    ///
    /// Returns `Poll::Pending` whenever the input is not ready, and
    /// `Poll::Ready(None)` on every poll after the single item was emitted.
    #[inline]
    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
        if self.finished {
            return Poll::Ready(None);
        }

        let result = loop {
            // `ready!` returns `Poll::Pending` from this function when the
            // input has nothing available yet.
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => self.batches.push(batch),
                Some(Err(e)) => break Err(e),
                None => break self.compute_aggregates(),
            }
        };

        self.finished = true;
        Poll::Ready(Some(result))
    }
}
impl RecordBatchStream for WindowAggStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}