use std::any::Any;
use std::cmp::{min, Ordering};
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::utils::create_schema;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
    calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
    window_equivalence_properties,
};
use crate::{
    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
    ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream, Statistics, WindowExpr,
};
use ahash::RandomState;
use arrow::compute::take_record_batch;
use arrow::{
    array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
    compute::{concat, concat_batches, sort_to_indices, take_arrays},
    datatypes::SchemaRef,
    record_batch::RecordBatch,
};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::stats::Precision;
use datafusion_common::utils::{
    evaluate_partition_ranges, get_at_indices, get_row_at_idx,
};
use datafusion_common::{
    arrow_datafusion_err, exec_err, DataFusionError, HashMap, Result,
};
use datafusion_execution::TaskContext;
use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::window::{
    PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowState,
};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};

use futures::stream::Stream;
use futures::{ready, StreamExt};
use hashbrown::hash_table::HashTable;
use indexmap::IndexMap;
use log::debug;

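/// Window execution plan that evaluates window functions over bounded
/// per-partition buffers, emitting results incrementally as soon as they are
/// computable instead of materializing whole partitions first.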
#[derive(Debug, Clone)]
pub struct BoundedWindowAggExec {
    /// Input plan
    input: Arc<dyn ExecutionPlan>,
    /// Window function expressions
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Schema after the window functions are applied
    schema: SchemaRef,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Describes how the input is ordered relative to the partition keys
    pub input_order_mode: InputOrderMode,
    /// Indices of the PARTITION BY columns that have a matching input ordering
    ordered_partition_by_indices: Vec<usize>,
    /// Cached plan properties (equivalences, output partitioning, etc.)
    cache: PlanProperties,
    /// Whether this operator may be hash-repartitioned on its partition keys
    can_repartition: bool,
}

impl BoundedWindowAggExec {
    /// Create a new execution plan for window aggregates
    pub fn try_new(
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: Arc<dyn ExecutionPlan>,
        input_order_mode: InputOrderMode,
        can_repartition: bool,
    ) -> Result<Self> {
        let schema = create_schema(&input.schema(), &window_expr)?;
        let schema = Arc::new(schema);
        let partition_by_exprs = window_expr[0].partition_by();
        let ordered_partition_by_indices = match &input_order_mode {
            InputOrderMode::Sorted => {
                let indices = get_ordered_partition_by_indices(
                    window_expr[0].partition_by(),
                    &input,
                );
                if indices.len() == partition_by_exprs.len() {
                    indices
                } else {
                    (0..partition_by_exprs.len()).collect::<Vec<_>>()
                }
            }
            InputOrderMode::PartiallySorted(ordered_indices) => ordered_indices.clone(),
            InputOrderMode::Linear => {
                vec![]
            }
        };
        let cache = Self::compute_properties(&input, &schema, &window_expr);
        Ok(Self {
            input,
            window_expr,
            schema,
            metrics: ExecutionPlanMetricsSet::new(),
            input_order_mode,
            ordered_partition_by_indices,
            cache,
            can_repartition,
        })
    }

    /// Window expressions
    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
        &self.window_expr
    }

    /// Input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Return the sort expressions corresponding to the ordered PARTITION BY
    /// columns
    pub fn partition_by_sort_keys(&self) -> Result<LexOrdering> {
        let partition_by = self.window_expr()[0].partition_by();
        get_partition_by_sort_exprs(
            &self.input,
            partition_by,
            &self.ordered_partition_by_indices,
        )
    }

    /// Initialize the partition search strategy matching the input order mode
    fn get_search_algo(&self) -> Result<Box<dyn PartitionSearcher>> {
        let partition_by_sort_keys = self.partition_by_sort_keys()?;
        let ordered_partition_by_indices = self.ordered_partition_by_indices.clone();
        let input_schema = self.input().schema();
        Ok(match &self.input_order_mode {
            InputOrderMode::Sorted => {
                // In Sorted mode, all PARTITION BY columns must have an ordering:
                if self.window_expr()[0].partition_by().len()
                    != ordered_partition_by_indices.len()
                {
                    return exec_err!(
                        "All partition by columns should have an ordering in Sorted mode."
                    );
                }
                Box::new(SortedSearch {
                    partition_by_sort_keys,
                    ordered_partition_by_indices,
                    input_schema,
                })
            }
            InputOrderMode::Linear | InputOrderMode::PartiallySorted(_) => Box::new(
                LinearSearch::new(ordered_partition_by_indices, input_schema),
            ),
        })
    }

    /// This function creates the cache object that stores the plan properties
    /// such as equivalence properties, partitioning, pipeline behavior, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
        window_exprs: &[Arc<dyn WindowExpr>],
    ) -> PlanProperties {
        // Calculate equivalence properties:
        let eq_properties = window_equivalence_properties(schema, input, window_exprs);

        // This operator preserves the partitioning of its input:
        let output_partitioning = input.output_partitioning().clone();

        // Construct the properties cache:
        PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }

    pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        if !self.can_repartition {
            vec![]
        } else {
            let all_partition_keys = self
                .window_expr()
                .iter()
                .map(|expr| expr.partition_by().to_vec())
                .collect::<Vec<_>>();

            all_partition_keys
                .into_iter()
                .min_by_key(|s| s.len())
                .unwrap_or_else(Vec::new)
        }
    }
}
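
// A minimal construction sketch (illustrative only; `input` and `window_expr`
// are assumed to be prepared elsewhere):
//
//     let exec = BoundedWindowAggExec::try_new(
//         window_expr,            // Vec<Arc<dyn WindowExpr>>
//         input,                  // Arc<dyn ExecutionPlan>
//         InputOrderMode::Sorted, // how the input is ordered w.r.t. keys
//         true,                   // allow repartitioning on partition keys
//     )?;
//
// `ExecutionPlan::execute` then picks a `PartitionSearcher` matching the
// input order mode and drives it through a `BoundedWindowAggStream`.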

impl DisplayAs for BoundedWindowAggExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "BoundedWindowAggExec: ")?;
                let g: Vec<String> = self
                    .window_expr
                    .iter()
                    .map(|e| {
                        format!(
                            "{}: {:?}, frame: {:?}",
                            e.name().to_owned(),
                            e.field(),
                            e.get_window_frame()
                        )
                    })
                    .collect();
                let mode = &self.input_order_mode;
                write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?;
            }
            DisplayFormatType::TreeRender => {
                let g: Vec<String> = self
                    .window_expr
                    .iter()
                    .map(|e| e.name().to_owned().to_string())
                    .collect();
                writeln!(f, "select_list={}", g.join(", "))?;

                let mode = &self.input_order_mode;
                writeln!(f, "mode={:?}", mode)?;
            }
        }
        Ok(())
    }
}

impl ExecutionPlan for BoundedWindowAggExec {
    fn name(&self) -> &'static str {
        "BoundedWindowAggExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
        let partition_bys = self.window_expr()[0].partition_by();
        let order_keys = self.window_expr()[0].order_by();
        let partition_bys = self
            .ordered_partition_by_indices
            .iter()
            .map(|idx| &partition_bys[*idx]);
        vec![calc_requirements(partition_bys, order_keys.iter())]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.partition_keys().is_empty() {
            debug!("No partition defined for BoundedWindowAggExec!!!");
            vec![Distribution::SinglePartition]
        } else {
            vec![Distribution::HashPartitioned(self.partition_keys())]
        }
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(BoundedWindowAggExec::try_new(
            self.window_expr.clone(),
            Arc::clone(&children[0]),
            self.input_order_mode.clone(),
            self.can_repartition,
        )?))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, context)?;
        let search_mode = self.get_search_algo()?;
        let stream = Box::pin(BoundedWindowAggStream::new(
            Arc::clone(&self.schema),
            self.window_expr.clone(),
            input,
            BaselineMetrics::new(&self.metrics, partition),
            search_mode,
        )?);
        Ok(stream)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        let input_stat = self.input.statistics()?;
        let win_cols = self.window_expr.len();
        let input_cols = self.input.schema().fields().len();
        // The window columns are appended after the input columns, so the
        // output has `input_cols + win_cols` columns in total:
        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
        // Copy the statistics of the input columns:
        column_statistics.extend(input_stat.column_statistics);
        // Statistics of the newly added window columns are unknown:
        for _ in 0..win_cols {
            column_statistics.push(ColumnStatistics::new_unknown())
        }
        Ok(Statistics {
            num_rows: input_stat.num_rows,
            column_statistics,
            total_byte_size: Precision::Absent,
        })
    }
}

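/// Trait that partition searcher implementations must satisfy. A partition
/// searcher routes incoming rows to per-partition buffers and decides which
/// window results can be emitted.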
trait PartitionSearcher: Send {
    /// Construct output columns from the result of each window expression.
    /// Returns `None` when no rows can be emitted yet; Sorted and Linear
    /// modes differ in how much of the buffered result is emittable.
    fn calculate_out_columns(
        &mut self,
        input_buffer: &RecordBatch,
        window_agg_states: &[PartitionWindowAggStates],
        partition_buffers: &mut PartitionBatches,
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Option<Vec<ArrayRef>>>;

    /// Whether the search works in Linear mode (i.e. without any ordered
    /// PARTITION BY columns).
    fn is_mode_linear(&self) -> bool {
        false
    }

    /// Split `record_batch` into per-partition batches, keyed by partition row.
    fn evaluate_partition_batches(
        &mut self,
        record_batch: &RecordBatch,
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Vec<(PartitionKey, RecordBatch)>>;

    /// Prune internal state after `_n_out` rows have been emitted.
    fn prune(&mut self, _n_out: usize) {}

    /// Mark the partitions that are guaranteed to receive no more input rows.
    fn mark_partition_end(&self, partition_buffers: &mut PartitionBatches);

    /// Update the partition buffers (and the combined input buffer) with a
    /// newly received `record_batch`.
    fn update_partition_batch(
        &mut self,
        input_buffer: &mut RecordBatch,
        record_batch: RecordBatch,
        window_expr: &[Arc<dyn WindowExpr>],
        partition_buffers: &mut PartitionBatches,
    ) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }
        let partition_batches =
            self.evaluate_partition_batches(&record_batch, window_expr)?;
        for (partition_row, partition_batch) in partition_batches {
            let partition_batch_state = partition_buffers
                .entry(partition_row)
                .or_insert_with(|| {
                    PartitionBatchState::new(Arc::clone(self.input_schema()))
                });
            partition_batch_state.extend(&partition_batch)?;
        }

        if self.is_mode_linear() {
            // In Linear mode, the last row of the incoming batch is the most
            // recently received row overall; propagate it to every partition
            // so window frames can be advanced correctly:
            let last_row = get_last_row_batch(&record_batch)?;
            for (_, partition_batch) in partition_buffers.iter_mut() {
                partition_batch.set_most_recent_row(last_row.clone());
            }
        }
        self.mark_partition_end(partition_buffers);

        *input_buffer = if input_buffer.num_rows() == 0 {
            record_batch
        } else {
            concat_batches(self.input_schema(), [input_buffer, &record_batch])?
        };

        Ok(())
    }

    fn input_schema(&self) -> &SchemaRef;
}

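/// Searches for the partition of each input row linearly, via hashes of the
/// PARTITION BY columns. Used in the `Linear` and `PartiallySorted` input
/// order modes, where partitions may interleave arbitrarily in the input.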
pub struct LinearSearch {
    /// Hash values of the rows currently held in the input buffer
    input_buffer_hashes: VecDeque<u64>,
    /// Random state used when hashing the PARTITION BY columns
    random_state: RandomState,
    /// Indices of the PARTITION BY columns that have a matching input ordering
    ordered_partition_by_indices: Vec<usize>,
    /// `(hash, group index)` entries for the rows of the current batch
    row_map_batch: HashTable<(u64, usize)>,
    /// `(hash, group index, emittable row count)` entries used when
    /// calculating output columns
    row_map_out: HashTable<(u64, usize, usize)>,
    input_schema: SchemaRef,
}

impl PartitionSearcher for LinearSearch {
    /// This method constructs output columns using the result of each window expression.
    fn calculate_out_columns(
        &mut self,
        input_buffer: &RecordBatch,
        window_agg_states: &[PartitionWindowAggStates],
        partition_buffers: &mut PartitionBatches,
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Option<Vec<ArrayRef>>> {
        let partition_output_indices = self.calc_partition_output_indices(
            input_buffer,
            window_agg_states,
            window_expr,
        )?;

        let n_window_col = window_agg_states.len();
        let mut new_columns = vec![vec![]; n_window_col];
        // The size of `all_indices` is at most `input_buffer.num_rows()`:
        let mut all_indices = UInt32Builder::with_capacity(input_buffer.num_rows());
        for (row, indices) in partition_output_indices {
            let length = indices.len();
            for (idx, window_agg_state) in window_agg_states.iter().enumerate() {
                let partition = &window_agg_state[&row];
                let values = Arc::clone(&partition.state.out_col.slice(0, length));
                new_columns[idx].push(values);
            }
            let partition_batch_state = &mut partition_buffers[&row];
            // Store how many rows are emitted for this partition:
            partition_batch_state.n_out_row = length;
            // Store the original input index of each emitted row:
            all_indices.append_slice(&indices);
        }
        let all_indices = all_indices.finish();
        if all_indices.is_empty() {
            // We couldn't generate any new rows, return early:
            return Ok(None);
        }

        // Concatenate the per-partition results of each window expression:
        let new_columns = new_columns
            .iter()
            .map(|items| {
                concat(&items.iter().map(|e| e.as_ref()).collect::<Vec<_>>())
                    .map_err(|e| arrow_datafusion_err!(e))
            })
            .collect::<Result<Vec<_>>>()?;
        // Restore the original (input) row order of the output:
        let sorted_indices = sort_to_indices(&all_indices, None, None)?;
        take_arrays(&new_columns, &sorted_indices, None)
            .map(Some)
            .map_err(|e| arrow_datafusion_err!(e))
    }

    fn evaluate_partition_batches(
        &mut self,
        record_batch: &RecordBatch,
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Vec<(PartitionKey, RecordBatch)>> {
        let partition_bys =
            evaluate_partition_by_column_values(record_batch, window_expr)?;
        // Compute the row indices of each partition, then build one record
        // batch per partition from the rows at those indices:
        self.get_per_partition_indices(&partition_bys, record_batch)?
            .into_iter()
            .map(|(row, indices)| {
                let mut new_indices = UInt32Builder::with_capacity(indices.len());
                new_indices.append_slice(&indices);
                let indices = new_indices.finish();
                Ok((row, take_record_batch(record_batch, &indices)?))
            })
            .collect()
    }

    fn prune(&mut self, n_out: usize) {
        // Delete the hashes of the rows that were emitted:
        self.input_buffer_hashes.drain(0..n_out);
    }

    fn mark_partition_end(&self, partition_buffers: &mut PartitionBatches) {
        // This is only possible in the PartiallySorted case; in pure Linear
        // mode we cannot tell when a partition ends.
        if !self.ordered_partition_by_indices.is_empty() {
            if let Some((last_row, _)) = partition_buffers.last() {
                let last_sorted_cols = self
                    .ordered_partition_by_indices
                    .iter()
                    .map(|idx| last_row[*idx].clone())
                    .collect::<Vec<_>>();
                for (row, partition_batch_state) in partition_buffers.iter_mut() {
                    let sorted_cols = self
                        .ordered_partition_by_indices
                        .iter()
                        .map(|idx| &row[*idx]);
                    // Every partition whose sorted PARTITION BY columns differ
                    // from those of the latest row will receive no more data:
                    partition_batch_state.is_end = !sorted_cols.eq(&last_sorted_cols);
                }
            }
        }
    }

    fn is_mode_linear(&self) -> bool {
        self.ordered_partition_by_indices.is_empty()
    }

    fn input_schema(&self) -> &SchemaRef {
        &self.input_schema
    }
}

impl LinearSearch {
    /// Initialize a new [`LinearSearch`] partition searcher.
    fn new(ordered_partition_by_indices: Vec<usize>, input_schema: SchemaRef) -> Self {
        LinearSearch {
            input_buffer_hashes: VecDeque::new(),
            random_state: Default::default(),
            ordered_partition_by_indices,
            row_map_batch: HashTable::with_capacity(256),
            row_map_out: HashTable::with_capacity(256),
            input_schema,
        }
    }

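    /// Calculate the row indices of each partition occurring in `batch`,
    /// using hashes computed from the PARTITION BY `columns`. Returns one
    /// `(partition key, row indices)` pair per distinct partition in the batch.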
    fn get_per_partition_indices(
        &mut self,
        columns: &[ArrayRef],
        batch: &RecordBatch,
    ) -> Result<Vec<(PartitionKey, Vec<u32>)>> {
        let mut batch_hashes = vec![0; batch.num_rows()];
        create_hashes(columns, &self.random_state, &mut batch_hashes)?;
        self.input_buffer_hashes.extend(&batch_hashes);
        // Reset the row map for the new calculation:
        self.row_map_batch.clear();
        // `result` stores each partition key and the indices at which that
        // partition occurs in `batch`:
        let mut result: Vec<(PartitionKey, Vec<u32>)> = vec![];
        for (hash, row_idx) in batch_hashes.into_iter().zip(0u32..) {
            let entry = self.row_map_batch.find_mut(hash, |(_, group_idx)| {
                let row = get_row_at_idx(columns, row_idx as usize).unwrap();
                // Handle hash collisions with an equality check:
                row.eq(&result[*group_idx].0)
            });
            if let Some((_, group_idx)) = entry {
                result[*group_idx].1.push(row_idx)
            } else {
                self.row_map_batch.insert_unique(
                    hash,
                    (hash, result.len()),
                    |(hash, _)| *hash,
                );
                let row = get_row_at_idx(columns, row_idx as usize)?;
                // This is a new partition; its only index so far is `row_idx`:
                result.push((row, vec![row_idx]));
            }
        }
        Ok(result)
    }

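    /// Calculate output indices for each partition currently buffered in
    /// `input_buffer`. The rows emittable for a partition are capped by the
    /// smallest number of output rows any window expression has produced for
    /// it so far (`min_out` below); iteration stops at the first row whose
    /// partition has produced nothing yet, which preserves input row order.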
    fn calc_partition_output_indices(
        &mut self,
        input_buffer: &RecordBatch,
        window_agg_states: &[PartitionWindowAggStates],
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Vec<(PartitionKey, Vec<u32>)>> {
        let partition_by_columns =
            evaluate_partition_by_column_values(input_buffer, window_expr)?;
        // Reset the row map state:
        self.row_map_out.clear();
        let mut partition_indices: Vec<(PartitionKey, Vec<u32>)> = vec![];
        for (hash, row_idx) in self.input_buffer_hashes.iter().zip(0u32..) {
            let entry = self.row_map_out.find_mut(*hash, |(_, group_idx, _)| {
                let row =
                    get_row_at_idx(&partition_by_columns, row_idx as usize).unwrap();
                row == partition_indices[*group_idx].0
            });
            if let Some((_, group_idx, n_out)) = entry {
                let (_, indices) = &mut partition_indices[*group_idx];
                if indices.len() >= *n_out {
                    break;
                }
                indices.push(row_idx);
            } else {
                let row = get_row_at_idx(&partition_by_columns, row_idx as usize)?;
                let min_out = window_agg_states
                    .iter()
                    .map(|window_agg_state| {
                        window_agg_state
                            .get(&row)
                            .map(|partition| partition.state.out_col.len())
                            .unwrap_or(0)
                    })
                    .min()
                    .unwrap_or(0);
                if min_out == 0 {
                    break;
                }
                self.row_map_out.insert_unique(
                    *hash,
                    (*hash, partition_indices.len(), min_out),
                    |(hash, _, _)| *hash,
                );
                partition_indices.push((row, vec![row_idx]));
            }
        }
        Ok(partition_indices)
    }
}

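/// Searches for partition boundaries in sorted input. Used in the `Sorted`
/// input order mode, where each partition occupies a contiguous row range.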
pub struct SortedSearch {
    /// The sort expressions for the PARTITION BY columns
    partition_by_sort_keys: LexOrdering,
    /// Indices of the PARTITION BY columns that have a matching input ordering
    ordered_partition_by_indices: Vec<usize>,
    input_schema: SchemaRef,
}

impl PartitionSearcher for SortedSearch {
    /// This method constructs new output columns using the result of each window expression.
    fn calculate_out_columns(
        &mut self,
        _input_buffer: &RecordBatch,
        window_agg_states: &[PartitionWindowAggStates],
        partition_buffers: &mut PartitionBatches,
        _window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Option<Vec<ArrayRef>>> {
        let n_out = self.calculate_n_out_row(window_agg_states, partition_buffers);
        if n_out == 0 {
            Ok(None)
        } else {
            window_agg_states
                .iter()
                .map(|map| get_aggregate_result_out_column(map, n_out).map(Some))
                .collect()
        }
    }

    fn evaluate_partition_batches(
        &mut self,
        record_batch: &RecordBatch,
        _window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Vec<(PartitionKey, RecordBatch)>> {
        let num_rows = record_batch.num_rows();
        // Evaluate the PARTITION BY columns:
        let partition_columns = self
            .partition_by_sort_keys
            .iter()
            .map(|elem| elem.evaluate_to_sort_column(record_batch))
            .collect::<Result<Vec<_>>>()?;
        // Reorder the PARTITION BY columns to match the input ordering:
        let partition_columns_ordered =
            get_at_indices(&partition_columns, &self.ordered_partition_by_indices)?;
        let partition_points =
            evaluate_partition_ranges(num_rows, &partition_columns_ordered)?;
        let partition_bys = partition_columns
            .into_iter()
            .map(|arr| arr.values)
            .collect::<Vec<ArrayRef>>();

        partition_points
            .iter()
            .map(|range| {
                let row = get_row_at_idx(&partition_bys, range.start)?;
                let len = range.end - range.start;
                let slice = record_batch.slice(range.start, len);
                Ok((row, slice))
            })
            .collect::<Result<Vec<_>>>()
    }

    fn mark_partition_end(&self, partition_buffers: &mut PartitionBatches) {
        // In Sorted mode, partitions are contiguous, so every partition except
        // the last one is guaranteed to receive no more data:
        let n_partitions = partition_buffers.len();
        for (idx, (_, partition_batch_state)) in partition_buffers.iter_mut().enumerate()
        {
            partition_batch_state.is_end |= idx < n_partitions - 1;
        }
    }

    fn input_schema(&self) -> &SchemaRef {
        &self.input_schema
    }
}

impl SortedSearch {
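    /// Calculate how many rows can be emitted: the minimum, across all window
    /// expressions, of the number of result rows produced so far.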
    fn calculate_n_out_row(
        &mut self,
        window_agg_states: &[PartitionWindowAggStates],
        partition_buffers: &mut PartitionBatches,
    ) -> usize {
        // Different window expressions may produce results at different rates;
        // we emit the overall result only as fast as the slowest one:
        let mut counts = vec![];
        let out_col_counts = window_agg_states.iter().map(|window_agg_state| {
            // Stores the total number of rows generated by the current
            // window expression:
            let mut cur_window_expr_out_result_len = 0;
            // Stores the maximum result length seen for each partition under
            // the current window expression:
            let mut per_partition_out_results = HashMap::new();
            for (row, WindowState { state, .. }) in window_agg_state.iter() {
                cur_window_expr_out_result_len += state.out_col.len();
                let count = per_partition_out_results.entry(row).or_insert(0);
                if *count < state.out_col.len() {
                    *count = state.out_col.len();
                }
                // If results are still missing for the current partition, do
                // not consider later partitions; emitting them would break the
                // output row order:
                if state.n_row_result_missing > 0 {
                    break;
                }
            }
            counts.push(per_partition_out_results);
            cur_window_expr_out_result_len
        });
        argmin(out_col_counts).map_or(0, |(min_idx, minima)| {
            for (row, count) in counts.swap_remove(min_idx).into_iter() {
                let partition_batch = &mut partition_buffers[row];
                partition_batch.n_out_row = count;
            }
            minima
        })
    }
}

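/// Evaluate the PARTITION BY column values for the rows of `record_batch`,
/// using the PARTITION BY expressions of the first window expression.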
fn evaluate_partition_by_column_values(
    record_batch: &RecordBatch,
    window_expr: &[Arc<dyn WindowExpr>],
) -> Result<Vec<ArrayRef>> {
    window_expr[0]
        .partition_by()
        .iter()
        .map(|item| match item.evaluate(record_batch)? {
            ColumnarValue::Array(array) => Ok(array),
            ColumnarValue::Scalar(scalar) => {
                scalar.to_array_of_size(record_batch.num_rows())
            }
        })
        .collect()
}

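/// Stream for the bounded window aggregation plan. It buffers input rows per
/// partition, evaluates window expressions statefully, and prunes state that
/// is no longer needed as soon as results are emitted.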
pub struct BoundedWindowAggStream {
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    /// The currently buffered input rows, in receipt order
    input_buffer: RecordBatch,
    /// Per-partition batches of buffered input rows
    partition_buffers: PartitionBatches,
    /// Per-window-expression output state, keyed by partition
    window_agg_states: Vec<PartitionWindowAggStates>,
    finished: bool,
    window_expr: Vec<Arc<dyn WindowExpr>>,
    baseline_metrics: BaselineMetrics,
    /// Search mode for the partition columns (depends on the input ordering)
    search_mode: Box<dyn PartitionSearcher>,
}

impl BoundedWindowAggStream {
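    /// Prunes the sections of the state that are no longer needed when
    /// calculating results (as determined by window frame boundaries and the
    /// number of results generated).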
    fn prune_state(&mut self, n_out: usize) -> Result<()> {
        // Prune `self.window_agg_states`:
        self.prune_out_columns();
        // Prune `self.partition_buffers`:
        self.prune_partition_batches();
        // Prune `self.input_buffer`:
        self.prune_input_batch(n_out)?;
        // Prune the internal state of the search algorithm:
        self.search_mode.prune(n_out);
        Ok(())
    }
}

impl Stream for BoundedWindowAggStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(poll)
    }
}

impl BoundedWindowAggStream {
    /// Create a new BoundedWindowAggStream
    fn new(
        schema: SchemaRef,
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: SendableRecordBatchStream,
        baseline_metrics: BaselineMetrics,
        search_mode: Box<dyn PartitionSearcher>,
    ) -> Result<Self> {
        let state = window_expr.iter().map(|_| IndexMap::new()).collect();
        let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
        Ok(Self {
            schema,
            input,
            input_buffer: empty_batch,
            partition_buffers: IndexMap::new(),
            window_agg_states: state,
            finished: false,
            window_expr,
            baseline_metrics,
            search_mode,
        })
    }

    fn compute_aggregates(&mut self) -> Result<Option<RecordBatch>> {
        // Calculate the window columns statefully:
        for (cur_window_expr, state) in
            self.window_expr.iter().zip(&mut self.window_agg_states)
        {
            cur_window_expr.evaluate_stateful(&self.partition_buffers, state)?;
        }

        let schema = Arc::clone(&self.schema);
        let window_expr_out = self.search_mode.calculate_out_columns(
            &self.input_buffer,
            &self.window_agg_states,
            &mut self.partition_buffers,
            &self.window_expr,
        )?;
        if let Some(window_expr_out) = window_expr_out {
            let n_out = window_expr_out[0].len();
            // Output the first `n_out` input rows, followed by the computed
            // window columns:
            let columns_to_show = self
                .input_buffer
                .columns()
                .iter()
                .map(|elem| elem.slice(0, n_out))
                .chain(window_expr_out)
                .collect::<Vec<_>>();
            let n_generated = columns_to_show[0].len();
            self.prune_state(n_generated)?;
            Ok(Some(RecordBatch::try_new(schema, columns_to_show)?))
        } else {
            Ok(None)
        }
    }

    #[inline]
    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        if self.finished {
            return Poll::Ready(None);
        }

        let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
        match ready!(self.input.poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                // Start the timer only after the input batch arrives, so that
                // upstream wait time is not attributed to this operator:
                let _timer = elapsed_compute.timer();

                self.search_mode.update_partition_batch(
                    &mut self.input_buffer,
                    batch,
                    &self.window_expr,
                    &mut self.partition_buffers,
                )?;
                if let Some(batch) = self.compute_aggregates()? {
                    return Poll::Ready(Some(Ok(batch)));
                }
                self.poll_next_inner(cx)
            }
            Some(Err(e)) => Poll::Ready(Some(Err(e))),
            None => {
                let _timer = elapsed_compute.timer();

                self.finished = true;
                // The input is exhausted; mark all partitions as ended so the
                // remaining results can be emitted:
                for (_, partition_batch_state) in self.partition_buffers.iter_mut() {
                    partition_batch_state.is_end = true;
                }
                if let Some(batch) = self.compute_aggregates()? {
                    return Poll::Ready(Some(Ok(batch)));
                }
                Poll::Ready(None)
            }
        }
    }

    /// Prunes the sections of the per-partition record batches that are no
    /// longer needed to calculate window function results.
    fn prune_partition_batches(&mut self) {
        // Remove partitions that have already ended. Since `retain` preserves
        // insertion order, the remaining partitions keep their ordering:
        self.partition_buffers
            .retain(|_, partition_batch_state| !partition_batch_state.is_end);

        // For each partition, find the minimum prunable index across all
        // window expressions; rows before that index are needed by no frame:
        let mut n_prune_each_partition = HashMap::new();
        for window_agg_state in self.window_agg_states.iter_mut() {
            window_agg_state.retain(|_, WindowState { state, .. }| !state.is_end);
            for (partition_row, WindowState { state: value, .. }) in window_agg_state {
                let n_prune =
                    min(value.window_frame_range.start, value.last_calculated_index);
                if let Some(current) = n_prune_each_partition.get_mut(partition_row) {
                    if n_prune < *current {
                        *current = n_prune;
                    }
                } else {
                    n_prune_each_partition.insert(partition_row.clone(), n_prune);
                }
            }
        }

        // Retain only the still-needed tail of each partition batch:
        for (partition_row, n_prune) in n_prune_each_partition.iter() {
            let pb_state = &mut self.partition_buffers[partition_row];

            let batch = &pb_state.record_batch;
            pb_state.record_batch = batch.slice(*n_prune, batch.num_rows() - n_prune);
            pb_state.n_out_row = 0;

            // Update state indices, since rows were pruned from the start:
            for window_agg_state in self.window_agg_states.iter_mut() {
                window_agg_state[partition_row].state.prune_state(*n_prune);
            }
        }
    }

    /// Prunes the section of the input batch whose window function results
    /// have already been calculated and emitted.
    fn prune_input_batch(&mut self, n_out: usize) -> Result<()> {
        // Prune the first `n_out` rows from the buffer:
        let n_to_keep = self.input_buffer.num_rows() - n_out;
        let batch_to_keep = self
            .input_buffer
            .columns()
            .iter()
            .map(|elem| elem.slice(n_out, n_to_keep))
            .collect::<Vec<_>>();
        self.input_buffer = RecordBatch::try_new_with_options(
            self.input_buffer.schema(),
            batch_to_keep,
            &RecordBatchOptions::new().with_row_count(Some(n_to_keep)),
        )?;
        Ok(())
    }

    /// Prunes emitted parts from the state of each window expression.
    fn prune_out_columns(&mut self) {
        // Generated columns for each window expression live in the `out_col`
        // field of `WindowAggState`. Given how many rows were emitted for each
        // partition (`n_out_row`), remove those sections from the state:
        for partition_window_agg_states in self.window_agg_states.iter_mut() {
            // Partitions with `is_end` set can be removed entirely, which
            // also shrinks the map:
            partition_window_agg_states
                .retain(|_, partition_batch_state| !partition_batch_state.state.is_end);
            for (
                partition_key,
                WindowState {
                    state: WindowAggState { out_col, .. },
                    ..
                },
            ) in partition_window_agg_states
            {
                let partition_batch = &mut self.partition_buffers[partition_key];
                let n_to_del = partition_batch.n_out_row;
                let n_to_keep = out_col.len() - n_to_del;
                *out_col = out_col.slice(n_to_del, n_to_keep);
            }
        }
    }
}

impl RecordBatchStream for BoundedWindowAggStream {
    /// Get the schema
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

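/// Returns the index and value of the minimum element in `data`; incomparable
/// values are treated as equal. For example, `argmin([3, 1, 2].into_iter())`
/// yields `Some((1, 1))`.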
fn argmin<T: PartialOrd>(data: impl Iterator<Item = T>) -> Option<(usize, T)> {
    data.enumerate()
        .min_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(Ordering::Equal))
}

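/// Concatenates the first `len_to_show` output rows from the per-partition
/// output columns (in partition insertion order) into a single array, erring
/// if fewer than `len_to_show` rows have been generated.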
fn get_aggregate_result_out_column(
    partition_window_agg_states: &PartitionWindowAggStates,
    len_to_show: usize,
) -> Result<ArrayRef> {
    let mut result = None;
    let mut running_length = 0;
    // Iteration order follows insertion order, i.e. partition order:
    for (
        _,
        WindowState {
            state: WindowAggState { out_col, .. },
            ..
        },
    ) in partition_window_agg_states
    {
        if running_length < len_to_show {
            let n_to_use = min(len_to_show - running_length, out_col.len());
            let slice_to_use = out_col.slice(0, n_to_use);
            result = Some(match result {
                Some(arr) => concat(&[&arr, &slice_to_use])?,
                None => slice_to_use,
            });
            running_length += n_to_use;
        } else {
            break;
        }
    }
    if running_length != len_to_show {
        return exec_err!(
            "Generated row number should be {len_to_show}, it is {running_length}"
        );
    }
    result
        .ok_or_else(|| DataFusionError::Execution("Should contain something".to_string()))
}

/// Constructs a batch from the last row of the batch in the argument.
pub(crate) fn get_last_row_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    if batch.num_rows() == 0 {
        return exec_err!("Latest batch should have at least 1 row");
    }
    Ok(batch.slice(batch.num_rows() - 1, 1))
}

#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use crate::common::collect;
    use crate::expressions::PhysicalSortExpr;
    use crate::projection::ProjectionExec;
    use crate::streaming::{PartitionStream, StreamingTableExec};
    use crate::test::TestMemoryExec;
    use crate::windows::{
        create_udwf_window_expr, create_window_expr, BoundedWindowAggExec, InputOrderMode,
    };
    use crate::{execute_stream, get_plan_string, ExecutionPlan};

    use arrow::array::{
        builder::{Int64Builder, UInt64Builder},
        RecordBatch,
    };
    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_common::test_util::batches_to_string;
    use datafusion_common::{exec_datafusion_err, Result, ScalarValue};
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::{
        RecordBatchStream, SendableRecordBatchStream, TaskContext,
    };
    use datafusion_expr::{
        WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
    };
    use datafusion_functions_aggregate::count::count_udaf;
    use datafusion_functions_window::nth_value::last_value_udwf;
    use datafusion_functions_window::nth_value::nth_value_udwf;
    use datafusion_physical_expr::expressions::{col, Column, Literal};
    use datafusion_physical_expr::window::StandardWindowExpr;
    use datafusion_physical_expr::{LexOrdering, PhysicalExpr};

    use futures::future::Shared;
    use futures::{pin_mut, ready, FutureExt, Stream, StreamExt};
    use insta::assert_snapshot;
    use itertools::Itertools;
    use tokio::time::timeout;

    #[derive(Debug, Clone)]
    struct TestStreamPartition {
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
        idx: usize,
        state: PollingState,
        sleep_duration: Duration,
        send_exit: bool,
    }

    impl PartitionStream for TestStreamPartition {
        fn schema(&self) -> &SchemaRef {
            &self.schema
        }

        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
            // Batches are fed to the stream lazily, with an artificial delay
            // between them (see `poll_next_inner`):
            Box::pin(self.clone())
        }
    }

    impl Stream for TestStreamPartition {
        type Item = Result<RecordBatch>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            self.poll_next_inner(cx)
        }
    }

    #[derive(Debug, Clone)]
    enum PollingState {
        Sleep(Shared<futures::future::BoxFuture<'static, ()>>),
        BatchReturn,
    }

    impl TestStreamPartition {
        fn poll_next_inner(
            self: &mut Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<RecordBatch>>> {
            loop {
                match &mut self.state {
                    PollingState::BatchReturn => {
                        // Wait for `sleep_duration` before sending any batch:
                        let f = tokio::time::sleep(self.sleep_duration).boxed().shared();
                        self.state = PollingState::Sleep(f);
                        let input_batch = if let Some(batch) =
                            self.batches.clone().get(self.idx)
                        {
                            batch.clone()
                        } else if self.send_exit {
                            // Send None to signal the end of the stream:
                            return Poll::Ready(None);
                        } else {
                            // Go back to sleep and never end the stream:
                            let f =
                                tokio::time::sleep(self.sleep_duration).boxed().shared();
                            self.state = PollingState::Sleep(f);
                            continue;
                        };
                        self.idx += 1;
                        return Poll::Ready(Some(Ok(input_batch)));
                    }
                    PollingState::Sleep(future) => {
                        pin_mut!(future);
                        ready!(future.poll_unpin(cx));
                        self.state = PollingState::BatchReturn;
                    }
                }
            }
        }
    }

    impl RecordBatchStream for TestStreamPartition {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }

    fn bounded_window_exec_pb_latent_range(
        input: Arc<dyn ExecutionPlan>,
        n_future_range: usize,
        hash: &str,
        order_by: &str,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = input.schema();
        let window_fn = WindowFunctionDefinition::AggregateUDF(count_udaf());
        let col_expr =
            Arc::new(Column::new(schema.fields[0].name(), 0)) as Arc<dyn PhysicalExpr>;
        let args = vec![col_expr];
        let partitionby_exprs = vec![col(hash, &schema)?];
        let orderby_exprs = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col(order_by, &schema)?,
            options: SortOptions::default(),
        }]);
        let window_frame = WindowFrame::new_bounds(
            WindowFrameUnits::Range,
            WindowFrameBound::CurrentRow,
            WindowFrameBound::Following(ScalarValue::UInt64(Some(n_future_range as u64))),
        );
        let fn_name = format!(
            "{}({:?}) PARTITION BY: [{:?}], ORDER BY: [{:?}]",
            window_fn, args, partitionby_exprs, orderby_exprs
        );
        let input_order_mode = InputOrderMode::Linear;
        Ok(Arc::new(BoundedWindowAggExec::try_new(
            vec![create_window_expr(
                &window_fn,
                fn_name,
                &args,
                &partitionby_exprs,
                orderby_exprs.as_ref(),
                Arc::new(window_frame),
                &input.schema(),
                false,
            )?],
            input,
            input_order_mode,
            true,
        )?))
    }

    fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = input.schema();
        let exprs = input
            .schema()
            .fields
            .iter()
            .enumerate()
            .map(|(idx, field)| {
                let name = if field.name().len() > 20 {
                    format!("col_{idx}")
                } else {
                    field.name().clone()
                };
                let expr = col(field.name(), &schema).unwrap();
                (expr, name)
            })
            .collect::<Vec<_>>();
        Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
    }

    fn task_context_helper() -> TaskContext {
        let task_ctx = TaskContext::default();
        // Use a batch size of 1 so that results stream row by row:
        let session_config = SessionConfig::new()
            .with_batch_size(1)
            .with_target_partitions(2)
            .with_round_robin_repartition(false);
        task_ctx.with_session_config(session_config)
    }

    fn task_context() -> Arc<TaskContext> {
        Arc::new(task_context_helper())
    }

    pub async fn collect_stream(
        mut stream: SendableRecordBatchStream,
        results: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        while let Some(item) = stream.next().await {
            results.push(item?);
        }
        Ok(())
    }

    /// Execute a plan that is expected to run forever, collecting whatever
    /// batches arrive until the timeout elapses.
    pub async fn collect_with_timeout(
        plan: Arc<dyn ExecutionPlan>,
        context: Arc<TaskContext>,
        timeout_duration: Duration,
    ) -> Result<Vec<RecordBatch>> {
        let stream = execute_stream(plan, context)?;
        let mut results = vec![];

        // A never-ending source should not complete within the timeout:
        if timeout(timeout_duration, collect_stream(stream, &mut results))
            .await
            .is_ok()
        {
            return Err(exec_datafusion_err!("shouldn't have completed"));
        };

        Ok(results)
    }

    /// Execute the plan to completion and collect the results.
    #[allow(dead_code)]
    pub async fn collect_bonafide(
        plan: Arc<dyn ExecutionPlan>,
        context: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let stream = execute_stream(plan, context)?;
        let mut results = vec![];

        collect_stream(stream, &mut results).await?;

        Ok(results)
    }

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("sn", DataType::UInt64, true),
            Field::new("hash", DataType::Int64, true),
        ]))
    }

    fn schema_orders(schema: &SchemaRef) -> Result<Vec<LexOrdering>> {
        let orderings = vec![LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("sn", schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }])];
        Ok(orderings)
    }

    fn is_integer_division_safe(lhs: usize, rhs: usize) -> bool {
        let res = lhs / rhs;
        res * rhs == lhs
    }

    fn generate_batches(
        schema: &SchemaRef,
        n_row: usize,
        n_chunk: usize,
    ) -> Result<Vec<RecordBatch>> {
        let mut batches = vec![];
        assert!(n_row > 0);
        assert!(n_chunk > 0);
        assert!(is_integer_division_safe(n_row, n_chunk));
        let hash_replicate = 4;

        let chunks = (0..n_row)
            .chunks(n_chunk)
            .into_iter()
            .map(|elem| elem.into_iter().collect::<Vec<_>>())
            .collect::<Vec<_>>();

        // Each chunk becomes one record batch:
        for sn_values in chunks {
            let mut sn1_array = UInt64Builder::with_capacity(sn_values.len());
            let mut hash_array = Int64Builder::with_capacity(sn_values.len());

            for sn in sn_values {
                sn1_array.append_value(sn as u64);
                let hash_value = (2 - (sn / hash_replicate)) as i64;
                hash_array.append_value(hash_value);
            }

            let batch = RecordBatch::try_new(
                Arc::clone(schema),
                vec![Arc::new(sn1_array.finish()), Arc::new(hash_array.finish())],
            )?;
            batches.push(batch);
        }
        Ok(batches)
    }

    fn generate_never_ending_source(
        n_rows: usize,
        chunk_length: usize,
        n_partition: usize,
        is_infinite: bool,
        send_exit: bool,
        per_batch_wait_duration_in_millis: u64,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        assert!(n_partition > 0);

        // Use a schema with two columns: "sn" (ascending) and "hash":
        let schema = test_schema();
        let orderings = schema_orders(&schema)?;

        // Wait duration between batches:
        let per_batch_wait_duration =
            Duration::from_millis(per_batch_wait_duration_in_millis);

        let batches = generate_batches(&schema, n_rows, chunk_length)?;

        // Replicate the same stream across `n_partition` partitions:
        let partitions = vec![
            Arc::new(TestStreamPartition {
                schema: Arc::clone(&schema),
                batches,
                idx: 0,
                state: PollingState::BatchReturn,
                sleep_duration: per_batch_wait_duration,
                send_exit,
            }) as _;
            n_partition
        ];
        let source = Arc::new(StreamingTableExec::try_new(
            Arc::clone(&schema),
            partitions,
            None,
            orderings,
            is_infinite,
            None,
        )?) as _;
        Ok(source)
    }

    #[tokio::test]
    async fn test_window_nth_value_bounded_memoize() -> Result<()> {
        let config = SessionConfig::new().with_target_partitions(1);
        let task_ctx = Arc::new(TaskContext::default().with_session_config(config));

        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3]))],
        )?;

        let memory_exec = TestMemoryExec::try_new_exec(
            &[vec![batch.clone(), batch.clone(), batch.clone()]],
            Arc::clone(&schema),
            None,
        )?;
        let col_a = col("a", &schema)?;
        let nth_value_func1 = create_udwf_window_expr(
            &nth_value_udwf(),
            &[
                Arc::clone(&col_a),
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
            ],
            &schema,
            "nth_value(-1)".to_string(),
            false,
        )?
        .reverse_expr()
        .unwrap();
        let nth_value_func2 = create_udwf_window_expr(
            &nth_value_udwf(),
            &[
                Arc::clone(&col_a),
                Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
            ],
            &schema,
            "nth_value(-2)".to_string(),
            false,
        )?
        .reverse_expr()
        .unwrap();

        let last_value_func = create_udwf_window_expr(
            &last_value_udwf(),
            &[Arc::clone(&col_a)],
            &schema,
            "last".to_string(),
            false,
        )?;

        let window_exprs = vec![
            // LAST_VALUE(a)
            Arc::new(StandardWindowExpr::new(
                last_value_func,
                &[],
                &LexOrdering::default(),
                Arc::new(WindowFrame::new_bounds(
                    WindowFrameUnits::Rows,
                    WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
                    WindowFrameBound::CurrentRow,
                )),
            )) as _,
            // NTH_VALUE(a, -1)
            Arc::new(StandardWindowExpr::new(
                nth_value_func1,
                &[],
                &LexOrdering::default(),
                Arc::new(WindowFrame::new_bounds(
                    WindowFrameUnits::Rows,
                    WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
                    WindowFrameBound::CurrentRow,
                )),
            )) as _,
            // NTH_VALUE(a, -2)
            Arc::new(StandardWindowExpr::new(
                nth_value_func2,
                &[],
                &LexOrdering::default(),
                Arc::new(WindowFrame::new_bounds(
                    WindowFrameUnits::Rows,
                    WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
                    WindowFrameBound::CurrentRow,
                )),
            )) as _,
        ];
        let physical_plan = BoundedWindowAggExec::try_new(
            window_exprs,
            memory_exec,
            InputOrderMode::Sorted,
            true,
        )
        .map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;

        let batches = collect(physical_plan.execute(0, task_ctx)?).await?;

        let expected = vec![
            "BoundedWindowAggExec: wdw=[last: Ok(Field { name: \"last\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }, nth_value(-1): Ok(Field { name: \"nth_value(-1)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }, nth_value(-2): Ok(Field { name: \"nth_value(-2)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }], mode=[Sorted]",
            "  DataSourceExec: partitions=1, partition_sizes=[3]",
        ];
        // Get string representation of the plan:
        let actual = get_plan_string(&physical_plan);
        assert_eq!(
            expected, actual,
            "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
        );

        assert_snapshot!(batches_to_string(&batches), @r#"
            +---+------+---------------+---------------+
            | a | last | nth_value(-1) | nth_value(-2) |
            +---+------+---------------+---------------+
            | 1 | 1    | 1             |               |
            | 2 | 2    | 2             | 1             |
            | 3 | 3    | 3             | 2             |
            | 1 | 1    | 1             | 3             |
            | 2 | 2    | 2             | 1             |
            | 3 | 3    | 3             | 2             |
            | 1 | 1    | 1             | 3             |
            | 2 | 2    | 2             | 1             |
            | 3 | 3    | 3             | 2             |
            +---+------+---------------+---------------+
        "#);
        Ok(())
    }

    #[tokio::test]
    async fn bounded_window_exec_linear_mode_range_information() -> Result<()> {
        let n_rows = 10;
        let chunk_length = 2;
        let n_future_range = 1;

        let timeout_duration = Duration::from_millis(2000);

        let source =
            generate_never_ending_source(n_rows, chunk_length, 1, true, false, 5)?;

        let window =
            bounded_window_exec_pb_latent_range(source, n_future_range, "hash", "sn")?;

        let plan = projection_exec(window)?;

        let expected_plan = vec![
            "ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]@2 as col_2]",
            "  BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]: Ok(Field { name: \"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"hash\\\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \\\"sn\\\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(1)), is_causal: false }], mode=[Linear]",
            "    StreamingTableExec: partition_sizes=1, projection=[sn, hash], infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]",
        ];

        // Get string representation of the plan:
        let actual = get_plan_string(&plan);
        assert_eq!(
            expected_plan, actual,
            "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_plan:#?}\nactual:\n\n{actual:#?}\n\n"
        );

        let task_ctx = task_context();
        let batches = collect_with_timeout(plan, task_ctx, timeout_duration).await?;

        assert_snapshot!(batches_to_string(&batches), @r#"
            +----+------+-------+
            | sn | hash | col_2 |
            +----+------+-------+
            | 0  | 2    | 2     |
            | 1  | 2    | 2     |
            | 2  | 2    | 2     |
            | 3  | 2    | 1     |
            | 4  | 1    | 2     |
            | 5  | 1    | 2     |
            | 6  | 1    | 2     |
            | 7  | 1    | 1     |
            +----+------+-------+
        "#);

        Ok(())
    }
}