Skip to main content

datafusion_expr/
partition_evaluator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Partition evaluation module
19
20use arrow::array::ArrayRef;
21use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err};
22use std::fmt::Debug;
23use std::ops::Range;
24
25use crate::window_state::WindowAggState;
26
27/// Partition evaluator for Window Functions
28///
29/// # Background
30///
31/// An implementation of this trait is created and used for each
32/// partition defined by an `OVER` clause and is instantiated by
33/// the DataFusion runtime.
34///
35/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
36/// on the following data:
37///
38/// ```text
39/// col | val
40/// --- + ----
41///  A  | 10
42///  A  | 10
43///  C  | 20
44///  D  | 30
45///  D  | 30
46/// ```
47///
48/// Will instantiate three `PartitionEvaluator`s, one each for the
49/// partitions defined by `col=A`, `col=B`, and `col=C`.
50///
51/// ```text
52/// col | val
53/// --- + ----
54///  A  | 10     <--- partition 1
55///  A  | 10
56///
57/// col | val
58/// --- + ----
59///  C  | 20     <--- partition 2
60///
61/// col | val
62/// --- + ----
63///  D  | 30     <--- partition 3
64///  D  | 30
65/// ```
66///
67/// Different methods on this trait will be called depending on the
68/// capabilities described by [`supports_bounded_execution`],
69/// [`uses_window_frame`], and [`include_rank`],
70///
71/// When implementing a new `PartitionEvaluator`, implement
72/// corresponding evaluator according to table below.
73///
74/// # Implementation Table
75///
76/// |[`uses_window_frame`]|[`supports_bounded_execution`]|[`include_rank`]|function_to_implement|
77/// |---|---|----|----|
78/// |false (default)      |false (default)               |false (default)   | [`evaluate_all`]           |
79/// |false                |true                          |false             | [`evaluate`]               |
80/// |false                |true/false                    |true              | [`evaluate_all_with_rank`] |
81/// |true                 |true/false                    |true/false        | [`evaluate`]               |
82///
83/// [`evaluate`]: Self::evaluate
84/// [`evaluate_all`]: Self::evaluate_all
85/// [`evaluate_all_with_rank`]: Self::evaluate_all_with_rank
86/// [`uses_window_frame`]: Self::uses_window_frame
87/// [`include_rank`]: Self::include_rank
88/// [`supports_bounded_execution`]: Self::supports_bounded_execution
89///
90/// For more background, please also see the [User defined Window Functions in DataFusion blog]
91///
92/// [User defined Window Functions in DataFusion blog]: https://datafusion.apache.org/blog/2025/04/19/user-defined-window-functions
93pub trait PartitionEvaluator: Debug + Send {
94    /// When the window frame has a fixed beginning (e.g UNBOUNDED
95    /// PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and
96    /// NTH_VALUE do not need the (unbounded) input once they have
97    /// seen a certain amount of input.
98    ///
99    /// `memoize` is called after each input batch is processed, and
100    /// such functions can save whatever they need and modify
101    /// [`WindowAggState`] appropriately to allow rows to be pruned
102    fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
103        Ok(())
104    }
105
106    /// If `uses_window_frame` flag is `false`. This method is used to
107    /// calculate required range for the window function during
108    /// stateful execution.
109    ///
110    /// Generally there is no required range, hence by default this
111    /// returns smallest range(current row). e.g seeing current row is
112    /// enough to calculate window result (such as row_number, rank,
113    /// etc)
114    fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
115        if self.uses_window_frame() {
116            exec_err!("Range should be calculated from window frame")
117        } else {
118            Ok(Range {
119                start: idx,
120                end: idx + 1,
121            })
122        }
123    }
124
125    /// Get whether evaluator needs future data for its result (if so returns `false`) or not
126    fn is_causal(&self) -> bool {
127        false
128    }
129
130    /// Evaluate a window function on an entire input partition.
131    ///
132    /// This function is called once per input *partition* for window
133    /// functions that *do not use* values from the window frame,
134    /// such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`,
135    /// `CUME_DIST`, `LEAD`, `LAG`).
136    ///
137    /// It produces the result of all rows in a single pass. It
138    /// expects to receive the entire partition as the `value` and
139    /// must produce an output column with one output row for every
140    /// input row.
141    ///
142    /// `num_rows` is required to correctly compute the output in case
143    /// `values.len() == 0`
144    ///
145    /// Implementing this function is an optimization: certain window
146    /// functions are not affected by the window frame definition or
147    /// the query doesn't have a frame, and `evaluate` skips the
148    /// (costly) window frame boundary calculation and the overhead of
149    /// calling `evaluate` for each output row.
150    ///
151    /// For example, the `LAG` built in window function does not use
152    /// the values of its window frame (it can be computed in one shot
153    /// on the entire partition with `Self::evaluate_all` regardless of the
154    /// window defined in the `OVER` clause)
155    ///
156    /// ```sql
157    /// lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
158    /// ```
159    ///
160    /// However, `avg()` computes the average in the window and thus
161    /// does use its window frame
162    ///
163    /// ```sql
164    /// avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
165    /// ```
166    fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
167        // When window frame boundaries are not used and evaluator supports bounded execution
168        // We can calculate evaluate result by repeatedly calling `self.evaluate` `num_rows` times
169        // If user wants to implement more efficient version, this method should be overwritten
170        // Default implementation may behave suboptimally (For instance `NumRowEvaluator` overwrites it)
171        if !self.uses_window_frame() && self.supports_bounded_execution() {
172            let res = (0..num_rows)
173                .map(|idx| self.evaluate(values, &self.get_range(idx, num_rows)?))
174                .collect::<Result<Vec<_>>>()?;
175            ScalarValue::iter_to_array(res)
176        } else {
177            not_impl_err!("evaluate_all is not implemented by default")
178        }
179    }
180
181    /// Evaluate window function on a range of rows in an input
182    /// partition.
183    ///
184    /// This is the simplest and most general function to implement
185    /// but also the least performant as it creates output one row at
186    /// a time. It is typically much faster to implement stateful
187    /// evaluation using one of the other specialized methods on this
188    /// trait.
189    ///
190    /// Returns a [`ScalarValue`] that is the value of the window
191    /// function within `range` for the entire partition. Argument
192    /// `values` contains the evaluation result of function arguments
193    /// and evaluation results of ORDER BY expressions. If function has a
194    /// single argument, `values[1..]` will contain ORDER BY expression results.
195    fn evaluate(
196        &mut self,
197        _values: &[ArrayRef],
198        _range: &Range<usize>,
199    ) -> Result<ScalarValue> {
200        not_impl_err!("evaluate is not implemented by default")
201    }
202
203    /// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
204    /// functions that only need the rank of a row within its window
205    /// frame.
206    ///
207    /// Evaluate the partition evaluator against the partition using
208    /// the row ranks. For example, `RANK(col)` produces
209    ///
210    /// ```text
211    /// col | rank
212    /// --- + ----
213    ///  A  | 1
214    ///  A  | 1
215    ///  C  | 3
216    ///  D  | 4
217    ///  D  | 4
218    /// ```
219    ///
220    /// For this case, `num_rows` would be `5` and the
221    /// `ranks_in_partition` would be called with
222    ///
223    /// ```text
224    /// [
225    ///   (0,1),
226    ///   (2,2),
227    ///   (3,4),
228    /// ]
229    /// ```
230    fn evaluate_all_with_rank(
231        &self,
232        _num_rows: usize,
233        _ranks_in_partition: &[Range<usize>],
234    ) -> Result<ArrayRef> {
235        not_impl_err!("evaluate_partition_with_rank is not implemented by default")
236    }
237
238    /// Can the window function be incrementally computed using
239    /// bounded memory?
240    ///
241    /// See the table on [`Self`] for what functions to implement
242    fn supports_bounded_execution(&self) -> bool {
243        false
244    }
245
246    /// Does the window function use the values from the window frame,
247    /// if one is specified?
248    ///
249    /// See the table on [`Self`] for what functions to implement
250    fn uses_window_frame(&self) -> bool {
251        false
252    }
253
254    /// Can this function be evaluated with (only) rank
255    ///
256    /// See the table on [`Self`] for what functions to implement
257    fn include_rank(&self) -> bool {
258        false
259    }
260}