datafusion_physical_expr/window/
standard.rs1use std::any::Any;
21use std::ops::Range;
22use std::sync::Arc;
23
24use super::{StandardWindowFunctionExpr, WindowExpr};
25use crate::window::window_expr::{get_orderby_values, WindowFn};
26use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
27use crate::{EquivalenceProperties, PhysicalExpr};
28
29use arrow::array::{new_empty_array, ArrayRef};
30use arrow::datatypes::FieldRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::utils::evaluate_partition_ranges;
33use datafusion_common::{Result, ScalarValue};
34use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
35use datafusion_expr::WindowFrame;
36use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
37
38#[derive(Debug)]
40pub struct StandardWindowExpr {
41 expr: Arc<dyn StandardWindowFunctionExpr>,
42 partition_by: Vec<Arc<dyn PhysicalExpr>>,
43 order_by: Vec<PhysicalSortExpr>,
44 window_frame: Arc<WindowFrame>,
45}
46
47impl StandardWindowExpr {
48 pub fn new(
50 expr: Arc<dyn StandardWindowFunctionExpr>,
51 partition_by: &[Arc<dyn PhysicalExpr>],
52 order_by: &[PhysicalSortExpr],
53 window_frame: Arc<WindowFrame>,
54 ) -> Self {
55 Self {
56 expr,
57 partition_by: partition_by.to_vec(),
58 order_by: order_by.to_vec(),
59 window_frame,
60 }
61 }
62
63 pub fn get_standard_func_expr(&self) -> &Arc<dyn StandardWindowFunctionExpr> {
65 &self.expr
66 }
67
68 pub fn add_equal_orderings(
74 &self,
75 eq_properties: &mut EquivalenceProperties,
76 ) -> Result<()> {
77 let schema = eq_properties.schema();
78 if let Some(fn_res_ordering) = self.expr.get_result_ordering(schema) {
79 add_new_ordering_expr_with_partition_by(
80 eq_properties,
81 fn_res_ordering,
82 &self.partition_by,
83 )?;
84 }
85 Ok(())
86 }
87}
88
89impl WindowExpr for StandardWindowExpr {
90 fn as_any(&self) -> &dyn Any {
92 self
93 }
94
95 fn name(&self) -> &str {
96 self.expr.name()
97 }
98
99 fn field(&self) -> Result<FieldRef> {
100 self.expr.field()
101 }
102
103 fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
104 self.expr.expressions()
105 }
106
107 fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
108 &self.partition_by
109 }
110
111 fn order_by(&self) -> &[PhysicalSortExpr] {
112 &self.order_by
113 }
114
115 fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
116 let mut evaluator = self.expr.create_evaluator()?;
117 let num_rows = batch.num_rows();
118 if evaluator.uses_window_frame() {
119 let sort_options = self.order_by.iter().map(|o| o.options).collect();
120 let mut row_wise_results = vec![];
121
122 let mut values = self.evaluate_args(batch)?;
123 let order_bys = get_orderby_values(self.order_by_columns(batch)?);
124 let n_args = values.len();
125 values.extend(order_bys);
126 let order_bys_ref = &values[n_args..];
127
128 let mut window_frame_ctx =
129 WindowFrameContext::new(Arc::clone(&self.window_frame), sort_options);
130 let mut last_range = Range { start: 0, end: 0 };
131 for idx in 0..num_rows {
133 let range = window_frame_ctx.calculate_range(
134 order_bys_ref,
135 &last_range,
136 num_rows,
137 idx,
138 )?;
139 let value = evaluator.evaluate(&values, &range)?;
140 row_wise_results.push(value);
141 last_range = range;
142 }
143 ScalarValue::iter_to_array(row_wise_results)
144 } else if evaluator.include_rank() {
145 let columns = self.order_by_columns(batch)?;
146 let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
147 evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points)
148 } else {
149 let values = self.evaluate_args(batch)?;
150 evaluator.evaluate_all(&values, num_rows)
151 }
152 }
153
154 fn evaluate_stateful(
157 &self,
158 partition_batches: &PartitionBatches,
159 window_agg_state: &mut PartitionWindowAggStates,
160 ) -> Result<()> {
161 let field = self.expr.field()?;
162 let out_type = field.data_type();
163 let sort_options = self.order_by.iter().map(|o| o.options).collect::<Vec<_>>();
164 let new_state = WindowAggState::new(out_type)?;
167 for (partition_row, partition_batch_state) in partition_batches.iter() {
168 let window_state =
169 if let Some(window_state) = window_agg_state.get_mut(partition_row) {
170 window_state
171 } else {
172 let evaluator = self.expr.create_evaluator()?;
173 window_agg_state
174 .entry(partition_row.clone())
175 .or_insert(WindowState {
176 state: new_state.clone(),
177 window_fn: WindowFn::Builtin(evaluator),
178 })
179 };
180 let evaluator = match &mut window_state.window_fn {
181 WindowFn::Builtin(evaluator) => evaluator,
182 _ => unreachable!(),
183 };
184 let state = &mut window_state.state;
185
186 let batch_ref = &partition_batch_state.record_batch;
187 let mut values = self.evaluate_args(batch_ref)?;
188 let order_bys = if evaluator.uses_window_frame() || evaluator.include_rank() {
189 get_orderby_values(self.order_by_columns(batch_ref)?)
190 } else {
191 vec![]
192 };
193 let n_args = values.len();
194 values.extend(order_bys);
195 let order_bys_ref = &values[n_args..];
196
197 let record_batch = &partition_batch_state.record_batch;
199 let num_rows = record_batch.num_rows();
200 let mut row_wise_results: Vec<ScalarValue> = vec![];
201 let is_causal = if evaluator.uses_window_frame() {
202 self.window_frame.is_causal()
203 } else {
204 evaluator.is_causal()
205 };
206 for idx in state.last_calculated_index..num_rows {
207 let frame_range = if evaluator.uses_window_frame() {
208 state
209 .window_frame_ctx
210 .get_or_insert_with(|| {
211 WindowFrameContext::new(
212 Arc::clone(&self.window_frame),
213 sort_options.clone(),
214 )
215 })
216 .calculate_range(
217 order_bys_ref,
218 &state.window_frame_range,
220 num_rows,
221 idx,
222 )
223 } else {
224 evaluator.get_range(idx, num_rows)
225 }?;
226
227 if frame_range.end == num_rows
229 && !is_causal
230 && !partition_batch_state.is_end
231 {
232 break;
233 }
234 state.window_frame_range = frame_range;
236 row_wise_results
237 .push(evaluator.evaluate(&values, &state.window_frame_range)?);
238 }
239 let out_col = if row_wise_results.is_empty() {
240 new_empty_array(out_type)
241 } else if row_wise_results.len() == 1 {
242 row_wise_results[0].to_array()?
244 } else {
245 ScalarValue::iter_to_array(row_wise_results.into_iter())?
246 };
247
248 state.update(&out_col, partition_batch_state)?;
249 if self.window_frame.start_bound.is_unbounded() {
250 evaluator.memoize(state)?;
251 }
252 }
253 Ok(())
254 }
255
256 fn get_window_frame(&self) -> &Arc<WindowFrame> {
257 &self.window_frame
258 }
259
260 fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>> {
261 self.expr.reverse_expr().map(|reverse_expr| {
262 Arc::new(StandardWindowExpr::new(
263 reverse_expr,
264 &self.partition_by.clone(),
265 &self
266 .order_by
267 .iter()
268 .map(|e| e.reverse())
269 .collect::<Vec<_>>(),
270 Arc::new(self.window_frame.reverse()),
271 )) as _
272 })
273 }
274
275 fn uses_bounded_memory(&self) -> bool {
276 if let Ok(evaluator) = self.expr.create_evaluator() {
277 evaluator.supports_bounded_execution()
278 && (!evaluator.uses_window_frame()
279 || !self.window_frame.end_bound.is_unbounded())
280 } else {
281 false
282 }
283 }
284
285 fn create_window_fn(&self) -> Result<WindowFn> {
286 Ok(WindowFn::Builtin(self.expr.create_evaluator()?))
287 }
288}
289
290pub(crate) fn add_new_ordering_expr_with_partition_by(
293 eqp: &mut EquivalenceProperties,
294 expr: PhysicalSortExpr,
295 partition_by: &[Arc<dyn PhysicalExpr>],
296) -> Result<()> {
297 if partition_by.is_empty() {
298 eqp.add_ordering([expr]);
300 } else {
301 let (mut ordering, _) = eqp.find_longest_permutation(partition_by)?;
308 if ordering.len() == partition_by.len() {
309 ordering.push(expr);
310 eqp.add_ordering(ordering);
311 }
312 }
313 Ok(())
314}