datafusion_expr/partition_evaluator.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Partition evaluation module
19
20use arrow::array::ArrayRef;
21use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err};
22use std::fmt::Debug;
23use std::ops::Range;
24
25use crate::window_state::WindowAggState;
26
27/// Partition evaluator for Window Functions
28///
29/// # Background
30///
31/// An implementation of this trait is created and used for each
32/// partition defined by an `OVER` clause and is instantiated by
33/// the DataFusion runtime.
34///
35/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
36/// on the following data:
37///
38/// ```text
39/// col | val
40/// --- + ----
41/// A | 10
42/// A | 10
43/// C | 20
44/// D | 30
45/// D | 30
46/// ```
47///
48/// Will instantiate three `PartitionEvaluator`s, one each for the
49/// partitions defined by `col=A`, `col=B`, and `col=C`.
50///
51/// ```text
52/// col | val
53/// --- + ----
54/// A | 10 <--- partition 1
55/// A | 10
56///
57/// col | val
58/// --- + ----
59/// C | 20 <--- partition 2
60///
61/// col | val
62/// --- + ----
63/// D | 30 <--- partition 3
64/// D | 30
65/// ```
66///
67/// Different methods on this trait will be called depending on the
68/// capabilities described by [`supports_bounded_execution`],
69/// [`uses_window_frame`], and [`include_rank`],
70///
71/// When implementing a new `PartitionEvaluator`, implement
72/// corresponding evaluator according to table below.
73///
74/// # Implementation Table
75///
76/// |[`uses_window_frame`]|[`supports_bounded_execution`]|[`include_rank`]|function_to_implement|
77/// |---|---|----|----|
78/// |false (default) |false (default) |false (default) | [`evaluate_all`] |
79/// |false |true |false | [`evaluate`] |
80/// |false |true/false |true | [`evaluate_all_with_rank`] |
81/// |true |true/false |true/false | [`evaluate`] |
82///
83/// [`evaluate`]: Self::evaluate
84/// [`evaluate_all`]: Self::evaluate_all
85/// [`evaluate_all_with_rank`]: Self::evaluate_all_with_rank
86/// [`uses_window_frame`]: Self::uses_window_frame
87/// [`include_rank`]: Self::include_rank
88/// [`supports_bounded_execution`]: Self::supports_bounded_execution
89///
90/// For more background, please also see the [User defined Window Functions in DataFusion blog]
91///
92/// [User defined Window Functions in DataFusion blog]: https://datafusion.apache.org/blog/2025/04/19/user-defined-window-functions
93pub trait PartitionEvaluator: Debug + Send {
94 /// When the window frame has a fixed beginning (e.g UNBOUNDED
95 /// PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and
96 /// NTH_VALUE do not need the (unbounded) input once they have
97 /// seen a certain amount of input.
98 ///
99 /// `memoize` is called after each input batch is processed, and
100 /// such functions can save whatever they need and modify
101 /// [`WindowAggState`] appropriately to allow rows to be pruned
102 fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
103 Ok(())
104 }
105
106 /// If `uses_window_frame` flag is `false`. This method is used to
107 /// calculate required range for the window function during
108 /// stateful execution.
109 ///
110 /// Generally there is no required range, hence by default this
111 /// returns smallest range(current row). e.g seeing current row is
112 /// enough to calculate window result (such as row_number, rank,
113 /// etc)
114 fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
115 if self.uses_window_frame() {
116 exec_err!("Range should be calculated from window frame")
117 } else {
118 Ok(Range {
119 start: idx,
120 end: idx + 1,
121 })
122 }
123 }
124
125 /// Get whether evaluator needs future data for its result (if so returns `false`) or not
126 fn is_causal(&self) -> bool {
127 false
128 }
129
130 /// Evaluate a window function on an entire input partition.
131 ///
132 /// This function is called once per input *partition* for window
133 /// functions that *do not use* values from the window frame,
134 /// such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`,
135 /// `CUME_DIST`, `LEAD`, `LAG`).
136 ///
137 /// It produces the result of all rows in a single pass. It
138 /// expects to receive the entire partition as the `value` and
139 /// must produce an output column with one output row for every
140 /// input row.
141 ///
142 /// `num_rows` is required to correctly compute the output in case
143 /// `values.len() == 0`
144 ///
145 /// Implementing this function is an optimization: certain window
146 /// functions are not affected by the window frame definition or
147 /// the query doesn't have a frame, and `evaluate` skips the
148 /// (costly) window frame boundary calculation and the overhead of
149 /// calling `evaluate` for each output row.
150 ///
151 /// For example, the `LAG` built in window function does not use
152 /// the values of its window frame (it can be computed in one shot
153 /// on the entire partition with `Self::evaluate_all` regardless of the
154 /// window defined in the `OVER` clause)
155 ///
156 /// ```sql
157 /// lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
158 /// ```
159 ///
160 /// However, `avg()` computes the average in the window and thus
161 /// does use its window frame
162 ///
163 /// ```sql
164 /// avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
165 /// ```
166 fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
167 // When window frame boundaries are not used and evaluator supports bounded execution
168 // We can calculate evaluate result by repeatedly calling `self.evaluate` `num_rows` times
169 // If user wants to implement more efficient version, this method should be overwritten
170 // Default implementation may behave suboptimally (For instance `NumRowEvaluator` overwrites it)
171 if !self.uses_window_frame() && self.supports_bounded_execution() {
172 let res = (0..num_rows)
173 .map(|idx| self.evaluate(values, &self.get_range(idx, num_rows)?))
174 .collect::<Result<Vec<_>>>()?;
175 ScalarValue::iter_to_array(res)
176 } else {
177 not_impl_err!("evaluate_all is not implemented by default")
178 }
179 }
180
181 /// Evaluate window function on a range of rows in an input
182 /// partition.
183 ///
184 /// This is the simplest and most general function to implement
185 /// but also the least performant as it creates output one row at
186 /// a time. It is typically much faster to implement stateful
187 /// evaluation using one of the other specialized methods on this
188 /// trait.
189 ///
190 /// Returns a [`ScalarValue`] that is the value of the window
191 /// function within `range` for the entire partition. Argument
192 /// `values` contains the evaluation result of function arguments
193 /// and evaluation results of ORDER BY expressions. If function has a
194 /// single argument, `values[1..]` will contain ORDER BY expression results.
195 fn evaluate(
196 &mut self,
197 _values: &[ArrayRef],
198 _range: &Range<usize>,
199 ) -> Result<ScalarValue> {
200 not_impl_err!("evaluate is not implemented by default")
201 }
202
203 /// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
204 /// functions that only need the rank of a row within its window
205 /// frame.
206 ///
207 /// Evaluate the partition evaluator against the partition using
208 /// the row ranks. For example, `RANK(col)` produces
209 ///
210 /// ```text
211 /// col | rank
212 /// --- + ----
213 /// A | 1
214 /// A | 1
215 /// C | 3
216 /// D | 4
217 /// D | 4
218 /// ```
219 ///
220 /// For this case, `num_rows` would be `5` and the
221 /// `ranks_in_partition` would be called with
222 ///
223 /// ```text
224 /// [
225 /// (0,1),
226 /// (2,2),
227 /// (3,4),
228 /// ]
229 /// ```
230 fn evaluate_all_with_rank(
231 &self,
232 _num_rows: usize,
233 _ranks_in_partition: &[Range<usize>],
234 ) -> Result<ArrayRef> {
235 not_impl_err!("evaluate_partition_with_rank is not implemented by default")
236 }
237
238 /// Can the window function be incrementally computed using
239 /// bounded memory?
240 ///
241 /// See the table on [`Self`] for what functions to implement
242 fn supports_bounded_execution(&self) -> bool {
243 false
244 }
245
246 /// Does the window function use the values from the window frame,
247 /// if one is specified?
248 ///
249 /// See the table on [`Self`] for what functions to implement
250 fn uses_window_frame(&self) -> bool {
251 false
252 }
253
254 /// Can this function be evaluated with (only) rank
255 ///
256 /// See the table on [`Self`] for what functions to implement
257 fn include_rank(&self) -> bool {
258 false
259 }
260}