datafusion_expr/
window_state.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Structures used to hold window function state (for implementing WindowUDFs)
19
20use std::{collections::VecDeque, ops::Range, sync::Arc};
21
22use crate::{WindowFrame, WindowFrameBound, WindowFrameUnits};
23
24use arrow::{
25    array::ArrayRef,
26    compute::{SortOptions, concat, concat_batches},
27    datatypes::{DataType, SchemaRef},
28    record_batch::RecordBatch,
29};
30use datafusion_common::{
31    Result, ScalarValue, internal_datafusion_err, internal_err,
32    utils::{compare_rows, get_row_at_idx, search_in_slice},
33};
34
35/// Holds the state of evaluating a window function
36#[derive(Debug, Clone)]
37pub struct WindowAggState {
38    /// The range that we calculate the window function
39    pub window_frame_range: Range<usize>,
40    pub window_frame_ctx: Option<WindowFrameContext>,
41    /// The index of the last row that its result is calculated inside the partition record batch buffer.
42    pub last_calculated_index: usize,
43    /// The offset of the deleted row number
44    pub offset_pruned_rows: usize,
45    /// Stores the results calculated by window frame
46    pub out_col: ArrayRef,
47    /// Keeps track of how many rows should be generated to be in sync with input record_batch.
48    // (For each row in the input record batch we need to generate a window result).
49    pub n_row_result_missing: usize,
50    /// Flag indicating whether we have received all data for this partition
51    pub is_end: bool,
52}
53
54impl WindowAggState {
55    pub fn prune_state(&mut self, n_prune: usize) {
56        self.window_frame_range = Range {
57            start: self.window_frame_range.start - n_prune,
58            end: self.window_frame_range.end - n_prune,
59        };
60        self.last_calculated_index -= n_prune;
61        self.offset_pruned_rows += n_prune;
62
63        match self.window_frame_ctx.as_mut() {
64            // Rows have no state do nothing
65            Some(WindowFrameContext::Rows(_)) => {}
66            Some(WindowFrameContext::Range { .. }) => {}
67            Some(WindowFrameContext::Groups { state, .. }) => {
68                let mut n_group_to_del = 0;
69                for (_, end_idx) in &state.group_end_indices {
70                    if n_prune < *end_idx {
71                        break;
72                    }
73                    n_group_to_del += 1;
74                }
75                state.group_end_indices.drain(0..n_group_to_del);
76                state
77                    .group_end_indices
78                    .iter_mut()
79                    .for_each(|(_, start_idx)| *start_idx -= n_prune);
80                state.current_group_idx -= n_group_to_del;
81            }
82            None => {}
83        };
84    }
85
86    pub fn update(
87        &mut self,
88        out_col: &ArrayRef,
89        partition_batch_state: &PartitionBatchState,
90    ) -> Result<()> {
91        self.last_calculated_index += out_col.len();
92        // no need to use concat if the current `out_col` is empty
93        if self.out_col.is_empty() {
94            self.out_col = Arc::clone(out_col);
95        } else {
96            self.out_col = concat(&[&self.out_col, &out_col])?;
97        }
98        self.n_row_result_missing =
99            partition_batch_state.record_batch.num_rows() - self.last_calculated_index;
100        self.is_end = partition_batch_state.is_end;
101        Ok(())
102    }
103
104    pub fn new(out_type: &DataType) -> Result<Self> {
105        let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?;
106        Ok(Self {
107            window_frame_range: Range { start: 0, end: 0 },
108            window_frame_ctx: None,
109            last_calculated_index: 0,
110            offset_pruned_rows: 0,
111            out_col: empty_out_col,
112            n_row_result_missing: 0,
113            is_end: false,
114        })
115    }
116}
117
118/// This object stores the window frame state for use in incremental calculations.
119#[derive(Debug, Clone)]
120pub enum WindowFrameContext {
121    /// ROWS frames are inherently stateless.
122    Rows(Arc<WindowFrame>),
123    /// RANGE frames are stateful, they store indices specifying where the
124    /// previous search left off. This amortizes the overall cost to O(n)
125    /// where n denotes the row count.
126    Range {
127        window_frame: Arc<WindowFrame>,
128        state: WindowFrameStateRange,
129    },
130    /// GROUPS frames are stateful, they store group boundaries and indices
131    /// specifying where the previous search left off. This amortizes the
132    /// overall cost to O(n) where n denotes the row count.
133    Groups {
134        window_frame: Arc<WindowFrame>,
135        state: WindowFrameStateGroups,
136    },
137}
138
139impl WindowFrameContext {
140    /// Create a new state object for the given window frame.
141    pub fn new(window_frame: Arc<WindowFrame>, sort_options: Vec<SortOptions>) -> Self {
142        match window_frame.units {
143            WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
144            WindowFrameUnits::Range => WindowFrameContext::Range {
145                window_frame,
146                state: WindowFrameStateRange::new(sort_options),
147            },
148            WindowFrameUnits::Groups => WindowFrameContext::Groups {
149                window_frame,
150                state: WindowFrameStateGroups::default(),
151            },
152        }
153    }
154
155    /// This function calculates beginning/ending indices for the frame of the current row.
156    pub fn calculate_range(
157        &mut self,
158        range_columns: &[ArrayRef],
159        last_range: &Range<usize>,
160        length: usize,
161        idx: usize,
162    ) -> Result<Range<usize>> {
163        match self {
164            WindowFrameContext::Rows(window_frame) => {
165                Self::calculate_range_rows(window_frame, length, idx)
166            }
167            // Sort options is used in RANGE mode calculations because the
168            // ordering or position of NULLs impact range calculations and
169            // comparison of rows.
170            WindowFrameContext::Range {
171                window_frame,
172                state,
173            } => state.calculate_range(
174                window_frame,
175                last_range,
176                range_columns,
177                length,
178                idx,
179            ),
180            // Sort options is not used in GROUPS mode calculations as the
181            // inequality of two rows indicates a group change, and ordering
182            // or position of NULLs do not impact inequality.
183            WindowFrameContext::Groups {
184                window_frame,
185                state,
186            } => state.calculate_range(window_frame, range_columns, length, idx),
187        }
188    }
189
190    /// This function calculates beginning/ending indices for the frame of the current row.
191    fn calculate_range_rows(
192        window_frame: &Arc<WindowFrame>,
193        length: usize,
194        idx: usize,
195    ) -> Result<Range<usize>> {
196        let start = match window_frame.start_bound {
197            // UNBOUNDED PRECEDING
198            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
199            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
200                idx.saturating_sub(n as usize)
201            }
202            WindowFrameBound::CurrentRow => idx,
203            // UNBOUNDED FOLLOWING
204            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
205                return internal_err!(
206                    "Frame start cannot be UNBOUNDED FOLLOWING '{window_frame:?}'"
207                );
208            }
209            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
210                std::cmp::min(idx + n as usize, length)
211            }
212            // ERRONEOUS FRAMES
213            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
214                return internal_err!("Rows should be UInt64");
215            }
216        };
217        let end = match window_frame.end_bound {
218            // UNBOUNDED PRECEDING
219            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
220                return internal_err!(
221                    "Frame end cannot be UNBOUNDED PRECEDING '{window_frame:?}'"
222                );
223            }
224            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
225                if idx >= n as usize {
226                    idx - n as usize + 1
227                } else {
228                    0
229                }
230            }
231            WindowFrameBound::CurrentRow => idx + 1,
232            // UNBOUNDED FOLLOWING
233            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
234            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
235                std::cmp::min(idx + n as usize + 1, length)
236            }
237            // ERRONEOUS FRAMES
238            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
239                return internal_err!("Rows should be UInt64");
240            }
241        };
242        Ok(Range { start, end })
243    }
244}
245
246/// State for each unique partition determined according to PARTITION BY column(s)
247#[derive(Debug, Clone, PartialEq)]
248pub struct PartitionBatchState {
249    /// The record batch belonging to current partition
250    pub record_batch: RecordBatch,
251    /// The record batch that contains the most recent row at the input.
252    /// Please note that this batch doesn't necessarily have the same partitioning
253    /// with `record_batch`. Keeping track of this batch enables us to prune
254    /// `record_batch` when cardinality of the partition is sparse.
255    pub most_recent_row: Option<RecordBatch>,
256    /// Flag indicating whether we have received all data for this partition
257    pub is_end: bool,
258    /// Number of rows emitted for each partition
259    pub n_out_row: usize,
260}
261
262impl PartitionBatchState {
263    pub fn new(schema: SchemaRef) -> Self {
264        Self {
265            record_batch: RecordBatch::new_empty(schema),
266            most_recent_row: None,
267            is_end: false,
268            n_out_row: 0,
269        }
270    }
271
272    pub fn new_with_batch(batch: RecordBatch) -> Self {
273        Self {
274            record_batch: batch,
275            most_recent_row: None,
276            is_end: false,
277            n_out_row: 0,
278        }
279    }
280
281    pub fn extend(&mut self, batch: &RecordBatch) -> Result<()> {
282        self.record_batch =
283            concat_batches(&self.record_batch.schema(), [&self.record_batch, batch])?;
284        Ok(())
285    }
286
287    pub fn set_most_recent_row(&mut self, batch: RecordBatch) {
288        // It is enough for the batch to contain only a single row (the rest
289        // are not necessary).
290        self.most_recent_row = Some(batch);
291    }
292}
293
294/// This structure encapsulates all the state information we require as we scan
295/// ranges of data while processing RANGE frames.
296/// Attribute `sort_options` stores the column ordering specified by the ORDER
297/// BY clause. This information is used to calculate the range.
298#[derive(Debug, Default, Clone)]
299pub struct WindowFrameStateRange {
300    sort_options: Vec<SortOptions>,
301}
302
303impl WindowFrameStateRange {
304    /// Create a new object to store the search state.
305    fn new(sort_options: Vec<SortOptions>) -> Self {
306        Self { sort_options }
307    }
308
309    /// This function calculates beginning/ending indices for the frame of the current row.
310    // Argument `last_range` stores the resulting indices from the previous search. Since the indices only
311    // advance forward, we start from `last_range` subsequently. Thus, the overall
312    // time complexity of linear search amortizes to O(n) where n denotes the total
313    // row count.
314    fn calculate_range(
315        &mut self,
316        window_frame: &Arc<WindowFrame>,
317        last_range: &Range<usize>,
318        range_columns: &[ArrayRef],
319        length: usize,
320        idx: usize,
321    ) -> Result<Range<usize>> {
322        let start = match window_frame.start_bound {
323            WindowFrameBound::Preceding(ref n) => {
324                if n.is_null() {
325                    // UNBOUNDED PRECEDING
326                    0
327                } else {
328                    self.calculate_index_of_row::<true, true>(
329                        range_columns,
330                        last_range,
331                        idx,
332                        Some(n),
333                        length,
334                    )?
335                }
336            }
337            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<true, true>(
338                range_columns,
339                last_range,
340                idx,
341                None,
342                length,
343            )?,
344            WindowFrameBound::Following(ref n) => self
345                .calculate_index_of_row::<true, false>(
346                    range_columns,
347                    last_range,
348                    idx,
349                    Some(n),
350                    length,
351                )?,
352        };
353        let end = match window_frame.end_bound {
354            WindowFrameBound::Preceding(ref n) => self
355                .calculate_index_of_row::<false, true>(
356                    range_columns,
357                    last_range,
358                    idx,
359                    Some(n),
360                    length,
361                )?,
362            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<false, false>(
363                range_columns,
364                last_range,
365                idx,
366                None,
367                length,
368            )?,
369            WindowFrameBound::Following(ref n) => {
370                if n.is_null() {
371                    // UNBOUNDED FOLLOWING
372                    length
373                } else {
374                    self.calculate_index_of_row::<false, false>(
375                        range_columns,
376                        last_range,
377                        idx,
378                        Some(n),
379                        length,
380                    )?
381                }
382            }
383        };
384        Ok(Range { start, end })
385    }
386
387    /// This function does the heavy lifting when finding range boundaries. It is meant to be
388    /// called twice, in succession, to get window frame start and end indices (with `SIDE`
389    /// supplied as true and false, respectively).
390    fn calculate_index_of_row<const SIDE: bool, const SEARCH_SIDE: bool>(
391        &mut self,
392        range_columns: &[ArrayRef],
393        last_range: &Range<usize>,
394        idx: usize,
395        delta: Option<&ScalarValue>,
396        length: usize,
397    ) -> Result<usize> {
398        let current_row_values = get_row_at_idx(range_columns, idx)?;
399        let end_range = if let Some(delta) = delta {
400            let is_descending: bool = self
401                .sort_options
402                .first()
403                .ok_or_else(|| {
404                    internal_datafusion_err!(
405                        "Sort options unexpectedly absent in a window frame"
406                    )
407                })?
408                .descending;
409
410            current_row_values
411                .iter()
412                .map(|value| {
413                    if value.is_null() {
414                        return Ok(value.clone());
415                    }
416                    if SEARCH_SIDE == is_descending {
417                        // TODO: Handle positive overflows.
418                        value.add(delta)
419                    } else if value.is_unsigned() && value < delta {
420                        // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
421                        //       If we decide to implement a "default" construction mechanism for ScalarValue,
422                        //       change the following statement to use that.
423                        value.sub(value)
424                    } else {
425                        // TODO: Handle negative overflows.
426                        value.sub(delta)
427                    }
428                })
429                .collect::<Result<Vec<ScalarValue>>>()?
430        } else {
431            current_row_values
432        };
433        let search_start = if SIDE {
434            last_range.start
435        } else {
436            last_range.end
437        };
438        let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
439            let cmp = compare_rows(current, target, &self.sort_options)?;
440            Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
441        };
442        search_in_slice(range_columns, &end_range, compare_fn, search_start, length)
443    }
444}
445
446// In GROUPS mode, rows with duplicate sorting values are grouped together.
447// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
448// The syntax is as follows:
449//     GROUPS frame_start [ frame_exclusion ]
450//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
451// The optional frame_exclusion specifier is not yet supported.
452// The frame_start and frame_end parameters allow us to specify which rows the window
453// frame starts and ends with. They accept the following values:
454//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
455//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
456//                        that comes "offset" groups before the current group (i.e. the group
457//                        containing the current row). When used in frame_end, it refers to the
458//                        last row of the group that comes "offset" groups before the current group.
459//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
460//                   the current row. When used in frame_end, it refers to the last row of the group
461//                   containing the current row.
462//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
463//                        that comes "offset" groups after the current group (i.e. the group
464//                        containing the current row). When used in frame_end, it refers to the
465//                        last row of the group that comes "offset" groups after the current group.
466//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
467
468/// This structure encapsulates all the state information we require as we
469/// scan groups of data while processing window frames.
470#[derive(Debug, Default, Clone)]
471pub struct WindowFrameStateGroups {
472    /// A tuple containing group values and the row index where the group ends.
473    /// Example: [[1, 1], [1, 1], [2, 1], [2, 1], ...] would correspond to
474    ///          [([1, 1], 2), ([2, 1], 4), ...].
475    pub group_end_indices: VecDeque<(Vec<ScalarValue>, usize)>,
476    /// The group index to which the row index belongs.
477    pub current_group_idx: usize,
478}
479
480impl WindowFrameStateGroups {
481    fn calculate_range(
482        &mut self,
483        window_frame: &Arc<WindowFrame>,
484        range_columns: &[ArrayRef],
485        length: usize,
486        idx: usize,
487    ) -> Result<Range<usize>> {
488        let start = match window_frame.start_bound {
489            WindowFrameBound::Preceding(ref n) => {
490                if n.is_null() {
491                    // UNBOUNDED PRECEDING
492                    0
493                } else {
494                    self.calculate_index_of_row::<true, true>(
495                        range_columns,
496                        idx,
497                        Some(n),
498                        length,
499                    )?
500                }
501            }
502            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<true, true>(
503                range_columns,
504                idx,
505                None,
506                length,
507            )?,
508            WindowFrameBound::Following(ref n) => self
509                .calculate_index_of_row::<true, false>(
510                    range_columns,
511                    idx,
512                    Some(n),
513                    length,
514                )?,
515        };
516        let end = match window_frame.end_bound {
517            WindowFrameBound::Preceding(ref n) => self
518                .calculate_index_of_row::<false, true>(
519                    range_columns,
520                    idx,
521                    Some(n),
522                    length,
523                )?,
524            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<false, false>(
525                range_columns,
526                idx,
527                None,
528                length,
529            )?,
530            WindowFrameBound::Following(ref n) => {
531                if n.is_null() {
532                    // UNBOUNDED FOLLOWING
533                    length
534                } else {
535                    self.calculate_index_of_row::<false, false>(
536                        range_columns,
537                        idx,
538                        Some(n),
539                        length,
540                    )?
541                }
542            }
543        };
544        Ok(Range { start, end })
545    }
546
547    /// This function does the heavy lifting when finding range boundaries. It is meant to be
548    /// called twice, in succession, to get window frame start and end indices (with `SIDE`
549    /// supplied as true and false, respectively). Generic argument `SEARCH_SIDE` determines
550    /// the sign of `delta` (where true/false represents negative/positive respectively).
551    fn calculate_index_of_row<const SIDE: bool, const SEARCH_SIDE: bool>(
552        &mut self,
553        range_columns: &[ArrayRef],
554        idx: usize,
555        delta: Option<&ScalarValue>,
556        length: usize,
557    ) -> Result<usize> {
558        let delta = if let Some(delta) = delta {
559            if let ScalarValue::UInt64(Some(value)) = delta {
560                *value as usize
561            } else {
562                return internal_err!(
563                    "Unexpectedly got a non-UInt64 value in a GROUPS mode window frame"
564                );
565            }
566        } else {
567            0
568        };
569        let mut group_start = 0;
570        let last_group = self.group_end_indices.back_mut();
571        if let Some((group_row, group_end)) = last_group {
572            if *group_end < length {
573                let new_group_row = get_row_at_idx(range_columns, *group_end)?;
574                // If last/current group keys are the same, we extend the last group:
575                if new_group_row.eq(group_row) {
576                    // Update the end boundary of the group (search right boundary):
577                    *group_end = search_in_slice(
578                        range_columns,
579                        group_row,
580                        check_equality,
581                        *group_end,
582                        length,
583                    )?;
584                }
585            }
586            // Start searching from the last group boundary:
587            group_start = *group_end;
588        }
589
590        // Advance groups until `idx` is inside a group:
591        while idx >= group_start {
592            let group_row = get_row_at_idx(range_columns, group_start)?;
593            // Find end boundary of the group (search right boundary):
594            let group_end = search_in_slice(
595                range_columns,
596                &group_row,
597                check_equality,
598                group_start,
599                length,
600            )?;
601            self.group_end_indices.push_back((group_row, group_end));
602            group_start = group_end;
603        }
604
605        // Update the group index `idx` belongs to:
606        while self.current_group_idx < self.group_end_indices.len()
607            && idx >= self.group_end_indices[self.current_group_idx].1
608        {
609            self.current_group_idx += 1;
610        }
611
612        // Find the group index of the frame boundary:
613        let group_idx = if SEARCH_SIDE {
614            self.current_group_idx.saturating_sub(delta)
615        } else {
616            self.current_group_idx + delta
617        };
618
619        // Extend `group_start_indices` until it includes at least `group_idx`:
620        while self.group_end_indices.len() <= group_idx && group_start < length {
621            let group_row = get_row_at_idx(range_columns, group_start)?;
622            // Find end boundary of the group (search right boundary):
623            let group_end = search_in_slice(
624                range_columns,
625                &group_row,
626                check_equality,
627                group_start,
628                length,
629            )?;
630            self.group_end_indices.push_back((group_row, group_end));
631            group_start = group_end;
632        }
633
634        // Calculate index of the group boundary:
635        Ok(match (SIDE, SEARCH_SIDE) {
636            // Window frame start:
637            (true, _) => {
638                let group_idx = std::cmp::min(group_idx, self.group_end_indices.len());
639                if group_idx > 0 {
640                    // Normally, start at the boundary of the previous group.
641                    self.group_end_indices[group_idx - 1].1
642                } else {
643                    // If previous group is out of the table, start at zero.
644                    0
645                }
646            }
647            // Window frame end, PRECEDING n
648            (false, true) => {
649                if self.current_group_idx >= delta {
650                    let group_idx = self.current_group_idx - delta;
651                    self.group_end_indices[group_idx].1
652                } else {
653                    // Group is out of the table, therefore end at zero.
654                    0
655                }
656            }
657            // Window frame end, FOLLOWING n
658            (false, false) => {
659                let group_idx = std::cmp::min(
660                    self.current_group_idx + delta,
661                    self.group_end_indices.len() - 1,
662                );
663                self.group_end_indices[group_idx].1
664            }
665        })
666    }
667}
668
669fn check_equality(current: &[ScalarValue], target: &[ScalarValue]) -> Result<bool> {
670    Ok(current == target)
671}
672
673#[cfg(test)]
674mod tests {
675    use super::*;
676
677    use arrow::array::Float64Array;
678
679    fn get_test_data() -> (Vec<ArrayRef>, Vec<SortOptions>) {
680        let range_columns: Vec<ArrayRef> = vec![Arc::new(Float64Array::from(vec![
681            5.0, 7.0, 8.0, 8.0, 9., 10., 10., 10., 11.,
682        ]))];
683        let sort_options = vec![SortOptions {
684            descending: false,
685            nulls_first: false,
686        }];
687
688        (range_columns, sort_options)
689    }
690
691    fn assert_group_ranges(
692        window_frame: &Arc<WindowFrame>,
693        expected_results: Vec<(Range<usize>, usize)>,
694    ) -> Result<()> {
695        let mut window_frame_groups = WindowFrameStateGroups::default();
696        let (range_columns, _) = get_test_data();
697        let n_row = range_columns[0].len();
698        for (idx, (expected_range, expected_group_idx)) in
699            expected_results.into_iter().enumerate()
700        {
701            let range = window_frame_groups.calculate_range(
702                window_frame,
703                &range_columns,
704                n_row,
705                idx,
706            )?;
707            assert_eq!(range, expected_range);
708            assert_eq!(window_frame_groups.current_group_idx, expected_group_idx);
709        }
710        Ok(())
711    }
712
713    fn assert_frame_ranges(
714        window_frame: &Arc<WindowFrame>,
715        expected_results: Vec<Range<usize>>,
716    ) -> Result<()> {
717        let mut window_frame_context =
718            WindowFrameContext::new(Arc::clone(window_frame), vec![]);
719        let (range_columns, _) = get_test_data();
720        let n_row = range_columns[0].len();
721        let mut last_range = Range { start: 0, end: 0 };
722        for (idx, expected_range) in expected_results.into_iter().enumerate() {
723            let range = window_frame_context.calculate_range(
724                &range_columns,
725                &last_range,
726                n_row,
727                idx,
728            )?;
729            assert_eq!(range, expected_range);
730            last_range = range;
731        }
732        Ok(())
733    }
734
735    #[test]
736    fn test_default_window_frame_group_boundaries() -> Result<()> {
737        let window_frame = Arc::new(WindowFrame::new(None));
738        assert_group_ranges(
739            &window_frame,
740            vec![
741                (Range { start: 0, end: 9 }, 0),
742                (Range { start: 0, end: 9 }, 0),
743                (Range { start: 0, end: 9 }, 0),
744                (Range { start: 0, end: 9 }, 0),
745                (Range { start: 0, end: 9 }, 0),
746                (Range { start: 0, end: 9 }, 0),
747                (Range { start: 0, end: 9 }, 0),
748                (Range { start: 0, end: 9 }, 0),
749                (Range { start: 0, end: 9 }, 0),
750            ],
751        )?;
752
753        assert_frame_ranges(
754            &window_frame,
755            vec![
756                Range { start: 0, end: 9 },
757                Range { start: 0, end: 9 },
758                Range { start: 0, end: 9 },
759                Range { start: 0, end: 9 },
760                Range { start: 0, end: 9 },
761                Range { start: 0, end: 9 },
762                Range { start: 0, end: 9 },
763                Range { start: 0, end: 9 },
764                Range { start: 0, end: 9 },
765            ],
766        )?;
767
768        Ok(())
769    }
770
771    #[test]
772    fn test_unordered_window_frame_group_boundaries() -> Result<()> {
773        let window_frame = Arc::new(WindowFrame::new(Some(false)));
774        assert_group_ranges(
775            &window_frame,
776            vec![
777                (Range { start: 0, end: 1 }, 0),
778                (Range { start: 0, end: 2 }, 1),
779                (Range { start: 0, end: 4 }, 2),
780                (Range { start: 0, end: 4 }, 2),
781                (Range { start: 0, end: 5 }, 3),
782                (Range { start: 0, end: 8 }, 4),
783                (Range { start: 0, end: 8 }, 4),
784                (Range { start: 0, end: 8 }, 4),
785                (Range { start: 0, end: 9 }, 5),
786            ],
787        )?;
788
789        assert_frame_ranges(
790            &window_frame,
791            vec![
792                Range { start: 0, end: 9 },
793                Range { start: 0, end: 9 },
794                Range { start: 0, end: 9 },
795                Range { start: 0, end: 9 },
796                Range { start: 0, end: 9 },
797                Range { start: 0, end: 9 },
798                Range { start: 0, end: 9 },
799                Range { start: 0, end: 9 },
800                Range { start: 0, end: 9 },
801            ],
802        )?;
803
804        Ok(())
805    }
806
807    #[test]
808    fn test_ordered_window_frame_group_boundaries() -> Result<()> {
809        let window_frame = Arc::new(WindowFrame::new(Some(true)));
810        assert_group_ranges(
811            &window_frame,
812            vec![
813                (Range { start: 0, end: 1 }, 0),
814                (Range { start: 0, end: 2 }, 1),
815                (Range { start: 0, end: 4 }, 2),
816                (Range { start: 0, end: 4 }, 2),
817                (Range { start: 0, end: 5 }, 3),
818                (Range { start: 0, end: 8 }, 4),
819                (Range { start: 0, end: 8 }, 4),
820                (Range { start: 0, end: 8 }, 4),
821                (Range { start: 0, end: 9 }, 5),
822            ],
823        )?;
824
825        assert_frame_ranges(
826            &window_frame,
827            vec![
828                Range { start: 0, end: 1 },
829                Range { start: 0, end: 2 },
830                Range { start: 0, end: 3 },
831                Range { start: 0, end: 4 },
832                Range { start: 0, end: 5 },
833                Range { start: 0, end: 6 },
834                Range { start: 0, end: 7 },
835                Range { start: 0, end: 8 },
836                Range { start: 0, end: 9 },
837            ],
838        )?;
839
840        Ok(())
841    }
842
843    #[test]
844    fn test_window_frame_group_boundaries() -> Result<()> {
845        let window_frame = Arc::new(WindowFrame::new_bounds(
846            WindowFrameUnits::Groups,
847            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
848            WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
849        ));
850        assert_group_ranges(
851            &window_frame,
852            vec![
853                (Range { start: 0, end: 2 }, 0),
854                (Range { start: 0, end: 4 }, 1),
855                (Range { start: 1, end: 5 }, 2),
856                (Range { start: 1, end: 5 }, 2),
857                (Range { start: 2, end: 8 }, 3),
858                (Range { start: 4, end: 9 }, 4),
859                (Range { start: 4, end: 9 }, 4),
860                (Range { start: 4, end: 9 }, 4),
861                (Range { start: 5, end: 9 }, 5),
862            ],
863        )
864    }
865
866    #[test]
867    fn test_window_frame_group_boundaries_both_following() -> Result<()> {
868        let window_frame = Arc::new(WindowFrame::new_bounds(
869            WindowFrameUnits::Groups,
870            WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
871            WindowFrameBound::Following(ScalarValue::UInt64(Some(2))),
872        ));
873        assert_group_ranges(
874            &window_frame,
875            vec![
876                (Range::<usize> { start: 1, end: 4 }, 0),
877                (Range::<usize> { start: 2, end: 5 }, 1),
878                (Range::<usize> { start: 4, end: 8 }, 2),
879                (Range::<usize> { start: 4, end: 8 }, 2),
880                (Range::<usize> { start: 5, end: 9 }, 3),
881                (Range::<usize> { start: 8, end: 9 }, 4),
882                (Range::<usize> { start: 8, end: 9 }, 4),
883                (Range::<usize> { start: 8, end: 9 }, 4),
884                (Range::<usize> { start: 9, end: 9 }, 5),
885            ],
886        )
887    }
888
889    #[test]
890    fn test_window_frame_group_boundaries_both_preceding() -> Result<()> {
891        let window_frame = Arc::new(WindowFrame::new_bounds(
892            WindowFrameUnits::Groups,
893            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2))),
894            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
895        ));
896        assert_group_ranges(
897            &window_frame,
898            vec![
899                (Range::<usize> { start: 0, end: 0 }, 0),
900                (Range::<usize> { start: 0, end: 1 }, 1),
901                (Range::<usize> { start: 0, end: 2 }, 2),
902                (Range::<usize> { start: 0, end: 2 }, 2),
903                (Range::<usize> { start: 1, end: 4 }, 3),
904                (Range::<usize> { start: 2, end: 5 }, 4),
905                (Range::<usize> { start: 2, end: 5 }, 4),
906                (Range::<usize> { start: 2, end: 5 }, 4),
907                (Range::<usize> { start: 4, end: 8 }, 5),
908            ],
909        )
910    }
911}