datafusion_physical_expr/window/
standard.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Physical exec for standard window function expressions.
19
20use std::any::Any;
21use std::ops::Range;
22use std::sync::Arc;
23
24use super::{StandardWindowFunctionExpr, WindowExpr};
25use crate::window::window_expr::{get_orderby_values, WindowFn};
26use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
27use crate::{EquivalenceProperties, PhysicalExpr};
28
29use arrow::array::{new_empty_array, ArrayRef};
30use arrow::datatypes::FieldRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::utils::evaluate_partition_ranges;
33use datafusion_common::{Result, ScalarValue};
34use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
35use datafusion_expr::WindowFrame;
36use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
37
38/// A window expr that takes the form of a [`StandardWindowFunctionExpr`].
39#[derive(Debug)]
40pub struct StandardWindowExpr {
41    expr: Arc<dyn StandardWindowFunctionExpr>,
42    partition_by: Vec<Arc<dyn PhysicalExpr>>,
43    order_by: Vec<PhysicalSortExpr>,
44    window_frame: Arc<WindowFrame>,
45}
46
47impl StandardWindowExpr {
48    /// create a new standard window function expression
49    pub fn new(
50        expr: Arc<dyn StandardWindowFunctionExpr>,
51        partition_by: &[Arc<dyn PhysicalExpr>],
52        order_by: &[PhysicalSortExpr],
53        window_frame: Arc<WindowFrame>,
54    ) -> Self {
55        Self {
56            expr,
57            partition_by: partition_by.to_vec(),
58            order_by: order_by.to_vec(),
59            window_frame,
60        }
61    }
62
63    /// Get StandardWindowFunction expr of StandardWindowExpr
64    pub fn get_standard_func_expr(&self) -> &Arc<dyn StandardWindowFunctionExpr> {
65        &self.expr
66    }
67
68    /// Adds any equivalent orderings generated by `self.expr` to `builder`.
69    ///
70    /// If `self.expr` doesn't have an ordering, ordering equivalence properties
71    /// are not updated. Otherwise, ordering equivalence properties are updated
72    /// by the ordering of `self.expr`.
73    pub fn add_equal_orderings(
74        &self,
75        eq_properties: &mut EquivalenceProperties,
76    ) -> Result<()> {
77        let schema = eq_properties.schema();
78        if let Some(fn_res_ordering) = self.expr.get_result_ordering(schema) {
79            add_new_ordering_expr_with_partition_by(
80                eq_properties,
81                fn_res_ordering,
82                &self.partition_by,
83            )?;
84        }
85        Ok(())
86    }
87}
88
89impl WindowExpr for StandardWindowExpr {
90    /// Return a reference to Any that can be used for downcasting
91    fn as_any(&self) -> &dyn Any {
92        self
93    }
94
95    fn name(&self) -> &str {
96        self.expr.name()
97    }
98
99    fn field(&self) -> Result<FieldRef> {
100        self.expr.field()
101    }
102
103    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
104        self.expr.expressions()
105    }
106
107    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
108        &self.partition_by
109    }
110
111    fn order_by(&self) -> &[PhysicalSortExpr] {
112        &self.order_by
113    }
114
115    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
116        let mut evaluator = self.expr.create_evaluator()?;
117        let num_rows = batch.num_rows();
118        if evaluator.uses_window_frame() {
119            let sort_options = self.order_by.iter().map(|o| o.options).collect();
120            let mut row_wise_results = vec![];
121
122            let mut values = self.evaluate_args(batch)?;
123            let order_bys = get_orderby_values(self.order_by_columns(batch)?);
124            let n_args = values.len();
125            values.extend(order_bys);
126            let order_bys_ref = &values[n_args..];
127
128            let mut window_frame_ctx =
129                WindowFrameContext::new(Arc::clone(&self.window_frame), sort_options);
130            let mut last_range = Range { start: 0, end: 0 };
131            // We iterate on each row to calculate window frame range and and window function result
132            for idx in 0..num_rows {
133                let range = window_frame_ctx.calculate_range(
134                    order_bys_ref,
135                    &last_range,
136                    num_rows,
137                    idx,
138                )?;
139                let value = evaluator.evaluate(&values, &range)?;
140                row_wise_results.push(value);
141                last_range = range;
142            }
143            ScalarValue::iter_to_array(row_wise_results)
144        } else if evaluator.include_rank() {
145            let columns = self.order_by_columns(batch)?;
146            let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
147            evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points)
148        } else {
149            let values = self.evaluate_args(batch)?;
150            evaluator.evaluate_all(&values, num_rows)
151        }
152    }
153
154    /// Evaluate the window function against the batch. This function facilitates
155    /// stateful, bounded-memory implementations.
156    fn evaluate_stateful(
157        &self,
158        partition_batches: &PartitionBatches,
159        window_agg_state: &mut PartitionWindowAggStates,
160    ) -> Result<()> {
161        let field = self.expr.field()?;
162        let out_type = field.data_type();
163        let sort_options = self.order_by.iter().map(|o| o.options).collect::<Vec<_>>();
164        // create a WindowAggState to clone when `window_agg_state` does not contain the respective
165        // group, which is faster than potentially creating a new one at every iteration
166        let new_state = WindowAggState::new(out_type)?;
167        for (partition_row, partition_batch_state) in partition_batches.iter() {
168            let window_state =
169                if let Some(window_state) = window_agg_state.get_mut(partition_row) {
170                    window_state
171                } else {
172                    let evaluator = self.expr.create_evaluator()?;
173                    window_agg_state
174                        .entry(partition_row.clone())
175                        .or_insert(WindowState {
176                            state: new_state.clone(),
177                            window_fn: WindowFn::Builtin(evaluator),
178                        })
179                };
180            let evaluator = match &mut window_state.window_fn {
181                WindowFn::Builtin(evaluator) => evaluator,
182                _ => unreachable!(),
183            };
184            let state = &mut window_state.state;
185
186            let batch_ref = &partition_batch_state.record_batch;
187            let mut values = self.evaluate_args(batch_ref)?;
188            let order_bys = if evaluator.uses_window_frame() || evaluator.include_rank() {
189                get_orderby_values(self.order_by_columns(batch_ref)?)
190            } else {
191                vec![]
192            };
193            let n_args = values.len();
194            values.extend(order_bys);
195            let order_bys_ref = &values[n_args..];
196
197            // We iterate on each row to perform a running calculation.
198            let record_batch = &partition_batch_state.record_batch;
199            let num_rows = record_batch.num_rows();
200            let mut row_wise_results: Vec<ScalarValue> = vec![];
201            let is_causal = if evaluator.uses_window_frame() {
202                self.window_frame.is_causal()
203            } else {
204                evaluator.is_causal()
205            };
206            for idx in state.last_calculated_index..num_rows {
207                let frame_range = if evaluator.uses_window_frame() {
208                    state
209                        .window_frame_ctx
210                        .get_or_insert_with(|| {
211                            WindowFrameContext::new(
212                                Arc::clone(&self.window_frame),
213                                sort_options.clone(),
214                            )
215                        })
216                        .calculate_range(
217                            order_bys_ref,
218                            // Start search from the last range
219                            &state.window_frame_range,
220                            num_rows,
221                            idx,
222                        )
223                } else {
224                    evaluator.get_range(idx, num_rows)
225                }?;
226
227                // Exit if the range is non-causal and extends all the way:
228                if frame_range.end == num_rows
229                    && !is_causal
230                    && !partition_batch_state.is_end
231                {
232                    break;
233                }
234                // Update last range
235                state.window_frame_range = frame_range;
236                row_wise_results
237                    .push(evaluator.evaluate(&values, &state.window_frame_range)?);
238            }
239            let out_col = if row_wise_results.is_empty() {
240                new_empty_array(out_type)
241            } else if row_wise_results.len() == 1 {
242                // fast path when the result only has a single row
243                row_wise_results[0].to_array()?
244            } else {
245                ScalarValue::iter_to_array(row_wise_results.into_iter())?
246            };
247
248            state.update(&out_col, partition_batch_state)?;
249            if self.window_frame.start_bound.is_unbounded() {
250                evaluator.memoize(state)?;
251            }
252        }
253        Ok(())
254    }
255
256    fn get_window_frame(&self) -> &Arc<WindowFrame> {
257        &self.window_frame
258    }
259
260    fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>> {
261        self.expr.reverse_expr().map(|reverse_expr| {
262            Arc::new(StandardWindowExpr::new(
263                reverse_expr,
264                &self.partition_by.clone(),
265                &self
266                    .order_by
267                    .iter()
268                    .map(|e| e.reverse())
269                    .collect::<Vec<_>>(),
270                Arc::new(self.window_frame.reverse()),
271            )) as _
272        })
273    }
274
275    fn uses_bounded_memory(&self) -> bool {
276        if let Ok(evaluator) = self.expr.create_evaluator() {
277            evaluator.supports_bounded_execution()
278                && (!evaluator.uses_window_frame()
279                    || !self.window_frame.end_bound.is_unbounded())
280        } else {
281            false
282        }
283    }
284
285    fn create_window_fn(&self) -> Result<WindowFn> {
286        Ok(WindowFn::Builtin(self.expr.create_evaluator()?))
287    }
288}
289
290/// Adds a new ordering expression into existing ordering equivalence class(es) based on
291/// PARTITION BY information (if it exists).
292pub(crate) fn add_new_ordering_expr_with_partition_by(
293    eqp: &mut EquivalenceProperties,
294    expr: PhysicalSortExpr,
295    partition_by: &[Arc<dyn PhysicalExpr>],
296) -> Result<()> {
297    if partition_by.is_empty() {
298        // In the absence of a PARTITION BY, ordering of `self.expr` is global:
299        eqp.add_ordering([expr]);
300    } else {
301        // If we have a PARTITION BY, standard functions can not introduce
302        // a global ordering unless the existing ordering is compatible
303        // with PARTITION BY expressions. To elaborate, when PARTITION BY
304        // expressions and existing ordering expressions are equal (w.r.t.
305        // set equality), we can prefix the ordering of `self.expr` with
306        // the existing ordering.
307        let (mut ordering, _) = eqp.find_longest_permutation(partition_by)?;
308        if ordering.len() == partition_by.len() {
309            ordering.push(expr);
310            eqp.add_ordering(ordering);
311        }
312    }
313    Ok(())
314}