1use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23
24use super::utils::create_schema;
25use crate::execution_plan::{CardinalityEffect, EmissionType};
26use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
27use crate::stream::EmptyRecordBatchStream;
28use crate::windows::{
29 calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
30 window_equivalence_properties,
31};
32use crate::{
33 ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
34 ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
35 SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties,
36};
37
38use arrow::array::ArrayRef;
39use arrow::compute::{concat, concat_batches};
40use arrow::datatypes::SchemaRef;
41use arrow::error::ArrowError;
42use arrow::record_batch::RecordBatch;
43use datafusion_common::stats::Precision;
44use datafusion_common::utils::{evaluate_partition_ranges, transpose};
45use datafusion_common::{Result, assert_eq_or_internal_err};
46use datafusion_execution::TaskContext;
47use datafusion_physical_expr_common::sort_expr::{
48 OrderingRequirements, PhysicalSortExpr,
49};
50
51use futures::{Stream, StreamExt, ready};
52
53#[derive(Debug, Clone)]
55pub struct WindowAggExec {
56 pub(crate) input: Arc<dyn ExecutionPlan>,
58 window_expr: Vec<Arc<dyn WindowExpr>>,
60 schema: SchemaRef,
62 metrics: ExecutionPlanMetricsSet,
64 ordered_partition_by_indices: Vec<usize>,
67 cache: Arc<PlanProperties>,
69 can_repartition: bool,
71}
72
73impl WindowAggExec {
74 pub fn try_new(
76 window_expr: Vec<Arc<dyn WindowExpr>>,
77 input: Arc<dyn ExecutionPlan>,
78 can_repartition: bool,
79 ) -> Result<Self> {
80 let schema = create_schema(&input.schema(), &window_expr)?;
81 let schema = Arc::new(schema);
82
83 let ordered_partition_by_indices =
84 get_ordered_partition_by_indices(window_expr[0].partition_by(), &input)?;
85 let cache = Self::compute_properties(&schema, &input, &window_expr)?;
86 Ok(Self {
87 input,
88 window_expr,
89 schema,
90 metrics: ExecutionPlanMetricsSet::new(),
91 ordered_partition_by_indices,
92 cache: Arc::new(cache),
93 can_repartition,
94 })
95 }
96
97 pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
99 &self.window_expr
100 }
101
102 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
104 &self.input
105 }
106
107 pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
113 let partition_by = self.window_expr()[0].partition_by();
114 get_partition_by_sort_exprs(
115 &self.input,
116 partition_by,
117 &self.ordered_partition_by_indices,
118 )
119 }
120
121 fn compute_properties(
123 schema: &SchemaRef,
124 input: &Arc<dyn ExecutionPlan>,
125 window_exprs: &[Arc<dyn WindowExpr>],
126 ) -> Result<PlanProperties> {
127 let eq_properties = window_equivalence_properties(schema, input, window_exprs)?;
129
130 let output_partitioning = input.output_partitioning().clone();
134
135 Ok(PlanProperties::new(
137 eq_properties,
138 output_partitioning,
139 EmissionType::Final,
141 input.boundedness(),
142 ))
143 }
144
145 pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
146 if !self.can_repartition {
147 vec![]
148 } else {
149 let all_partition_keys = self
150 .window_expr()
151 .iter()
152 .map(|expr| expr.partition_by().to_vec())
153 .collect::<Vec<_>>();
154
155 all_partition_keys
156 .into_iter()
157 .min_by_key(|s| s.len())
158 .unwrap_or_else(Vec::new)
159 }
160 }
161
162 fn with_new_children_and_same_properties(
163 &self,
164 mut children: Vec<Arc<dyn ExecutionPlan>>,
165 ) -> Self {
166 Self {
167 input: children.swap_remove(0),
168 metrics: ExecutionPlanMetricsSet::new(),
169 ..Self::clone(self)
170 }
171 }
172}
173
174impl DisplayAs for WindowAggExec {
175 fn fmt_as(
176 &self,
177 t: DisplayFormatType,
178 f: &mut std::fmt::Formatter,
179 ) -> std::fmt::Result {
180 match t {
181 DisplayFormatType::Default | DisplayFormatType::Verbose => {
182 write!(f, "WindowAggExec: ")?;
183 let g: Vec<String> = self
184 .window_expr
185 .iter()
186 .map(|e| {
187 format!(
188 "{}: {:?}, frame: {:?}",
189 e.name().to_owned(),
190 e.field(),
191 e.get_window_frame()
192 )
193 })
194 .collect();
195 write!(f, "wdw=[{}]", g.join(", "))?;
196 }
197 DisplayFormatType::TreeRender => {
198 let g: Vec<String> = self
199 .window_expr
200 .iter()
201 .map(|e| e.name().to_owned().to_string())
202 .collect();
203 writeln!(f, "select_list={}", g.join(", "))?;
204 }
205 }
206 Ok(())
207 }
208}
209
210impl ExecutionPlan for WindowAggExec {
211 fn name(&self) -> &'static str {
212 "WindowAggExec"
213 }
214
215 fn properties(&self) -> &Arc<PlanProperties> {
217 &self.cache
218 }
219
220 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
221 vec![&self.input]
222 }
223
224 fn maintains_input_order(&self) -> Vec<bool> {
225 vec![true]
226 }
227
228 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
229 let partition_bys = self.window_expr()[0].partition_by();
230 let order_keys = self.window_expr()[0].order_by();
231 if self.ordered_partition_by_indices.len() < partition_bys.len() {
232 vec![calc_requirements(partition_bys, order_keys)]
233 } else {
234 let partition_bys = self
235 .ordered_partition_by_indices
236 .iter()
237 .map(|idx| &partition_bys[*idx]);
238 vec![calc_requirements(partition_bys, order_keys)]
239 }
240 }
241
242 fn required_input_distribution(&self) -> Vec<Distribution> {
243 if self.partition_keys().is_empty() {
244 vec![Distribution::SinglePartition]
245 } else {
246 vec![Distribution::HashPartitioned(self.partition_keys())]
247 }
248 }
249
250 fn with_new_children(
251 self: Arc<Self>,
252 mut children: Vec<Arc<dyn ExecutionPlan>>,
253 ) -> Result<Arc<dyn ExecutionPlan>> {
254 check_if_same_properties!(self, children);
255 Ok(Arc::new(WindowAggExec::try_new(
256 self.window_expr.clone(),
257 children.swap_remove(0),
258 true,
259 )?))
260 }
261
262 fn execute(
263 &self,
264 partition: usize,
265 context: Arc<TaskContext>,
266 ) -> Result<SendableRecordBatchStream> {
267 let input = self.input.execute(partition, context)?;
268 let stream = Box::pin(WindowAggStream::new(
269 Arc::clone(&self.schema),
270 self.window_expr.clone(),
271 input,
272 BaselineMetrics::new(&self.metrics, partition),
273 self.partition_by_sort_keys()?,
274 self.ordered_partition_by_indices.clone(),
275 )?);
276 Ok(stream)
277 }
278
279 fn metrics(&self) -> Option<MetricsSet> {
280 Some(self.metrics.clone_inner())
281 }
282
283 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
284 let input_stat =
285 Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
286 let win_cols = self.window_expr.len();
287 let input_cols = self.input.schema().fields().len();
288 let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
290 column_statistics.extend(input_stat.column_statistics);
292 for _ in 0..win_cols {
293 column_statistics.push(ColumnStatistics::new_unknown())
294 }
295 Ok(Arc::new(Statistics {
296 num_rows: input_stat.num_rows,
297 column_statistics,
298 total_byte_size: Precision::Absent,
299 }))
300 }
301
302 fn cardinality_effect(&self) -> CardinalityEffect {
303 CardinalityEffect::Equal
304 }
305}
306
307fn compute_window_aggregates(
309 window_expr: &[Arc<dyn WindowExpr>],
310 batch: &RecordBatch,
311) -> Result<Vec<ArrayRef>> {
312 window_expr
313 .iter()
314 .map(|window_expr| window_expr.evaluate(batch))
315 .collect()
316}
317
318pub struct WindowAggStream {
320 schema: SchemaRef,
321 input: SendableRecordBatchStream,
322 batches: Vec<RecordBatch>,
323 finished: bool,
324 window_expr: Vec<Arc<dyn WindowExpr>>,
325 partition_by_sort_keys: Vec<PhysicalSortExpr>,
326 baseline_metrics: BaselineMetrics,
327 ordered_partition_by_indices: Vec<usize>,
328}
329
330impl WindowAggStream {
331 pub fn new(
333 schema: SchemaRef,
334 window_expr: Vec<Arc<dyn WindowExpr>>,
335 input: SendableRecordBatchStream,
336 baseline_metrics: BaselineMetrics,
337 partition_by_sort_keys: Vec<PhysicalSortExpr>,
338 ordered_partition_by_indices: Vec<usize>,
339 ) -> Result<Self> {
340 assert_eq_or_internal_err!(
342 window_expr[0].partition_by().len(),
343 ordered_partition_by_indices.len(),
344 "All partition by columns should have an ordering"
345 );
346 Ok(Self {
347 schema,
348 input,
349 batches: vec![],
350 finished: false,
351 window_expr,
352 baseline_metrics,
353 partition_by_sort_keys,
354 ordered_partition_by_indices,
355 })
356 }
357
358 fn compute_aggregates(&self) -> Result<Option<RecordBatch>> {
359 let _timer = self.baseline_metrics.elapsed_compute().timer();
361
362 let batch = concat_batches(&self.input.schema(), &self.batches)?;
363 if batch.num_rows() == 0 {
364 return Ok(None);
365 }
366
367 let partition_by_sort_keys = self
368 .ordered_partition_by_indices
369 .iter()
370 .map(|idx| self.partition_by_sort_keys[*idx].evaluate_to_sort_column(&batch))
371 .collect::<Result<Vec<_>>>()?;
372 let partition_points =
373 evaluate_partition_ranges(batch.num_rows(), &partition_by_sort_keys)?;
374
375 let mut partition_results = vec![];
376 for partition_point in partition_points {
378 let length = partition_point.end - partition_point.start;
379 partition_results.push(compute_window_aggregates(
380 &self.window_expr,
381 &batch.slice(partition_point.start, length),
382 )?)
383 }
384 let columns = transpose(partition_results)
385 .iter()
386 .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
387 .collect::<Vec<_>>()
388 .into_iter()
389 .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
390
391 let mut batch_columns = batch.columns().to_vec();
395 batch_columns.extend_from_slice(&columns);
397 Ok(Some(RecordBatch::try_new(
398 Arc::clone(&self.schema),
399 batch_columns,
400 )?))
401 }
402}
403
404impl Stream for WindowAggStream {
405 type Item = Result<RecordBatch>;
406
407 fn poll_next(
408 mut self: Pin<&mut Self>,
409 cx: &mut Context<'_>,
410 ) -> Poll<Option<Self::Item>> {
411 let poll = self.poll_next_inner(cx);
412 self.baseline_metrics.record_poll(poll)
413 }
414}
415
416impl WindowAggStream {
417 #[inline]
418 fn poll_next_inner(
419 &mut self,
420 cx: &mut Context<'_>,
421 ) -> Poll<Option<Result<RecordBatch>>> {
422 if self.finished {
423 return Poll::Ready(None);
424 }
425
426 loop {
427 return Poll::Ready(Some(match ready!(self.input.poll_next_unpin(cx)) {
428 Some(Ok(batch)) => {
429 self.batches.push(batch);
430 continue;
431 }
432 Some(Err(e)) => Err(e),
433 None => {
434 let input_schema = self.input.schema();
437 self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
438 let Some(result) = self.compute_aggregates()? else {
439 return Poll::Ready(None);
440 };
441 self.finished = true;
442 debug_assert!(result.num_rows() > 0);
445 Ok(result)
446 }
447 }));
448 }
449 }
450}
451
452impl RecordBatchStream for WindowAggStream {
453 fn schema(&self) -> SchemaRef {
455 Arc::clone(&self.schema)
456 }
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::TestMemoryExec;
    use crate::windows::create_window_expr;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::ScalarValue;
    use datafusion_expr::{
        WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
    };
    use datafusion_functions_aggregate::count::count_udaf;

    /// `WindowAggExec` appends columns to every input row without adding or
    /// removing rows, so it must report `CardinalityEffect::Equal`.
    #[test]
    fn test_window_agg_cardinality_effect() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        let memory_exec = TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?;
        let input: Arc<dyn ExecutionPlan> = Arc::new(memory_exec);

        // ROWS frame from `Preceding(UInt64(None))` up to the current row.
        let frame = WindowFrame::new_bounds(
            WindowFrameUnits::Rows,
            WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
            WindowFrameBound::CurrentRow,
        );
        let args = vec![crate::expressions::col("a", &schema)?];
        let count_expr = create_window_expr(
            &WindowFunctionDefinition::AggregateUDF(count_udaf()),
            "count(a)".to_string(),
            &args,
            &[],
            &[],
            Arc::new(frame),
            Arc::clone(&schema),
            false,
            false,
            None,
        )?;

        let window = WindowAggExec::try_new(vec![count_expr], input, true)?;
        let effect = window.cardinality_effect();
        assert!(matches!(effect, CardinalityEffect::Equal));
        Ok(())
    }
}