datafusion_physical_expr/window/
standard.rs1use std::any::Any;
21use std::ops::Range;
22use std::sync::Arc;
23
24use super::{StandardWindowFunctionExpr, WindowExpr};
25use crate::window::window_expr::{get_orderby_values, WindowFn};
26use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
27use crate::{EquivalenceProperties, PhysicalExpr};
28
29use arrow::array::{new_empty_array, ArrayRef};
30use arrow::datatypes::FieldRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::utils::evaluate_partition_ranges;
33use datafusion_common::{Result, ScalarValue};
34use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
35use datafusion_expr::WindowFrame;
36use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
37
38#[derive(Debug)]
40pub struct StandardWindowExpr {
41 expr: Arc<dyn StandardWindowFunctionExpr>,
42 partition_by: Vec<Arc<dyn PhysicalExpr>>,
43 order_by: Vec<PhysicalSortExpr>,
44 window_frame: Arc<WindowFrame>,
45}
46
47impl StandardWindowExpr {
48 pub fn new(
50 expr: Arc<dyn StandardWindowFunctionExpr>,
51 partition_by: &[Arc<dyn PhysicalExpr>],
52 order_by: &[PhysicalSortExpr],
53 window_frame: Arc<WindowFrame>,
54 ) -> Self {
55 Self {
56 expr,
57 partition_by: partition_by.to_vec(),
58 order_by: order_by.to_vec(),
59 window_frame,
60 }
61 }
62
63 pub fn get_standard_func_expr(&self) -> &Arc<dyn StandardWindowFunctionExpr> {
65 &self.expr
66 }
67
68 pub fn add_equal_orderings(
74 &self,
75 eq_properties: &mut EquivalenceProperties,
76 ) -> Result<()> {
77 let schema = eq_properties.schema();
78 if let Some(fn_res_ordering) = self.expr.get_result_ordering(schema) {
79 add_new_ordering_expr_with_partition_by(
80 eq_properties,
81 fn_res_ordering,
82 &self.partition_by,
83 )?;
84 }
85 Ok(())
86 }
87}
88
89impl WindowExpr for StandardWindowExpr {
90 fn as_any(&self) -> &dyn Any {
92 self
93 }
94
95 fn name(&self) -> &str {
96 self.expr.name()
97 }
98
99 fn field(&self) -> Result<FieldRef> {
100 self.expr.field()
101 }
102
103 fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
104 self.expr.expressions()
105 }
106
107 fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
108 &self.partition_by
109 }
110
111 fn order_by(&self) -> &[PhysicalSortExpr] {
112 &self.order_by
113 }
114
115 fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
116 let mut evaluator = self.expr.create_evaluator()?;
117 let num_rows = batch.num_rows();
118 if evaluator.uses_window_frame() {
119 let sort_options = self.order_by.iter().map(|o| o.options).collect();
120 let mut row_wise_results = vec![];
121
122 let mut values = self.evaluate_args(batch)?;
123 let order_bys = get_orderby_values(self.order_by_columns(batch)?);
124 let n_args = values.len();
125 values.extend(order_bys);
126 let order_bys_ref = &values[n_args..];
127
128 let mut window_frame_ctx =
129 WindowFrameContext::new(Arc::clone(&self.window_frame), sort_options);
130 let mut last_range = Range { start: 0, end: 0 };
131 for idx in 0..num_rows {
133 let range = window_frame_ctx.calculate_range(
134 order_bys_ref,
135 &last_range,
136 num_rows,
137 idx,
138 )?;
139 let value = evaluator.evaluate(&values, &range)?;
140 row_wise_results.push(value);
141 last_range = range;
142 }
143 ScalarValue::iter_to_array(row_wise_results)
144 } else if evaluator.include_rank() {
145 let columns = self.order_by_columns(batch)?;
146 let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
147 evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points)
148 } else {
149 let values = self.evaluate_args(batch)?;
150 evaluator.evaluate_all(&values, num_rows)
151 }
152 }
153
154 fn evaluate_stateful(
157 &self,
158 partition_batches: &PartitionBatches,
159 window_agg_state: &mut PartitionWindowAggStates,
160 ) -> Result<()> {
161 let field = self.expr.field()?;
162 let out_type = field.data_type();
163 let sort_options = self.order_by.iter().map(|o| o.options).collect::<Vec<_>>();
164 for (partition_row, partition_batch_state) in partition_batches.iter() {
165 let window_state =
166 if let Some(window_state) = window_agg_state.get_mut(partition_row) {
167 window_state
168 } else {
169 let evaluator = self.expr.create_evaluator()?;
170 window_agg_state
171 .entry(partition_row.clone())
172 .or_insert(WindowState {
173 state: WindowAggState::new(out_type)?,
174 window_fn: WindowFn::Builtin(evaluator),
175 })
176 };
177 let evaluator = match &mut window_state.window_fn {
178 WindowFn::Builtin(evaluator) => evaluator,
179 _ => unreachable!(),
180 };
181 let state = &mut window_state.state;
182
183 let batch_ref = &partition_batch_state.record_batch;
184 let mut values = self.evaluate_args(batch_ref)?;
185 let order_bys = if evaluator.uses_window_frame() || evaluator.include_rank() {
186 get_orderby_values(self.order_by_columns(batch_ref)?)
187 } else {
188 vec![]
189 };
190 let n_args = values.len();
191 values.extend(order_bys);
192 let order_bys_ref = &values[n_args..];
193
194 let record_batch = &partition_batch_state.record_batch;
196 let num_rows = record_batch.num_rows();
197 let mut row_wise_results: Vec<ScalarValue> = vec![];
198 let is_causal = if evaluator.uses_window_frame() {
199 self.window_frame.is_causal()
200 } else {
201 evaluator.is_causal()
202 };
203 for idx in state.last_calculated_index..num_rows {
204 let frame_range = if evaluator.uses_window_frame() {
205 state
206 .window_frame_ctx
207 .get_or_insert_with(|| {
208 WindowFrameContext::new(
209 Arc::clone(&self.window_frame),
210 sort_options.clone(),
211 )
212 })
213 .calculate_range(
214 order_bys_ref,
215 &state.window_frame_range,
217 num_rows,
218 idx,
219 )
220 } else {
221 evaluator.get_range(idx, num_rows)
222 }?;
223
224 if frame_range.end == num_rows
226 && !is_causal
227 && !partition_batch_state.is_end
228 {
229 break;
230 }
231 state.window_frame_range = frame_range;
233 row_wise_results
234 .push(evaluator.evaluate(&values, &state.window_frame_range)?);
235 }
236 let out_col = if row_wise_results.is_empty() {
237 new_empty_array(out_type)
238 } else {
239 ScalarValue::iter_to_array(row_wise_results.into_iter())?
240 };
241
242 state.update(&out_col, partition_batch_state)?;
243 if self.window_frame.start_bound.is_unbounded() {
244 evaluator.memoize(state)?;
245 }
246 }
247 Ok(())
248 }
249
250 fn get_window_frame(&self) -> &Arc<WindowFrame> {
251 &self.window_frame
252 }
253
254 fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>> {
255 self.expr.reverse_expr().map(|reverse_expr| {
256 Arc::new(StandardWindowExpr::new(
257 reverse_expr,
258 &self.partition_by.clone(),
259 &self
260 .order_by
261 .iter()
262 .map(|e| e.reverse())
263 .collect::<Vec<_>>(),
264 Arc::new(self.window_frame.reverse()),
265 )) as _
266 })
267 }
268
269 fn uses_bounded_memory(&self) -> bool {
270 if let Ok(evaluator) = self.expr.create_evaluator() {
271 evaluator.supports_bounded_execution()
272 && (!evaluator.uses_window_frame()
273 || !self.window_frame.end_bound.is_unbounded())
274 } else {
275 false
276 }
277 }
278
279 fn create_window_fn(&self) -> Result<WindowFn> {
280 Ok(WindowFn::Builtin(self.expr.create_evaluator()?))
281 }
282}
283
284pub(crate) fn add_new_ordering_expr_with_partition_by(
287 eqp: &mut EquivalenceProperties,
288 expr: PhysicalSortExpr,
289 partition_by: &[Arc<dyn PhysicalExpr>],
290) -> Result<()> {
291 if partition_by.is_empty() {
292 eqp.add_ordering([expr]);
294 } else {
295 let (mut ordering, _) = eqp.find_longest_permutation(partition_by)?;
302 if ordering.len() == partition_by.len() {
303 ordering.push(expr);
304 eqp.add_ordering(ordering);
305 }
306 }
307 Ok(())
308}