// datafusion_physical_plan/windows/window_agg_exec.rs

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::utils::create_schema;
use crate::execution_plan::EmissionType;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
    calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
    window_equivalence_properties,
};
use crate::{
    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
    ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream, Statistics, WindowExpr,
};

use arrow::array::ArrayRef;
use arrow::compute::{concat, concat_batches};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::utils::{evaluate_partition_ranges, transpose};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr_common::sort_expr::{
    OrderingRequirements, PhysicalSortExpr,
};

use futures::{ready, Stream, StreamExt};

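/// Window execution plan that evaluates window functions over the entire
/// input of each partition: all input batches are buffered, sliced into
/// partition-by groups, and one output column is appended per window
/// expression.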
#[derive(Debug, Clone)]
pub struct WindowAggExec {
    /// Input plan
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Window function expressions
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Schema after the window functions are applied
    schema: SchemaRef,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Indices of the partition-by expressions that are already covered by
    /// the input ordering (see `get_ordered_partition_by_indices`)
    ordered_partition_by_indices: Vec<usize>,
    /// Cached plan properties (equivalences, output partitioning, etc.)
    cache: PlanProperties,
    /// If `false`, `partition_keys()` returns an empty vector and the input
    /// is required to be a single partition
    can_repartition: bool,
}

impl WindowAggExec {
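    /// Create a new execution plan for window aggregates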
    pub fn try_new(
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: Arc<dyn ExecutionPlan>,
        can_repartition: bool,
    ) -> Result<Self> {
        let schema = create_schema(&input.schema(), &window_expr)?;
        let schema = Arc::new(schema);

        let ordered_partition_by_indices =
            get_ordered_partition_by_indices(window_expr[0].partition_by(), &input)?;
        let cache = Self::compute_properties(Arc::clone(&schema), &input, &window_expr)?;
        Ok(Self {
            input,
            window_expr,
            schema,
            metrics: ExecutionPlanMetricsSet::new(),
            ordered_partition_by_indices,
            cache,
            can_repartition,
        })
    }

    /// Window expressions
    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
        &self.window_expr
    }

    /// Input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

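    /// Return the sort expressions for the partition-by columns, ordered
    /// according to `ordered_partition_by_indices` (i.e. following the
    /// input's existing ordering).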
    pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
        let partition_by = self.window_expr()[0].partition_by();
        get_partition_by_sort_exprs(
            &self.input,
            partition_by,
            &self.ordered_partition_by_indices,
        )
    }

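    /// Compute the cached plan properties: equivalence properties, output
    /// partitioning, emission type and boundedness.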
    fn compute_properties(
        schema: SchemaRef,
        input: &Arc<dyn ExecutionPlan>,
        window_exprs: &[Arc<dyn WindowExpr>],
    ) -> Result<PlanProperties> {
        let eq_properties = window_equivalence_properties(&schema, input, window_exprs)?;

        // Each input partition is processed independently, so the output
        // partitioning is the same as the input partitioning.
        let output_partitioning = input.output_partitioning().clone();

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            // The result is emitted only after all input has been consumed.
            EmissionType::Final,
            input.boundedness(),
        ))
    }

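    /// Partition keys used for hash-repartitioning the input. Returns the
    /// smallest partition-by set among the window expressions, or an empty
    /// vector when repartitioning is disabled.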
    pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        if !self.can_repartition {
            vec![]
        } else {
            let all_partition_keys = self
                .window_expr()
                .iter()
                .map(|expr| expr.partition_by().to_vec())
                .collect::<Vec<_>>();

            all_partition_keys
                .into_iter()
                .min_by_key(|s| s.len())
                .unwrap_or_else(Vec::new)
        }
    }

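    /// Build statistics for this node: input column statistics are kept
    /// as-is and unknown statistics are appended for the window columns.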
    fn statistics_inner(&self) -> Result<Statistics> {
        let input_stat = self.input.partition_statistics(None)?;
        let win_cols = self.window_expr.len();
        let input_cols = self.input.schema().fields().len();
        // One column per window expression is appended after the input columns.
        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
        column_statistics.extend(input_stat.column_statistics);
        for _ in 0..win_cols {
            column_statistics.push(ColumnStatistics::new_unknown())
        }
        Ok(Statistics {
            num_rows: input_stat.num_rows,
            column_statistics,
            total_byte_size: Precision::Absent,
        })
    }
}

impl DisplayAs for WindowAggExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "WindowAggExec: ")?;
                let g: Vec<String> = self
                    .window_expr
                    .iter()
                    .map(|e| {
                        format!(
                            "{}: {:?}, frame: {:?}",
                            e.name().to_owned(),
                            e.field(),
                            e.get_window_frame()
                        )
                    })
                    .collect();
                write!(f, "wdw=[{}]", g.join(", "))?;
            }
            DisplayFormatType::TreeRender => {
                let g: Vec<String> = self
                    .window_expr
                    .iter()
                    .map(|e| e.name().to_owned().to_string())
                    .collect();
                writeln!(f, "select_list={}", g.join(", "))?;
            }
        }
        Ok(())
    }
}

impl ExecutionPlan for WindowAggExec {
    fn name(&self) -> &'static str {
        "WindowAggExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        let partition_bys = self.window_expr()[0].partition_by();
        let order_keys = self.window_expr()[0].order_by();
        if self.ordered_partition_by_indices.len() < partition_bys.len() {
            vec![calc_requirements(partition_bys, order_keys)]
        } else {
            let partition_bys = self
                .ordered_partition_by_indices
                .iter()
                .map(|idx| &partition_bys[*idx]);
            vec![calc_requirements(partition_bys, order_keys)]
        }
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.partition_keys().is_empty() {
            vec![Distribution::SinglePartition]
        } else {
            vec![Distribution::HashPartitioned(self.partition_keys())]
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(WindowAggExec::try_new(
            self.window_expr.clone(),
            Arc::clone(&children[0]),
            true,
        )?))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, context)?;
        let stream = Box::pin(WindowAggStream::new(
            Arc::clone(&self.schema),
            self.window_expr.clone(),
            input,
            BaselineMetrics::new(&self.metrics, partition),
            self.partition_by_sort_keys()?,
            self.ordered_partition_by_indices.clone(),
        )?);
        Ok(stream)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.statistics_inner()
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_none() {
            self.statistics_inner()
        } else {
            Ok(Statistics::new_unknown(&self.schema()))
        }
    }
}

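/// Compute the window aggregate columns by evaluating each window expression
/// against the given record batch.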
fn compute_window_aggregates(
    window_expr: &[Arc<dyn WindowExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    window_expr
        .iter()
        .map(|window_expr| window_expr.evaluate(batch))
        .collect()
}

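/// Stream for the window aggregation plan: buffers its entire input, then
/// computes the window results in one pass and emits a single output batch.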
pub struct WindowAggStream {
    /// Output schema (input schema plus the window columns)
    schema: SchemaRef,
    /// Input stream
    input: SendableRecordBatchStream,
    /// Input batches buffered until the input is exhausted
    batches: Vec<RecordBatch>,
    /// Set once the final result batch has been emitted
    finished: bool,
    /// Window expressions to evaluate
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Sort expressions for the partition-by columns
    partition_by_sort_keys: Vec<PhysicalSortExpr>,
    /// Baseline execution metrics
    baseline_metrics: BaselineMetrics,
    /// Indices selecting `partition_by_sort_keys` in the order that matches
    /// the input's existing ordering
    ordered_partition_by_indices: Vec<usize>,
}

impl WindowAggStream {
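    /// Create a new `WindowAggStream`; returns an error unless every
    /// partition-by column has a corresponding ordering index.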
    pub fn new(
        schema: SchemaRef,
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: SendableRecordBatchStream,
        baseline_metrics: BaselineMetrics,
        partition_by_sort_keys: Vec<PhysicalSortExpr>,
        ordered_partition_by_indices: Vec<usize>,
    ) -> Result<Self> {
        if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() {
            return internal_err!("All partition by columns should have an ordering");
        }
        Ok(Self {
            schema,
            input,
            batches: vec![],
            finished: false,
            window_expr,
            baseline_metrics,
            partition_by_sort_keys,
            ordered_partition_by_indices,
        })
    }

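    /// Concatenate the buffered batches, evaluate the window expressions for
    /// each partition-by range, and append the resulting columns to the
    /// input columns. Returns `None` when the input was empty.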
    fn compute_aggregates(&self) -> Result<Option<RecordBatch>> {
        // Record elapsed compute time when the timer is dropped.
        let _timer = self.baseline_metrics.elapsed_compute().timer();

        let batch = concat_batches(&self.input.schema(), &self.batches)?;
        if batch.num_rows() == 0 {
            return Ok(None);
        }

        let partition_by_sort_keys = self
            .ordered_partition_by_indices
            .iter()
            .map(|idx| self.partition_by_sort_keys[*idx].evaluate_to_sort_column(&batch))
            .collect::<Result<Vec<_>>>()?;
        let partition_points =
            evaluate_partition_ranges(batch.num_rows(), &partition_by_sort_keys)?;

        // Evaluate the window expressions for each partition-by range.
        let mut partition_results = vec![];
        for partition_point in partition_points {
            let length = partition_point.end - partition_point.start;
            partition_results.push(compute_window_aggregates(
                &self.window_expr,
                &batch.slice(partition_point.start, length),
            )?)
        }
        // Transpose the per-partition results and concatenate them into one
        // array per window expression.
        let columns = transpose(partition_results)
            .iter()
            .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
            .collect::<Vec<_>>()
            .into_iter()
            .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;

        // The window columns are appended after the original input columns.
        let mut batch_columns = batch.columns().to_vec();
        batch_columns.extend_from_slice(&columns);
        Ok(Some(RecordBatch::try_new(
            Arc::clone(&self.schema),
            batch_columns,
        )?))
    }
}

impl Stream for WindowAggStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(poll)
    }
}

impl WindowAggStream {
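    /// Buffer input batches until the input stream is exhausted, then emit
    /// the single result batch produced by `compute_aggregates`.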
    #[inline]
    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        if self.finished {
            return Poll::Ready(None);
        }

        loop {
            return Poll::Ready(Some(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    self.batches.push(batch);
                    continue;
                }
                Some(Err(e)) => Err(e),
                None => {
                    let Some(result) = self.compute_aggregates()? else {
                        return Poll::Ready(None);
                    };
                    self.finished = true;
                    // `compute_aggregates` returns `Some` only for non-empty results.
                    debug_assert!(result.num_rows() > 0);
                    Ok(result)
                }
            }));
        }
    }
}

impl RecordBatchStream for WindowAggStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}