Skip to main content

datafusion_physical_plan/windows/
window_agg_exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Stream and channel implementations for window function expressions.
19
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23
24use super::utils::create_schema;
25use crate::execution_plan::{CardinalityEffect, EmissionType};
26use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
27use crate::stream::EmptyRecordBatchStream;
28use crate::windows::{
29    calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
30    window_equivalence_properties,
31};
32use crate::{
33    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
34    ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
35    SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties,
36};
37
38use arrow::array::ArrayRef;
39use arrow::compute::{concat, concat_batches};
40use arrow::datatypes::SchemaRef;
41use arrow::error::ArrowError;
42use arrow::record_batch::RecordBatch;
43use datafusion_common::stats::Precision;
44use datafusion_common::utils::{evaluate_partition_ranges, transpose};
45use datafusion_common::{Result, assert_eq_or_internal_err};
46use datafusion_execution::TaskContext;
47use datafusion_physical_expr_common::sort_expr::{
48    OrderingRequirements, PhysicalSortExpr,
49};
50
51use futures::{Stream, StreamExt, ready};
52
/// Window execution plan
///
/// Evaluates a list of window expressions over its single input and exposes
/// the results as additional columns described by `schema`.
#[derive(Debug, Clone)]
pub struct WindowAggExec {
    /// Input plan
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Window function expression
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Schema after the window is run
    schema: SchemaRef,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Partition by indices that defines preset for existing ordering
    // see `get_ordered_partition_by_indices` for more details.
    ordered_partition_by_indices: Vec<usize>,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: Arc<PlanProperties>,
    /// If `can_repartition` is false, `partition_keys()` is always empty.
    can_repartition: bool,
}
72
73impl WindowAggExec {
74    /// Create a new execution plan for window aggregates
75    pub fn try_new(
76        window_expr: Vec<Arc<dyn WindowExpr>>,
77        input: Arc<dyn ExecutionPlan>,
78        can_repartition: bool,
79    ) -> Result<Self> {
80        let schema = create_schema(&input.schema(), &window_expr)?;
81        let schema = Arc::new(schema);
82
83        let ordered_partition_by_indices =
84            get_ordered_partition_by_indices(window_expr[0].partition_by(), &input)?;
85        let cache = Self::compute_properties(&schema, &input, &window_expr)?;
86        Ok(Self {
87            input,
88            window_expr,
89            schema,
90            metrics: ExecutionPlanMetricsSet::new(),
91            ordered_partition_by_indices,
92            cache: Arc::new(cache),
93            can_repartition,
94        })
95    }
96
97    /// Window expressions
98    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
99        &self.window_expr
100    }
101
102    /// Input plan
103    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
104        &self.input
105    }
106
107    /// Return the output sort order of partition keys: For example
108    /// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
109    // We are sure that partition by columns are always at the beginning of sort_keys
110    // Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY` columns can be used safely
111    // to calculate partition separation points
112    pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
113        let partition_by = self.window_expr()[0].partition_by();
114        get_partition_by_sort_exprs(
115            &self.input,
116            partition_by,
117            &self.ordered_partition_by_indices,
118        )
119    }
120
121    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
122    fn compute_properties(
123        schema: &SchemaRef,
124        input: &Arc<dyn ExecutionPlan>,
125        window_exprs: &[Arc<dyn WindowExpr>],
126    ) -> Result<PlanProperties> {
127        // Calculate equivalence properties:
128        let eq_properties = window_equivalence_properties(schema, input, window_exprs)?;
129
130        // Get output partitioning:
131        // Because we can have repartitioning using the partition keys this
132        // would be either 1 or more than 1 depending on the presence of repartitioning.
133        let output_partitioning = input.output_partitioning().clone();
134
135        // Construct properties cache:
136        Ok(PlanProperties::new(
137            eq_properties,
138            output_partitioning,
139            // TODO: Emission type and boundedness information can be enhanced here
140            EmissionType::Final,
141            input.boundedness(),
142        ))
143    }
144
145    pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
146        if !self.can_repartition {
147            vec![]
148        } else {
149            let all_partition_keys = self
150                .window_expr()
151                .iter()
152                .map(|expr| expr.partition_by().to_vec())
153                .collect::<Vec<_>>();
154
155            all_partition_keys
156                .into_iter()
157                .min_by_key(|s| s.len())
158                .unwrap_or_else(Vec::new)
159        }
160    }
161
162    fn with_new_children_and_same_properties(
163        &self,
164        mut children: Vec<Arc<dyn ExecutionPlan>>,
165    ) -> Self {
166        Self {
167            input: children.swap_remove(0),
168            metrics: ExecutionPlanMetricsSet::new(),
169            ..Self::clone(self)
170        }
171    }
172}
173
174impl DisplayAs for WindowAggExec {
175    fn fmt_as(
176        &self,
177        t: DisplayFormatType,
178        f: &mut std::fmt::Formatter,
179    ) -> std::fmt::Result {
180        match t {
181            DisplayFormatType::Default | DisplayFormatType::Verbose => {
182                write!(f, "WindowAggExec: ")?;
183                let g: Vec<String> = self
184                    .window_expr
185                    .iter()
186                    .map(|e| {
187                        format!(
188                            "{}: {:?}, frame: {:?}",
189                            e.name().to_owned(),
190                            e.field(),
191                            e.get_window_frame()
192                        )
193                    })
194                    .collect();
195                write!(f, "wdw=[{}]", g.join(", "))?;
196            }
197            DisplayFormatType::TreeRender => {
198                let g: Vec<String> = self
199                    .window_expr
200                    .iter()
201                    .map(|e| e.name().to_owned().to_string())
202                    .collect();
203                writeln!(f, "select_list={}", g.join(", "))?;
204            }
205        }
206        Ok(())
207    }
208}
209
210impl ExecutionPlan for WindowAggExec {
211    fn name(&self) -> &'static str {
212        "WindowAggExec"
213    }
214
215    /// Return a reference to Any that can be used for downcasting
216    fn properties(&self) -> &Arc<PlanProperties> {
217        &self.cache
218    }
219
220    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
221        vec![&self.input]
222    }
223
224    fn maintains_input_order(&self) -> Vec<bool> {
225        vec![true]
226    }
227
228    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
229        let partition_bys = self.window_expr()[0].partition_by();
230        let order_keys = self.window_expr()[0].order_by();
231        if self.ordered_partition_by_indices.len() < partition_bys.len() {
232            vec![calc_requirements(partition_bys, order_keys)]
233        } else {
234            let partition_bys = self
235                .ordered_partition_by_indices
236                .iter()
237                .map(|idx| &partition_bys[*idx]);
238            vec![calc_requirements(partition_bys, order_keys)]
239        }
240    }
241
242    fn required_input_distribution(&self) -> Vec<Distribution> {
243        if self.partition_keys().is_empty() {
244            vec![Distribution::SinglePartition]
245        } else {
246            vec![Distribution::HashPartitioned(self.partition_keys())]
247        }
248    }
249
250    fn with_new_children(
251        self: Arc<Self>,
252        mut children: Vec<Arc<dyn ExecutionPlan>>,
253    ) -> Result<Arc<dyn ExecutionPlan>> {
254        check_if_same_properties!(self, children);
255        Ok(Arc::new(WindowAggExec::try_new(
256            self.window_expr.clone(),
257            children.swap_remove(0),
258            true,
259        )?))
260    }
261
262    fn execute(
263        &self,
264        partition: usize,
265        context: Arc<TaskContext>,
266    ) -> Result<SendableRecordBatchStream> {
267        let input = self.input.execute(partition, context)?;
268        let stream = Box::pin(WindowAggStream::new(
269            Arc::clone(&self.schema),
270            self.window_expr.clone(),
271            input,
272            BaselineMetrics::new(&self.metrics, partition),
273            self.partition_by_sort_keys()?,
274            self.ordered_partition_by_indices.clone(),
275        )?);
276        Ok(stream)
277    }
278
279    fn metrics(&self) -> Option<MetricsSet> {
280        Some(self.metrics.clone_inner())
281    }
282
283    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
284        let input_stat =
285            Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
286        let win_cols = self.window_expr.len();
287        let input_cols = self.input.schema().fields().len();
288        // TODO stats: some windowing function will maintain invariants such as min, max...
289        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
290        // copy stats of the input to the beginning of the schema.
291        column_statistics.extend(input_stat.column_statistics);
292        for _ in 0..win_cols {
293            column_statistics.push(ColumnStatistics::new_unknown())
294        }
295        Ok(Arc::new(Statistics {
296            num_rows: input_stat.num_rows,
297            column_statistics,
298            total_byte_size: Precision::Absent,
299        }))
300    }
301
302    fn cardinality_effect(&self) -> CardinalityEffect {
303        CardinalityEffect::Equal
304    }
305}
306
307/// Compute the window aggregate columns
308fn compute_window_aggregates(
309    window_expr: &[Arc<dyn WindowExpr>],
310    batch: &RecordBatch,
311) -> Result<Vec<ArrayRef>> {
312    window_expr
313        .iter()
314        .map(|window_expr| window_expr.evaluate(batch))
315        .collect()
316}
317
/// Stream for the window aggregation plan.
///
/// Buffers every input batch and, once the input is exhausted, computes the
/// window expressions over the concatenated input.
pub struct WindowAggStream {
    /// Schema of the record batch produced by this stream
    schema: SchemaRef,
    /// Input stream the batches are read from
    input: SendableRecordBatchStream,
    /// Input batches collected so far
    batches: Vec<RecordBatch>,
    /// When true, polling returns `None` immediately
    finished: bool,
    /// Window expressions to evaluate
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Sort expressions evaluated to locate partition boundaries
    partition_by_sort_keys: Vec<PhysicalSortExpr>,
    /// Metrics recorded for this stream (compute time, poll results)
    baseline_metrics: BaselineMetrics,
    /// Indices into `partition_by_sort_keys` selecting the keys to evaluate
    ordered_partition_by_indices: Vec<usize>,
}
329
330impl WindowAggStream {
331    /// Create a new WindowAggStream
332    pub fn new(
333        schema: SchemaRef,
334        window_expr: Vec<Arc<dyn WindowExpr>>,
335        input: SendableRecordBatchStream,
336        baseline_metrics: BaselineMetrics,
337        partition_by_sort_keys: Vec<PhysicalSortExpr>,
338        ordered_partition_by_indices: Vec<usize>,
339    ) -> Result<Self> {
340        // In WindowAggExec all partition by columns should be ordered.
341        assert_eq_or_internal_err!(
342            window_expr[0].partition_by().len(),
343            ordered_partition_by_indices.len(),
344            "All partition by columns should have an ordering"
345        );
346        Ok(Self {
347            schema,
348            input,
349            batches: vec![],
350            finished: false,
351            window_expr,
352            baseline_metrics,
353            partition_by_sort_keys,
354            ordered_partition_by_indices,
355        })
356    }
357
358    fn compute_aggregates(&self) -> Result<Option<RecordBatch>> {
359        // record compute time on drop
360        let _timer = self.baseline_metrics.elapsed_compute().timer();
361
362        let batch = concat_batches(&self.input.schema(), &self.batches)?;
363        if batch.num_rows() == 0 {
364            return Ok(None);
365        }
366
367        let partition_by_sort_keys = self
368            .ordered_partition_by_indices
369            .iter()
370            .map(|idx| self.partition_by_sort_keys[*idx].evaluate_to_sort_column(&batch))
371            .collect::<Result<Vec<_>>>()?;
372        let partition_points =
373            evaluate_partition_ranges(batch.num_rows(), &partition_by_sort_keys)?;
374
375        let mut partition_results = vec![];
376        // Calculate window cols
377        for partition_point in partition_points {
378            let length = partition_point.end - partition_point.start;
379            partition_results.push(compute_window_aggregates(
380                &self.window_expr,
381                &batch.slice(partition_point.start, length),
382            )?)
383        }
384        let columns = transpose(partition_results)
385            .iter()
386            .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
387            .collect::<Vec<_>>()
388            .into_iter()
389            .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
390
391        // combine with the original cols
392        // note the setup of window aggregates is that they newly calculated window
393        // expression results are always appended to the columns
394        let mut batch_columns = batch.columns().to_vec();
395        // calculate window cols
396        batch_columns.extend_from_slice(&columns);
397        Ok(Some(RecordBatch::try_new(
398            Arc::clone(&self.schema),
399            batch_columns,
400        )?))
401    }
402}
403
404impl Stream for WindowAggStream {
405    type Item = Result<RecordBatch>;
406
407    fn poll_next(
408        mut self: Pin<&mut Self>,
409        cx: &mut Context<'_>,
410    ) -> Poll<Option<Self::Item>> {
411        let poll = self.poll_next_inner(cx);
412        self.baseline_metrics.record_poll(poll)
413    }
414}
415
416impl WindowAggStream {
417    #[inline]
418    fn poll_next_inner(
419        &mut self,
420        cx: &mut Context<'_>,
421    ) -> Poll<Option<Result<RecordBatch>>> {
422        if self.finished {
423            return Poll::Ready(None);
424        }
425
426        loop {
427            return Poll::Ready(Some(match ready!(self.input.poll_next_unpin(cx)) {
428                Some(Ok(batch)) => {
429                    self.batches.push(batch);
430                    continue;
431                }
432                Some(Err(e)) => Err(e),
433                None => {
434                    // Release the input pipeline's resources before computing
435                    // the final aggregates.
436                    let input_schema = self.input.schema();
437                    self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
438                    let Some(result) = self.compute_aggregates()? else {
439                        return Poll::Ready(None);
440                    };
441                    self.finished = true;
442                    // Empty record batches should not be emitted.
443                    // They need to be treated as  [`Option<RecordBatch>`]es and handled separately
444                    debug_assert!(result.num_rows() > 0);
445                    Ok(result)
446                }
447            }));
448        }
449    }
450}
451
impl RecordBatchStream for WindowAggStream {
    /// Get the schema of the batches produced by this stream (a cheap
    /// reference-counted clone of the stored schema).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
458
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::TestMemoryExec;
    use crate::windows::create_window_expr;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::ScalarValue;
    use datafusion_expr::{
        WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
    };
    use datafusion_functions_aggregate::count::count_udaf;

    /// `WindowAggExec` must report `CardinalityEffect::Equal`.
    #[test]
    fn test_window_agg_cardinality_effect() -> Result<()> {
        // Single nullable Int64 column `a`, backed by an input with no data.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        let input: Arc<dyn ExecutionPlan> =
            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
        let args = vec![crate::expressions::col("a", &schema)?];
        // `count(a)` with no PARTITION BY / ORDER BY, over a ROWS frame from
        // an unbounded preceding bound (`UInt64(None)`) to the current row.
        let window_expr = create_window_expr(
            &WindowFunctionDefinition::AggregateUDF(count_udaf()),
            "count(a)".to_string(),
            &args,
            &[],
            &[],
            Arc::new(WindowFrame::new_bounds(
                WindowFrameUnits::Rows,
                WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
                WindowFrameBound::CurrentRow,
            )),
            Arc::clone(&schema),
            false,
            false,
            None,
        )?;

        let window = WindowAggExec::try_new(vec![window_expr], input, true)?;
        assert!(matches!(
            window.cardinality_effect(),
            CardinalityEffect::Equal
        ));
        Ok(())
    }
}