Skip to main content

datafusion_expr/
window_state.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Structures used to hold window function state (for implementing WindowUDFs)
19
20use std::{collections::VecDeque, ops::Range, sync::Arc};
21
22use crate::{WindowFrame, WindowFrameBound, WindowFrameUnits};
23
24use arrow::{
25    array::ArrayRef,
26    compute::{SortOptions, concat, concat_batches},
27    datatypes::{DataType, SchemaRef},
28    record_batch::RecordBatch,
29};
30use datafusion_common::{
31    Result, ScalarValue, internal_datafusion_err, internal_err,
32    utils::{compare_rows, get_row_at_idx, search_in_slice},
33};
34
35/// Holds the state of evaluating a window function
36#[derive(Debug, Clone)]
37pub struct WindowAggState {
38    /// The range that we calculate the window function
39    pub window_frame_range: Range<usize>,
40    pub window_frame_ctx: Option<WindowFrameContext>,
41    /// The index of the last row that its result is calculated inside the partition record batch buffer.
42    pub last_calculated_index: usize,
43    /// The offset of the deleted row number
44    pub offset_pruned_rows: usize,
45    /// Stores the results calculated by window frame
46    pub out_col: ArrayRef,
47    /// Keeps track of how many rows should be generated to be in sync with input record_batch.
48    // (For each row in the input record batch we need to generate a window result).
49    pub n_row_result_missing: usize,
50    /// Flag indicating whether we have received all data for this partition
51    pub is_end: bool,
52}
53
54impl WindowAggState {
55    pub fn prune_state(&mut self, n_prune: usize) {
56        self.window_frame_range = Range {
57            start: self.window_frame_range.start - n_prune,
58            end: self.window_frame_range.end - n_prune,
59        };
60        self.last_calculated_index -= n_prune;
61        self.offset_pruned_rows += n_prune;
62
63        match self.window_frame_ctx.as_mut() {
64            // Rows have no state do nothing
65            Some(WindowFrameContext::Rows(_)) => {}
66            Some(WindowFrameContext::Range { .. }) => {}
67            Some(WindowFrameContext::Groups { state, .. }) => {
68                let mut n_group_to_del = 0;
69                for (_, end_idx) in &state.group_end_indices {
70                    if n_prune < *end_idx {
71                        break;
72                    }
73                    n_group_to_del += 1;
74                }
75                state.group_end_indices.drain(0..n_group_to_del);
76                state
77                    .group_end_indices
78                    .iter_mut()
79                    .for_each(|(_, start_idx)| *start_idx -= n_prune);
80                state.current_group_idx -= n_group_to_del;
81            }
82            None => {}
83        };
84    }
85
86    pub fn update(
87        &mut self,
88        out_col: &ArrayRef,
89        partition_batch_state: &PartitionBatchState,
90    ) -> Result<()> {
91        self.last_calculated_index += out_col.len();
92        // no need to use concat if the current `out_col` is empty
93        if self.out_col.is_empty() {
94            self.out_col = Arc::clone(out_col);
95        } else {
96            self.out_col = concat(&[&self.out_col, &out_col])?;
97        }
98        self.n_row_result_missing =
99            partition_batch_state.record_batch.num_rows() - self.last_calculated_index;
100        self.is_end = partition_batch_state.is_end;
101        Ok(())
102    }
103
104    pub fn new(out_type: &DataType) -> Result<Self> {
105        let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?;
106        Ok(Self {
107            window_frame_range: Range { start: 0, end: 0 },
108            window_frame_ctx: None,
109            last_calculated_index: 0,
110            offset_pruned_rows: 0,
111            out_col: empty_out_col,
112            n_row_result_missing: 0,
113            is_end: false,
114        })
115    }
116}
117
118/// This object stores the window frame state for use in incremental calculations.
119#[derive(Debug, Clone)]
120pub enum WindowFrameContext {
121    /// ROWS frames are inherently stateless.
122    Rows(Arc<WindowFrame>),
123    /// RANGE frames are stateful, they store indices specifying where the
124    /// previous search left off. This amortizes the overall cost to O(n)
125    /// where n denotes the row count.
126    Range {
127        window_frame: Arc<WindowFrame>,
128        state: WindowFrameStateRange,
129    },
130    /// GROUPS frames are stateful, they store group boundaries and indices
131    /// specifying where the previous search left off. This amortizes the
132    /// overall cost to O(n) where n denotes the row count.
133    Groups {
134        window_frame: Arc<WindowFrame>,
135        state: WindowFrameStateGroups,
136    },
137}
138
139impl WindowFrameContext {
140    /// Create a new state object for the given window frame.
141    pub fn new(window_frame: Arc<WindowFrame>, sort_options: Vec<SortOptions>) -> Self {
142        match window_frame.units {
143            WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
144            WindowFrameUnits::Range => WindowFrameContext::Range {
145                window_frame,
146                state: WindowFrameStateRange::new(sort_options),
147            },
148            WindowFrameUnits::Groups => WindowFrameContext::Groups {
149                window_frame,
150                state: WindowFrameStateGroups::default(),
151            },
152        }
153    }
154
155    /// This function calculates beginning/ending indices for the frame of the current row.
156    pub fn calculate_range(
157        &mut self,
158        range_columns: &[ArrayRef],
159        last_range: &Range<usize>,
160        length: usize,
161        idx: usize,
162    ) -> Result<Range<usize>> {
163        match self {
164            WindowFrameContext::Rows(window_frame) => {
165                Self::calculate_range_rows(window_frame, length, idx)
166            }
167            // Sort options is used in RANGE mode calculations because the
168            // ordering or position of NULLs impact range calculations and
169            // comparison of rows.
170            WindowFrameContext::Range {
171                window_frame,
172                state,
173            } => state.calculate_range(
174                window_frame,
175                last_range,
176                range_columns,
177                length,
178                idx,
179            ),
180            // Sort options is not used in GROUPS mode calculations as the
181            // inequality of two rows indicates a group change, and ordering
182            // or position of NULLs do not impact inequality.
183            WindowFrameContext::Groups {
184                window_frame,
185                state,
186            } => state.calculate_range(window_frame, range_columns, length, idx),
187        }
188    }
189
190    /// This function calculates beginning/ending indices for the frame of the current row.
191    fn calculate_range_rows(
192        window_frame: &Arc<WindowFrame>,
193        length: usize,
194        idx: usize,
195    ) -> Result<Range<usize>> {
196        let start = match window_frame.start_bound {
197            // UNBOUNDED PRECEDING
198            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
199            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
200                idx.saturating_sub(n as usize)
201            }
202            WindowFrameBound::CurrentRow => idx,
203            // UNBOUNDED FOLLOWING
204            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
205                return internal_err!(
206                    "Frame start cannot be UNBOUNDED FOLLOWING '{window_frame:?}'"
207                );
208            }
209            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
210                std::cmp::min(idx + n as usize, length)
211            }
212            // ERRONEOUS FRAMES
213            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
214                return internal_err!("Rows should be UInt64");
215            }
216        };
217        let end = match window_frame.end_bound {
218            // UNBOUNDED PRECEDING
219            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
220                return internal_err!(
221                    "Frame end cannot be UNBOUNDED PRECEDING '{window_frame:?}'"
222                );
223            }
224            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
225                if idx >= n as usize {
226                    idx - n as usize + 1
227                } else {
228                    0
229                }
230            }
231            WindowFrameBound::CurrentRow => idx + 1,
232            // UNBOUNDED FOLLOWING
233            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
234            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
235                std::cmp::min(idx + n as usize + 1, length)
236            }
237            // ERRONEOUS FRAMES
238            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
239                return internal_err!("Rows should be UInt64");
240            }
241        };
242        Ok(Range { start, end })
243    }
244}
245
246/// State for each unique partition determined according to PARTITION BY column(s)
247#[derive(Debug, Clone, PartialEq)]
248pub struct PartitionBatchState {
249    /// The record batch belonging to current partition
250    pub record_batch: RecordBatch,
251    /// The record batch that contains the most recent row at the input.
252    /// Please note that this batch doesn't necessarily have the same partitioning
253    /// with `record_batch`. Keeping track of this batch enables us to prune
254    /// `record_batch` when cardinality of the partition is sparse.
255    pub most_recent_row: Option<RecordBatch>,
256    /// Flag indicating whether we have received all data for this partition
257    pub is_end: bool,
258    /// Number of rows emitted for each partition
259    pub n_out_row: usize,
260}
261
262impl PartitionBatchState {
263    pub fn new(schema: SchemaRef) -> Self {
264        Self {
265            record_batch: RecordBatch::new_empty(schema),
266            most_recent_row: None,
267            is_end: false,
268            n_out_row: 0,
269        }
270    }
271
272    pub fn new_with_batch(batch: RecordBatch) -> Self {
273        Self {
274            record_batch: batch,
275            most_recent_row: None,
276            is_end: false,
277            n_out_row: 0,
278        }
279    }
280
281    pub fn extend(&mut self, batch: &RecordBatch) -> Result<()> {
282        self.record_batch =
283            concat_batches(&self.record_batch.schema(), [&self.record_batch, batch])?;
284        Ok(())
285    }
286
287    pub fn set_most_recent_row(&mut self, batch: RecordBatch) {
288        // It is enough for the batch to contain only a single row (the rest
289        // are not necessary).
290        self.most_recent_row = Some(batch);
291    }
292}
293
/// This structure encapsulates all the state information we require as we scan
/// ranges of data while processing RANGE frames.
/// Attribute `sort_options` stores the column ordering specified by the ORDER
/// BY clause. This information is used to calculate the range.
#[derive(Debug, Default, Clone)]
pub struct WindowFrameStateRange {
    /// Ordering of the ORDER BY column(s). All entries are handed to
    /// `compare_rows`. Only the first entry's `descending` flag decides in
    /// which direction a frame offset is applied.
    sort_options: Vec<SortOptions>,
}

impl WindowFrameStateRange {
    /// Create a new object to store the search state.
    fn new(sort_options: Vec<SortOptions>) -> Self {
        Self { sort_options }
    }

    /// This function calculates beginning/ending indices for the frame of the current row.
    ///
    /// Returns the half-open range `start..end` of rows (searched up to
    /// `length`) inside the RANGE frame of row `idx`. A null offset on a
    /// PRECEDING start bound means UNBOUNDED PRECEDING (start `0`). A null
    /// offset on a FOLLOWING end bound means UNBOUNDED FOLLOWING (end `length`).
    // Argument `last_range` stores the resulting indices from the previous search. Since the indices only
    // advance forward, we start from `last_range` subsequently. Thus, the overall
    // time complexity of linear search amortizes to O(n) where n denotes the total
    // row count.
    fn calculate_range(
        &mut self,
        window_frame: &Arc<WindowFrame>,
        last_range: &Range<usize>,
        range_columns: &[ArrayRef],
        length: usize,
        idx: usize,
    ) -> Result<Range<usize>> {
        let start = match window_frame.start_bound {
            WindowFrameBound::Preceding(ref n) => {
                if n.is_null() {
                    // UNBOUNDED PRECEDING
                    0
                } else {
                    self.calculate_index_of_row::<true, true>(
                        range_columns,
                        last_range,
                        idx,
                        Some(n),
                        length,
                    )?
                }
            }
            // CURRENT ROW as a start: first row that does not sort before the current row.
            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<true, true>(
                range_columns,
                last_range,
                idx,
                None,
                length,
            )?,
            WindowFrameBound::Following(ref n) => self
                .calculate_index_of_row::<true, false>(
                    range_columns,
                    last_range,
                    idx,
                    Some(n),
                    length,
                )?,
        };
        let end = match window_frame.end_bound {
            WindowFrameBound::Preceding(ref n) => self
                .calculate_index_of_row::<false, true>(
                    range_columns,
                    last_range,
                    idx,
                    Some(n),
                    length,
                )?,
            // CURRENT ROW as an end: just past the last row that ties with the current row.
            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<false, false>(
                range_columns,
                last_range,
                idx,
                None,
                length,
            )?,
            WindowFrameBound::Following(ref n) => {
                if n.is_null() {
                    // UNBOUNDED FOLLOWING
                    length
                } else {
                    self.calculate_index_of_row::<false, false>(
                        range_columns,
                        last_range,
                        idx,
                        Some(n),
                        length,
                    )?
                }
            }
        };
        Ok(Range { start, end })
    }

    /// This function does the heavy lifting when finding range boundaries. It is meant to be
    /// called twice, in succession, to get window frame start and end indices (with `SIDE`
    /// supplied as true and false, respectively).
    ///
    /// `SEARCH_SIDE` selects the bound kind: `true` for PRECEDING, `false` for
    /// FOLLOWING. Together with the first sort option's direction it decides
    /// whether `delta` is added to or subtracted from the current row's values.
    /// `delta == None` uses the current row's values unchanged (CURRENT ROW).
    ///
    /// The scan starts at `last_range.start` (start side) or `last_range.end`
    /// (end side) and is bounded by `length`. It is delegated to
    /// `search_in_slice` with a predicate that skips rows sorting strictly
    /// before the target (start side) or at-or-before it (end side). This
    /// presumes `search_in_slice` advances while the predicate holds — verify
    /// against its definition.
    ///
    /// # Errors
    /// Returns an internal error if `delta` is given but `sort_options` is empty.
    fn calculate_index_of_row<const SIDE: bool, const SEARCH_SIDE: bool>(
        &mut self,
        range_columns: &[ArrayRef],
        last_range: &Range<usize>,
        idx: usize,
        delta: Option<&ScalarValue>,
        length: usize,
    ) -> Result<usize> {
        let current_row_values = get_row_at_idx(range_columns, idx)?;
        // Searches only move forward, so resume from the previous frame's boundary.
        let search_start = if SIDE {
            last_range.start
        } else {
            last_range.end
        };
        let end_range = if let Some(delta) = delta {
            let is_descending: bool = self
                .sort_options
                .first()
                .ok_or_else(|| {
                    internal_datafusion_err!(
                        "Sort options unexpectedly absent in a window frame"
                    )
                })?
                .descending;

            // On overflow the boundary exceeds the type's range and is
            // effectively unbounded within the partition. Collapse to the
            // partition edge rather than feeding `search_in_slice` a
            // wrapped-around target: PRECEDING searches reach `search_start`,
            // FOLLOWING searches reach `length`.
            let unbounded_edge = if SEARCH_SIDE { search_start } else { length };
            let mut targets = Vec::with_capacity(current_row_values.len());
            for value in &current_row_values {
                // NULL values are searched for as-is; no offset is applied to them.
                if value.is_null() {
                    targets.push(value.clone());
                    continue;
                }
                // PRECEDING in descending order and FOLLOWING in ascending
                // order both move toward larger values, so the offset is added.
                // The other two combinations subtract it.
                let target = if SEARCH_SIDE == is_descending {
                    match value.add_checked(delta) {
                        Ok(v) => v,
                        Err(_) => return Ok(unbounded_edge),
                    }
                } else if value.is_unsigned() && value < delta {
                    // Unsigned subtraction would go below zero: clamp the target to zero.
                    // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
                    //       If we decide to implement a "default" construction mechanism for ScalarValue,
                    //       change the following statement to use that.
                    value.sub(value)?
                } else {
                    match value.sub_checked(delta) {
                        Ok(v) => v,
                        Err(_) => return Ok(unbounded_edge),
                    }
                };
                targets.push(target);
            }
            targets
        } else {
            current_row_values
        };
        // Start side: skip rows sorting strictly before the target (first row >= target).
        // End side: skip rows sorting at or before the target (exclusive end).
        let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
            let cmp = compare_rows(current, target, &self.sort_options)?;
            Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
        };
        search_in_slice(range_columns, &end_range, compare_fn, search_start, length)
    }
}
456
457// In GROUPS mode, rows with duplicate sorting values are grouped together.
458// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
459// The syntax is as follows:
460//     GROUPS frame_start [ frame_exclusion ]
461//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
462// The optional frame_exclusion specifier is not yet supported.
463// The frame_start and frame_end parameters allow us to specify which rows the window
464// frame starts and ends with. They accept the following values:
465//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
466//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
467//                        that comes "offset" groups before the current group (i.e. the group
468//                        containing the current row). When used in frame_end, it refers to the
469//                        last row of the group that comes "offset" groups before the current group.
470//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
471//                   the current row. When used in frame_end, it refers to the last row of the group
472//                   containing the current row.
473//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
474//                        that comes "offset" groups after the current group (i.e. the group
475//                        containing the current row). When used in frame_end, it refers to the
476//                        last row of the group that comes "offset" groups after the current group.
477//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
478
479/// This structure encapsulates all the state information we require as we
480/// scan groups of data while processing window frames.
481#[derive(Debug, Default, Clone)]
482pub struct WindowFrameStateGroups {
483    /// A tuple containing group values and the row index where the group ends.
484    /// Example: [[1, 1], [1, 1], [2, 1], [2, 1], ...] would correspond to
485    ///          [([1, 1], 2), ([2, 1], 4), ...].
486    pub group_end_indices: VecDeque<(Vec<ScalarValue>, usize)>,
487    /// The group index to which the row index belongs.
488    pub current_group_idx: usize,
489}
490
491impl WindowFrameStateGroups {
492    fn calculate_range(
493        &mut self,
494        window_frame: &Arc<WindowFrame>,
495        range_columns: &[ArrayRef],
496        length: usize,
497        idx: usize,
498    ) -> Result<Range<usize>> {
499        let start = match window_frame.start_bound {
500            WindowFrameBound::Preceding(ref n) => {
501                if n.is_null() {
502                    // UNBOUNDED PRECEDING
503                    0
504                } else {
505                    self.calculate_index_of_row::<true, true>(
506                        range_columns,
507                        idx,
508                        Some(n),
509                        length,
510                    )?
511                }
512            }
513            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<true, true>(
514                range_columns,
515                idx,
516                None,
517                length,
518            )?,
519            WindowFrameBound::Following(ref n) => self
520                .calculate_index_of_row::<true, false>(
521                    range_columns,
522                    idx,
523                    Some(n),
524                    length,
525                )?,
526        };
527        let end = match window_frame.end_bound {
528            WindowFrameBound::Preceding(ref n) => self
529                .calculate_index_of_row::<false, true>(
530                    range_columns,
531                    idx,
532                    Some(n),
533                    length,
534                )?,
535            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<false, false>(
536                range_columns,
537                idx,
538                None,
539                length,
540            )?,
541            WindowFrameBound::Following(ref n) => {
542                if n.is_null() {
543                    // UNBOUNDED FOLLOWING
544                    length
545                } else {
546                    self.calculate_index_of_row::<false, false>(
547                        range_columns,
548                        idx,
549                        Some(n),
550                        length,
551                    )?
552                }
553            }
554        };
555        Ok(Range { start, end })
556    }
557
558    /// This function does the heavy lifting when finding range boundaries. It is meant to be
559    /// called twice, in succession, to get window frame start and end indices (with `SIDE`
560    /// supplied as true and false, respectively). Generic argument `SEARCH_SIDE` determines
561    /// the sign of `delta` (where true/false represents negative/positive respectively).
562    fn calculate_index_of_row<const SIDE: bool, const SEARCH_SIDE: bool>(
563        &mut self,
564        range_columns: &[ArrayRef],
565        idx: usize,
566        delta: Option<&ScalarValue>,
567        length: usize,
568    ) -> Result<usize> {
569        let delta = if let Some(delta) = delta {
570            if let ScalarValue::UInt64(Some(value)) = delta {
571                *value as usize
572            } else {
573                return internal_err!(
574                    "Unexpectedly got a non-UInt64 value in a GROUPS mode window frame"
575                );
576            }
577        } else {
578            0
579        };
580        let mut group_start = 0;
581        let last_group = self.group_end_indices.back_mut();
582        if let Some((group_row, group_end)) = last_group {
583            if *group_end < length {
584                let new_group_row = get_row_at_idx(range_columns, *group_end)?;
585                // If last/current group keys are the same, we extend the last group:
586                if new_group_row.eq(group_row) {
587                    // Update the end boundary of the group (search right boundary):
588                    *group_end = search_in_slice(
589                        range_columns,
590                        group_row,
591                        check_equality,
592                        *group_end,
593                        length,
594                    )?;
595                }
596            }
597            // Start searching from the last group boundary:
598            group_start = *group_end;
599        }
600
601        // Advance groups until `idx` is inside a group:
602        while idx >= group_start {
603            let group_row = get_row_at_idx(range_columns, group_start)?;
604            // Find end boundary of the group (search right boundary):
605            let group_end = search_in_slice(
606                range_columns,
607                &group_row,
608                check_equality,
609                group_start,
610                length,
611            )?;
612            self.group_end_indices.push_back((group_row, group_end));
613            group_start = group_end;
614        }
615
616        // Update the group index `idx` belongs to:
617        while self.current_group_idx < self.group_end_indices.len()
618            && idx >= self.group_end_indices[self.current_group_idx].1
619        {
620            self.current_group_idx += 1;
621        }
622
623        // Find the group index of the frame boundary:
624        let group_idx = if SEARCH_SIDE {
625            self.current_group_idx.saturating_sub(delta)
626        } else {
627            self.current_group_idx + delta
628        };
629
630        // Extend `group_start_indices` until it includes at least `group_idx`:
631        while self.group_end_indices.len() <= group_idx && group_start < length {
632            let group_row = get_row_at_idx(range_columns, group_start)?;
633            // Find end boundary of the group (search right boundary):
634            let group_end = search_in_slice(
635                range_columns,
636                &group_row,
637                check_equality,
638                group_start,
639                length,
640            )?;
641            self.group_end_indices.push_back((group_row, group_end));
642            group_start = group_end;
643        }
644
645        // Calculate index of the group boundary:
646        Ok(match (SIDE, SEARCH_SIDE) {
647            // Window frame start:
648            (true, _) => {
649                let group_idx = std::cmp::min(group_idx, self.group_end_indices.len());
650                if group_idx > 0 {
651                    // Normally, start at the boundary of the previous group.
652                    self.group_end_indices[group_idx - 1].1
653                } else {
654                    // If previous group is out of the table, start at zero.
655                    0
656                }
657            }
658            // Window frame end, PRECEDING n
659            (false, true) => {
660                if self.current_group_idx >= delta {
661                    let group_idx = self.current_group_idx - delta;
662                    self.group_end_indices[group_idx].1
663                } else {
664                    // Group is out of the table, therefore end at zero.
665                    0
666                }
667            }
668            // Window frame end, FOLLOWING n
669            (false, false) => {
670                let group_idx = std::cmp::min(
671                    self.current_group_idx + delta,
672                    self.group_end_indices.len() - 1,
673                );
674                self.group_end_indices[group_idx].1
675            }
676        })
677    }
678}
679
680fn check_equality(current: &[ScalarValue], target: &[ScalarValue]) -> Result<bool> {
681    Ok(current == target)
682}
683
684#[cfg(test)]
685mod tests {
686    use super::*;
687
688    use arrow::array::Float64Array;
689
690    fn get_test_data() -> (Vec<ArrayRef>, Vec<SortOptions>) {
691        let range_columns: Vec<ArrayRef> = vec![Arc::new(Float64Array::from(vec![
692            5.0, 7.0, 8.0, 8.0, 9., 10., 10., 10., 11.,
693        ]))];
694        let sort_options = vec![SortOptions {
695            descending: false,
696            nulls_first: false,
697        }];
698
699        (range_columns, sort_options)
700    }
701
702    fn assert_group_ranges(
703        window_frame: &Arc<WindowFrame>,
704        expected_results: Vec<(Range<usize>, usize)>,
705    ) -> Result<()> {
706        let mut window_frame_groups = WindowFrameStateGroups::default();
707        let (range_columns, _) = get_test_data();
708        let n_row = range_columns[0].len();
709        for (idx, (expected_range, expected_group_idx)) in
710            expected_results.into_iter().enumerate()
711        {
712            let range = window_frame_groups.calculate_range(
713                window_frame,
714                &range_columns,
715                n_row,
716                idx,
717            )?;
718            assert_eq!(range, expected_range);
719            assert_eq!(window_frame_groups.current_group_idx, expected_group_idx);
720        }
721        Ok(())
722    }
723
724    fn assert_frame_ranges(
725        window_frame: &Arc<WindowFrame>,
726        expected_results: Vec<Range<usize>>,
727    ) -> Result<()> {
728        let mut window_frame_context =
729            WindowFrameContext::new(Arc::clone(window_frame), vec![]);
730        let (range_columns, _) = get_test_data();
731        let n_row = range_columns[0].len();
732        let mut last_range = Range { start: 0, end: 0 };
733        for (idx, expected_range) in expected_results.into_iter().enumerate() {
734            let range = window_frame_context.calculate_range(
735                &range_columns,
736                &last_range,
737                n_row,
738                idx,
739            )?;
740            assert_eq!(range, expected_range);
741            last_range = range;
742        }
743        Ok(())
744    }
745
746    #[test]
747    fn test_default_window_frame_group_boundaries() -> Result<()> {
748        let window_frame = Arc::new(WindowFrame::new(None));
749        assert_group_ranges(
750            &window_frame,
751            vec![
752                (Range { start: 0, end: 9 }, 0),
753                (Range { start: 0, end: 9 }, 0),
754                (Range { start: 0, end: 9 }, 0),
755                (Range { start: 0, end: 9 }, 0),
756                (Range { start: 0, end: 9 }, 0),
757                (Range { start: 0, end: 9 }, 0),
758                (Range { start: 0, end: 9 }, 0),
759                (Range { start: 0, end: 9 }, 0),
760                (Range { start: 0, end: 9 }, 0),
761            ],
762        )?;
763
764        assert_frame_ranges(
765            &window_frame,
766            vec![
767                Range { start: 0, end: 9 },
768                Range { start: 0, end: 9 },
769                Range { start: 0, end: 9 },
770                Range { start: 0, end: 9 },
771                Range { start: 0, end: 9 },
772                Range { start: 0, end: 9 },
773                Range { start: 0, end: 9 },
774                Range { start: 0, end: 9 },
775                Range { start: 0, end: 9 },
776            ],
777        )?;
778
779        Ok(())
780    }
781
782    #[test]
783    fn test_unordered_window_frame_group_boundaries() -> Result<()> {
784        let window_frame = Arc::new(WindowFrame::new(Some(false)));
785        assert_group_ranges(
786            &window_frame,
787            vec![
788                (Range { start: 0, end: 1 }, 0),
789                (Range { start: 0, end: 2 }, 1),
790                (Range { start: 0, end: 4 }, 2),
791                (Range { start: 0, end: 4 }, 2),
792                (Range { start: 0, end: 5 }, 3),
793                (Range { start: 0, end: 8 }, 4),
794                (Range { start: 0, end: 8 }, 4),
795                (Range { start: 0, end: 8 }, 4),
796                (Range { start: 0, end: 9 }, 5),
797            ],
798        )?;
799
800        assert_frame_ranges(
801            &window_frame,
802            vec![
803                Range { start: 0, end: 9 },
804                Range { start: 0, end: 9 },
805                Range { start: 0, end: 9 },
806                Range { start: 0, end: 9 },
807                Range { start: 0, end: 9 },
808                Range { start: 0, end: 9 },
809                Range { start: 0, end: 9 },
810                Range { start: 0, end: 9 },
811                Range { start: 0, end: 9 },
812            ],
813        )?;
814
815        Ok(())
816    }
817
818    #[test]
819    fn test_ordered_window_frame_group_boundaries() -> Result<()> {
820        let window_frame = Arc::new(WindowFrame::new(Some(true)));
821        assert_group_ranges(
822            &window_frame,
823            vec![
824                (Range { start: 0, end: 1 }, 0),
825                (Range { start: 0, end: 2 }, 1),
826                (Range { start: 0, end: 4 }, 2),
827                (Range { start: 0, end: 4 }, 2),
828                (Range { start: 0, end: 5 }, 3),
829                (Range { start: 0, end: 8 }, 4),
830                (Range { start: 0, end: 8 }, 4),
831                (Range { start: 0, end: 8 }, 4),
832                (Range { start: 0, end: 9 }, 5),
833            ],
834        )?;
835
836        assert_frame_ranges(
837            &window_frame,
838            vec![
839                Range { start: 0, end: 1 },
840                Range { start: 0, end: 2 },
841                Range { start: 0, end: 3 },
842                Range { start: 0, end: 4 },
843                Range { start: 0, end: 5 },
844                Range { start: 0, end: 6 },
845                Range { start: 0, end: 7 },
846                Range { start: 0, end: 8 },
847                Range { start: 0, end: 9 },
848            ],
849        )?;
850
851        Ok(())
852    }
853
854    #[test]
855    fn test_window_frame_group_boundaries() -> Result<()> {
856        let window_frame = Arc::new(WindowFrame::new_bounds(
857            WindowFrameUnits::Groups,
858            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
859            WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
860        ));
861        assert_group_ranges(
862            &window_frame,
863            vec![
864                (Range { start: 0, end: 2 }, 0),
865                (Range { start: 0, end: 4 }, 1),
866                (Range { start: 1, end: 5 }, 2),
867                (Range { start: 1, end: 5 }, 2),
868                (Range { start: 2, end: 8 }, 3),
869                (Range { start: 4, end: 9 }, 4),
870                (Range { start: 4, end: 9 }, 4),
871                (Range { start: 4, end: 9 }, 4),
872                (Range { start: 5, end: 9 }, 5),
873            ],
874        )
875    }
876
877    #[test]
878    fn test_window_frame_group_boundaries_both_following() -> Result<()> {
879        let window_frame = Arc::new(WindowFrame::new_bounds(
880            WindowFrameUnits::Groups,
881            WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
882            WindowFrameBound::Following(ScalarValue::UInt64(Some(2))),
883        ));
884        assert_group_ranges(
885            &window_frame,
886            vec![
887                (Range::<usize> { start: 1, end: 4 }, 0),
888                (Range::<usize> { start: 2, end: 5 }, 1),
889                (Range::<usize> { start: 4, end: 8 }, 2),
890                (Range::<usize> { start: 4, end: 8 }, 2),
891                (Range::<usize> { start: 5, end: 9 }, 3),
892                (Range::<usize> { start: 8, end: 9 }, 4),
893                (Range::<usize> { start: 8, end: 9 }, 4),
894                (Range::<usize> { start: 8, end: 9 }, 4),
895                (Range::<usize> { start: 9, end: 9 }, 5),
896            ],
897        )
898    }
899
900    #[test]
901    fn test_window_frame_group_boundaries_both_preceding() -> Result<()> {
902        let window_frame = Arc::new(WindowFrame::new_bounds(
903            WindowFrameUnits::Groups,
904            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2))),
905            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
906        ));
907        assert_group_ranges(
908            &window_frame,
909            vec![
910                (Range::<usize> { start: 0, end: 0 }, 0),
911                (Range::<usize> { start: 0, end: 1 }, 1),
912                (Range::<usize> { start: 0, end: 2 }, 2),
913                (Range::<usize> { start: 0, end: 2 }, 2),
914                (Range::<usize> { start: 1, end: 4 }, 3),
915                (Range::<usize> { start: 2, end: 5 }, 4),
916                (Range::<usize> { start: 2, end: 5 }, 4),
917                (Range::<usize> { start: 2, end: 5 }, 4),
918                (Range::<usize> { start: 4, end: 8 }, 5),
919            ],
920        )
921    }
922}