datafusion_expr/
window_state.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Structures used to hold window function state (for implementing WindowUDFs)
19
20use std::{collections::VecDeque, ops::Range, sync::Arc};
21
22use crate::{WindowFrame, WindowFrameBound, WindowFrameUnits};
23
24use arrow::{
25    array::ArrayRef,
26    compute::{concat, concat_batches, SortOptions},
27    datatypes::{DataType, SchemaRef},
28    record_batch::RecordBatch,
29};
30use datafusion_common::{
31    internal_datafusion_err, internal_err,
32    utils::{compare_rows, get_row_at_idx, search_in_slice},
33    Result, ScalarValue,
34};
35
36/// Holds the state of evaluating a window function
37#[derive(Debug, Clone)]
38pub struct WindowAggState {
39    /// The range that we calculate the window function
40    pub window_frame_range: Range<usize>,
41    pub window_frame_ctx: Option<WindowFrameContext>,
42    /// The index of the last row that its result is calculated inside the partition record batch buffer.
43    pub last_calculated_index: usize,
44    /// The offset of the deleted row number
45    pub offset_pruned_rows: usize,
46    /// Stores the results calculated by window frame
47    pub out_col: ArrayRef,
48    /// Keeps track of how many rows should be generated to be in sync with input record_batch.
49    // (For each row in the input record batch we need to generate a window result).
50    pub n_row_result_missing: usize,
51    /// Flag indicating whether we have received all data for this partition
52    pub is_end: bool,
53}
54
55impl WindowAggState {
56    pub fn prune_state(&mut self, n_prune: usize) {
57        self.window_frame_range = Range {
58            start: self.window_frame_range.start - n_prune,
59            end: self.window_frame_range.end - n_prune,
60        };
61        self.last_calculated_index -= n_prune;
62        self.offset_pruned_rows += n_prune;
63
64        match self.window_frame_ctx.as_mut() {
65            // Rows have no state do nothing
66            Some(WindowFrameContext::Rows(_)) => {}
67            Some(WindowFrameContext::Range { .. }) => {}
68            Some(WindowFrameContext::Groups { state, .. }) => {
69                let mut n_group_to_del = 0;
70                for (_, end_idx) in &state.group_end_indices {
71                    if n_prune < *end_idx {
72                        break;
73                    }
74                    n_group_to_del += 1;
75                }
76                state.group_end_indices.drain(0..n_group_to_del);
77                state
78                    .group_end_indices
79                    .iter_mut()
80                    .for_each(|(_, start_idx)| *start_idx -= n_prune);
81                state.current_group_idx -= n_group_to_del;
82            }
83            None => {}
84        };
85    }
86
87    pub fn update(
88        &mut self,
89        out_col: &ArrayRef,
90        partition_batch_state: &PartitionBatchState,
91    ) -> Result<()> {
92        self.last_calculated_index += out_col.len();
93        // no need to use concat if the current `out_col` is empty
94        if self.out_col.is_empty() {
95            self.out_col = Arc::clone(out_col);
96        } else {
97            self.out_col = concat(&[&self.out_col, &out_col])?;
98        }
99        self.n_row_result_missing =
100            partition_batch_state.record_batch.num_rows() - self.last_calculated_index;
101        self.is_end = partition_batch_state.is_end;
102        Ok(())
103    }
104
105    pub fn new(out_type: &DataType) -> Result<Self> {
106        let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?;
107        Ok(Self {
108            window_frame_range: Range { start: 0, end: 0 },
109            window_frame_ctx: None,
110            last_calculated_index: 0,
111            offset_pruned_rows: 0,
112            out_col: empty_out_col,
113            n_row_result_missing: 0,
114            is_end: false,
115        })
116    }
117}
118
119/// This object stores the window frame state for use in incremental calculations.
120#[derive(Debug, Clone)]
121pub enum WindowFrameContext {
122    /// ROWS frames are inherently stateless.
123    Rows(Arc<WindowFrame>),
124    /// RANGE frames are stateful, they store indices specifying where the
125    /// previous search left off. This amortizes the overall cost to O(n)
126    /// where n denotes the row count.
127    Range {
128        window_frame: Arc<WindowFrame>,
129        state: WindowFrameStateRange,
130    },
131    /// GROUPS frames are stateful, they store group boundaries and indices
132    /// specifying where the previous search left off. This amortizes the
133    /// overall cost to O(n) where n denotes the row count.
134    Groups {
135        window_frame: Arc<WindowFrame>,
136        state: WindowFrameStateGroups,
137    },
138}
139
140impl WindowFrameContext {
141    /// Create a new state object for the given window frame.
142    pub fn new(window_frame: Arc<WindowFrame>, sort_options: Vec<SortOptions>) -> Self {
143        match window_frame.units {
144            WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
145            WindowFrameUnits::Range => WindowFrameContext::Range {
146                window_frame,
147                state: WindowFrameStateRange::new(sort_options),
148            },
149            WindowFrameUnits::Groups => WindowFrameContext::Groups {
150                window_frame,
151                state: WindowFrameStateGroups::default(),
152            },
153        }
154    }
155
156    /// This function calculates beginning/ending indices for the frame of the current row.
157    pub fn calculate_range(
158        &mut self,
159        range_columns: &[ArrayRef],
160        last_range: &Range<usize>,
161        length: usize,
162        idx: usize,
163    ) -> Result<Range<usize>> {
164        match self {
165            WindowFrameContext::Rows(window_frame) => {
166                Self::calculate_range_rows(window_frame, length, idx)
167            }
168            // Sort options is used in RANGE mode calculations because the
169            // ordering or position of NULLs impact range calculations and
170            // comparison of rows.
171            WindowFrameContext::Range {
172                window_frame,
173                ref mut state,
174            } => state.calculate_range(
175                window_frame,
176                last_range,
177                range_columns,
178                length,
179                idx,
180            ),
181            // Sort options is not used in GROUPS mode calculations as the
182            // inequality of two rows indicates a group change, and ordering
183            // or position of NULLs do not impact inequality.
184            WindowFrameContext::Groups {
185                window_frame,
186                ref mut state,
187            } => state.calculate_range(window_frame, range_columns, length, idx),
188        }
189    }
190
191    /// This function calculates beginning/ending indices for the frame of the current row.
192    fn calculate_range_rows(
193        window_frame: &Arc<WindowFrame>,
194        length: usize,
195        idx: usize,
196    ) -> Result<Range<usize>> {
197        let start = match window_frame.start_bound {
198            // UNBOUNDED PRECEDING
199            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
200            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
201                idx.saturating_sub(n as usize)
202            }
203            WindowFrameBound::CurrentRow => idx,
204            // UNBOUNDED FOLLOWING
205            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
206                return internal_err!(
207                    "Frame start cannot be UNBOUNDED FOLLOWING '{window_frame:?}'"
208                )
209            }
210            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
211                std::cmp::min(idx + n as usize, length)
212            }
213            // ERRONEOUS FRAMES
214            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
215                return internal_err!("Rows should be UInt64")
216            }
217        };
218        let end = match window_frame.end_bound {
219            // UNBOUNDED PRECEDING
220            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
221                return internal_err!(
222                    "Frame end cannot be UNBOUNDED PRECEDING '{window_frame:?}'"
223                )
224            }
225            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
226                if idx >= n as usize {
227                    idx - n as usize + 1
228                } else {
229                    0
230                }
231            }
232            WindowFrameBound::CurrentRow => idx + 1,
233            // UNBOUNDED FOLLOWING
234            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
235            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
236                std::cmp::min(idx + n as usize + 1, length)
237            }
238            // ERRONEOUS FRAMES
239            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
240                return internal_err!("Rows should be UInt64")
241            }
242        };
243        Ok(Range { start, end })
244    }
245}
246
247/// State for each unique partition determined according to PARTITION BY column(s)
248#[derive(Debug, Clone, PartialEq)]
249pub struct PartitionBatchState {
250    /// The record batch belonging to current partition
251    pub record_batch: RecordBatch,
252    /// The record batch that contains the most recent row at the input.
253    /// Please note that this batch doesn't necessarily have the same partitioning
254    /// with `record_batch`. Keeping track of this batch enables us to prune
255    /// `record_batch` when cardinality of the partition is sparse.
256    pub most_recent_row: Option<RecordBatch>,
257    /// Flag indicating whether we have received all data for this partition
258    pub is_end: bool,
259    /// Number of rows emitted for each partition
260    pub n_out_row: usize,
261}
262
263impl PartitionBatchState {
264    pub fn new(schema: SchemaRef) -> Self {
265        Self {
266            record_batch: RecordBatch::new_empty(schema),
267            most_recent_row: None,
268            is_end: false,
269            n_out_row: 0,
270        }
271    }
272
273    pub fn new_with_batch(batch: RecordBatch) -> Self {
274        Self {
275            record_batch: batch,
276            most_recent_row: None,
277            is_end: false,
278            n_out_row: 0,
279        }
280    }
281
282    pub fn extend(&mut self, batch: &RecordBatch) -> Result<()> {
283        self.record_batch =
284            concat_batches(&self.record_batch.schema(), [&self.record_batch, batch])?;
285        Ok(())
286    }
287
288    pub fn set_most_recent_row(&mut self, batch: RecordBatch) {
289        // It is enough for the batch to contain only a single row (the rest
290        // are not necessary).
291        self.most_recent_row = Some(batch);
292    }
293}
294
295/// This structure encapsulates all the state information we require as we scan
296/// ranges of data while processing RANGE frames.
297/// Attribute `sort_options` stores the column ordering specified by the ORDER
298/// BY clause. This information is used to calculate the range.
299#[derive(Debug, Default, Clone)]
300pub struct WindowFrameStateRange {
301    sort_options: Vec<SortOptions>,
302}
303
304impl WindowFrameStateRange {
305    /// Create a new object to store the search state.
306    fn new(sort_options: Vec<SortOptions>) -> Self {
307        Self { sort_options }
308    }
309
310    /// This function calculates beginning/ending indices for the frame of the current row.
311    // Argument `last_range` stores the resulting indices from the previous search. Since the indices only
312    // advance forward, we start from `last_range` subsequently. Thus, the overall
313    // time complexity of linear search amortizes to O(n) where n denotes the total
314    // row count.
315    fn calculate_range(
316        &mut self,
317        window_frame: &Arc<WindowFrame>,
318        last_range: &Range<usize>,
319        range_columns: &[ArrayRef],
320        length: usize,
321        idx: usize,
322    ) -> Result<Range<usize>> {
323        let start = match window_frame.start_bound {
324            WindowFrameBound::Preceding(ref n) => {
325                if n.is_null() {
326                    // UNBOUNDED PRECEDING
327                    0
328                } else {
329                    self.calculate_index_of_row::<true, true>(
330                        range_columns,
331                        last_range,
332                        idx,
333                        Some(n),
334                        length,
335                    )?
336                }
337            }
338            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<true, true>(
339                range_columns,
340                last_range,
341                idx,
342                None,
343                length,
344            )?,
345            WindowFrameBound::Following(ref n) => self
346                .calculate_index_of_row::<true, false>(
347                    range_columns,
348                    last_range,
349                    idx,
350                    Some(n),
351                    length,
352                )?,
353        };
354        let end = match window_frame.end_bound {
355            WindowFrameBound::Preceding(ref n) => self
356                .calculate_index_of_row::<false, true>(
357                    range_columns,
358                    last_range,
359                    idx,
360                    Some(n),
361                    length,
362                )?,
363            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<false, false>(
364                range_columns,
365                last_range,
366                idx,
367                None,
368                length,
369            )?,
370            WindowFrameBound::Following(ref n) => {
371                if n.is_null() {
372                    // UNBOUNDED FOLLOWING
373                    length
374                } else {
375                    self.calculate_index_of_row::<false, false>(
376                        range_columns,
377                        last_range,
378                        idx,
379                        Some(n),
380                        length,
381                    )?
382                }
383            }
384        };
385        Ok(Range { start, end })
386    }
387
388    /// This function does the heavy lifting when finding range boundaries. It is meant to be
389    /// called twice, in succession, to get window frame start and end indices (with `SIDE`
390    /// supplied as true and false, respectively).
391    fn calculate_index_of_row<const SIDE: bool, const SEARCH_SIDE: bool>(
392        &mut self,
393        range_columns: &[ArrayRef],
394        last_range: &Range<usize>,
395        idx: usize,
396        delta: Option<&ScalarValue>,
397        length: usize,
398    ) -> Result<usize> {
399        let current_row_values = get_row_at_idx(range_columns, idx)?;
400        let end_range = if let Some(delta) = delta {
401            let is_descending: bool = self
402                .sort_options
403                .first()
404                .ok_or_else(|| {
405                    internal_datafusion_err!(
406                        "Sort options unexpectedly absent in a window frame"
407                    )
408                })?
409                .descending;
410
411            current_row_values
412                .iter()
413                .map(|value| {
414                    if value.is_null() {
415                        return Ok(value.clone());
416                    }
417                    if SEARCH_SIDE == is_descending {
418                        // TODO: Handle positive overflows.
419                        value.add(delta)
420                    } else if value.is_unsigned() && value < delta {
421                        // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
422                        //       If we decide to implement a "default" construction mechanism for ScalarValue,
423                        //       change the following statement to use that.
424                        value.sub(value)
425                    } else {
426                        // TODO: Handle negative overflows.
427                        value.sub(delta)
428                    }
429                })
430                .collect::<Result<Vec<ScalarValue>>>()?
431        } else {
432            current_row_values
433        };
434        let search_start = if SIDE {
435            last_range.start
436        } else {
437            last_range.end
438        };
439        let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
440            let cmp = compare_rows(current, target, &self.sort_options)?;
441            Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
442        };
443        search_in_slice(range_columns, &end_range, compare_fn, search_start, length)
444    }
445}
446
447// In GROUPS mode, rows with duplicate sorting values are grouped together.
448// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
449// The syntax is as follows:
450//     GROUPS frame_start [ frame_exclusion ]
451//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
452// The optional frame_exclusion specifier is not yet supported.
453// The frame_start and frame_end parameters allow us to specify which rows the window
454// frame starts and ends with. They accept the following values:
455//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
456//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
457//                        that comes "offset" groups before the current group (i.e. the group
458//                        containing the current row). When used in frame_end, it refers to the
459//                        last row of the group that comes "offset" groups before the current group.
460//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
461//                   the current row. When used in frame_end, it refers to the last row of the group
462//                   containing the current row.
463//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
464//                        that comes "offset" groups after the current group (i.e. the group
465//                        containing the current row). When used in frame_end, it refers to the
466//                        last row of the group that comes "offset" groups after the current group.
467//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
468
469/// This structure encapsulates all the state information we require as we
470/// scan groups of data while processing window frames.
471#[derive(Debug, Default, Clone)]
472pub struct WindowFrameStateGroups {
473    /// A tuple containing group values and the row index where the group ends.
474    /// Example: [[1, 1], [1, 1], [2, 1], [2, 1], ...] would correspond to
475    ///          [([1, 1], 2), ([2, 1], 4), ...].
476    pub group_end_indices: VecDeque<(Vec<ScalarValue>, usize)>,
477    /// The group index to which the row index belongs.
478    pub current_group_idx: usize,
479}
480
481impl WindowFrameStateGroups {
482    fn calculate_range(
483        &mut self,
484        window_frame: &Arc<WindowFrame>,
485        range_columns: &[ArrayRef],
486        length: usize,
487        idx: usize,
488    ) -> Result<Range<usize>> {
489        let start = match window_frame.start_bound {
490            WindowFrameBound::Preceding(ref n) => {
491                if n.is_null() {
492                    // UNBOUNDED PRECEDING
493                    0
494                } else {
495                    self.calculate_index_of_row::<true, true>(
496                        range_columns,
497                        idx,
498                        Some(n),
499                        length,
500                    )?
501                }
502            }
503            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<true, true>(
504                range_columns,
505                idx,
506                None,
507                length,
508            )?,
509            WindowFrameBound::Following(ref n) => self
510                .calculate_index_of_row::<true, false>(
511                    range_columns,
512                    idx,
513                    Some(n),
514                    length,
515                )?,
516        };
517        let end = match window_frame.end_bound {
518            WindowFrameBound::Preceding(ref n) => self
519                .calculate_index_of_row::<false, true>(
520                    range_columns,
521                    idx,
522                    Some(n),
523                    length,
524                )?,
525            WindowFrameBound::CurrentRow => self.calculate_index_of_row::<false, false>(
526                range_columns,
527                idx,
528                None,
529                length,
530            )?,
531            WindowFrameBound::Following(ref n) => {
532                if n.is_null() {
533                    // UNBOUNDED FOLLOWING
534                    length
535                } else {
536                    self.calculate_index_of_row::<false, false>(
537                        range_columns,
538                        idx,
539                        Some(n),
540                        length,
541                    )?
542                }
543            }
544        };
545        Ok(Range { start, end })
546    }
547
548    /// This function does the heavy lifting when finding range boundaries. It is meant to be
549    /// called twice, in succession, to get window frame start and end indices (with `SIDE`
550    /// supplied as true and false, respectively). Generic argument `SEARCH_SIDE` determines
551    /// the sign of `delta` (where true/false represents negative/positive respectively).
552    fn calculate_index_of_row<const SIDE: bool, const SEARCH_SIDE: bool>(
553        &mut self,
554        range_columns: &[ArrayRef],
555        idx: usize,
556        delta: Option<&ScalarValue>,
557        length: usize,
558    ) -> Result<usize> {
559        let delta = if let Some(delta) = delta {
560            if let ScalarValue::UInt64(Some(value)) = delta {
561                *value as usize
562            } else {
563                return internal_err!(
564                    "Unexpectedly got a non-UInt64 value in a GROUPS mode window frame"
565                );
566            }
567        } else {
568            0
569        };
570        let mut group_start = 0;
571        let last_group = self.group_end_indices.back_mut();
572        if let Some((group_row, group_end)) = last_group {
573            if *group_end < length {
574                let new_group_row = get_row_at_idx(range_columns, *group_end)?;
575                // If last/current group keys are the same, we extend the last group:
576                if new_group_row.eq(group_row) {
577                    // Update the end boundary of the group (search right boundary):
578                    *group_end = search_in_slice(
579                        range_columns,
580                        group_row,
581                        check_equality,
582                        *group_end,
583                        length,
584                    )?;
585                }
586            }
587            // Start searching from the last group boundary:
588            group_start = *group_end;
589        }
590
591        // Advance groups until `idx` is inside a group:
592        while idx >= group_start {
593            let group_row = get_row_at_idx(range_columns, group_start)?;
594            // Find end boundary of the group (search right boundary):
595            let group_end = search_in_slice(
596                range_columns,
597                &group_row,
598                check_equality,
599                group_start,
600                length,
601            )?;
602            self.group_end_indices.push_back((group_row, group_end));
603            group_start = group_end;
604        }
605
606        // Update the group index `idx` belongs to:
607        while self.current_group_idx < self.group_end_indices.len()
608            && idx >= self.group_end_indices[self.current_group_idx].1
609        {
610            self.current_group_idx += 1;
611        }
612
613        // Find the group index of the frame boundary:
614        let group_idx = if SEARCH_SIDE {
615            self.current_group_idx.saturating_sub(delta)
616        } else {
617            self.current_group_idx + delta
618        };
619
620        // Extend `group_start_indices` until it includes at least `group_idx`:
621        while self.group_end_indices.len() <= group_idx && group_start < length {
622            let group_row = get_row_at_idx(range_columns, group_start)?;
623            // Find end boundary of the group (search right boundary):
624            let group_end = search_in_slice(
625                range_columns,
626                &group_row,
627                check_equality,
628                group_start,
629                length,
630            )?;
631            self.group_end_indices.push_back((group_row, group_end));
632            group_start = group_end;
633        }
634
635        // Calculate index of the group boundary:
636        Ok(match (SIDE, SEARCH_SIDE) {
637            // Window frame start:
638            (true, _) => {
639                let group_idx = std::cmp::min(group_idx, self.group_end_indices.len());
640                if group_idx > 0 {
641                    // Normally, start at the boundary of the previous group.
642                    self.group_end_indices[group_idx - 1].1
643                } else {
644                    // If previous group is out of the table, start at zero.
645                    0
646                }
647            }
648            // Window frame end, PRECEDING n
649            (false, true) => {
650                if self.current_group_idx >= delta {
651                    let group_idx = self.current_group_idx - delta;
652                    self.group_end_indices[group_idx].1
653                } else {
654                    // Group is out of the table, therefore end at zero.
655                    0
656                }
657            }
658            // Window frame end, FOLLOWING n
659            (false, false) => {
660                let group_idx = std::cmp::min(
661                    self.current_group_idx + delta,
662                    self.group_end_indices.len() - 1,
663                );
664                self.group_end_indices[group_idx].1
665            }
666        })
667    }
668}
669
670fn check_equality(current: &[ScalarValue], target: &[ScalarValue]) -> Result<bool> {
671    Ok(current == target)
672}
673
674#[cfg(test)]
675mod tests {
676    use super::*;
677
678    use arrow::array::Float64Array;
679
680    fn get_test_data() -> (Vec<ArrayRef>, Vec<SortOptions>) {
681        let range_columns: Vec<ArrayRef> = vec![Arc::new(Float64Array::from(vec![
682            5.0, 7.0, 8.0, 8.0, 9., 10., 10., 10., 11.,
683        ]))];
684        let sort_options = vec![SortOptions {
685            descending: false,
686            nulls_first: false,
687        }];
688
689        (range_columns, sort_options)
690    }
691
692    fn assert_group_ranges(
693        window_frame: &Arc<WindowFrame>,
694        expected_results: Vec<(Range<usize>, usize)>,
695    ) -> Result<()> {
696        let mut window_frame_groups = WindowFrameStateGroups::default();
697        let (range_columns, _) = get_test_data();
698        let n_row = range_columns[0].len();
699        for (idx, (expected_range, expected_group_idx)) in
700            expected_results.into_iter().enumerate()
701        {
702            let range = window_frame_groups.calculate_range(
703                window_frame,
704                &range_columns,
705                n_row,
706                idx,
707            )?;
708            assert_eq!(range, expected_range);
709            assert_eq!(window_frame_groups.current_group_idx, expected_group_idx);
710        }
711        Ok(())
712    }
713
714    fn assert_frame_ranges(
715        window_frame: &Arc<WindowFrame>,
716        expected_results: Vec<Range<usize>>,
717    ) -> Result<()> {
718        let mut window_frame_context =
719            WindowFrameContext::new(Arc::clone(window_frame), vec![]);
720        let (range_columns, _) = get_test_data();
721        let n_row = range_columns[0].len();
722        let mut last_range = Range { start: 0, end: 0 };
723        for (idx, expected_range) in expected_results.into_iter().enumerate() {
724            let range = window_frame_context.calculate_range(
725                &range_columns,
726                &last_range,
727                n_row,
728                idx,
729            )?;
730            assert_eq!(range, expected_range);
731            last_range = range;
732        }
733        Ok(())
734    }
735
736    #[test]
737    fn test_default_window_frame_group_boundaries() -> Result<()> {
738        let window_frame = Arc::new(WindowFrame::new(None));
739        assert_group_ranges(
740            &window_frame,
741            vec![
742                (Range { start: 0, end: 9 }, 0),
743                (Range { start: 0, end: 9 }, 0),
744                (Range { start: 0, end: 9 }, 0),
745                (Range { start: 0, end: 9 }, 0),
746                (Range { start: 0, end: 9 }, 0),
747                (Range { start: 0, end: 9 }, 0),
748                (Range { start: 0, end: 9 }, 0),
749                (Range { start: 0, end: 9 }, 0),
750                (Range { start: 0, end: 9 }, 0),
751            ],
752        )?;
753
754        assert_frame_ranges(
755            &window_frame,
756            vec![
757                Range { start: 0, end: 9 },
758                Range { start: 0, end: 9 },
759                Range { start: 0, end: 9 },
760                Range { start: 0, end: 9 },
761                Range { start: 0, end: 9 },
762                Range { start: 0, end: 9 },
763                Range { start: 0, end: 9 },
764                Range { start: 0, end: 9 },
765                Range { start: 0, end: 9 },
766            ],
767        )?;
768
769        Ok(())
770    }
771
772    #[test]
773    fn test_unordered_window_frame_group_boundaries() -> Result<()> {
774        let window_frame = Arc::new(WindowFrame::new(Some(false)));
775        assert_group_ranges(
776            &window_frame,
777            vec![
778                (Range { start: 0, end: 1 }, 0),
779                (Range { start: 0, end: 2 }, 1),
780                (Range { start: 0, end: 4 }, 2),
781                (Range { start: 0, end: 4 }, 2),
782                (Range { start: 0, end: 5 }, 3),
783                (Range { start: 0, end: 8 }, 4),
784                (Range { start: 0, end: 8 }, 4),
785                (Range { start: 0, end: 8 }, 4),
786                (Range { start: 0, end: 9 }, 5),
787            ],
788        )?;
789
790        assert_frame_ranges(
791            &window_frame,
792            vec![
793                Range { start: 0, end: 9 },
794                Range { start: 0, end: 9 },
795                Range { start: 0, end: 9 },
796                Range { start: 0, end: 9 },
797                Range { start: 0, end: 9 },
798                Range { start: 0, end: 9 },
799                Range { start: 0, end: 9 },
800                Range { start: 0, end: 9 },
801                Range { start: 0, end: 9 },
802            ],
803        )?;
804
805        Ok(())
806    }
807
808    #[test]
809    fn test_ordered_window_frame_group_boundaries() -> Result<()> {
810        let window_frame = Arc::new(WindowFrame::new(Some(true)));
811        assert_group_ranges(
812            &window_frame,
813            vec![
814                (Range { start: 0, end: 1 }, 0),
815                (Range { start: 0, end: 2 }, 1),
816                (Range { start: 0, end: 4 }, 2),
817                (Range { start: 0, end: 4 }, 2),
818                (Range { start: 0, end: 5 }, 3),
819                (Range { start: 0, end: 8 }, 4),
820                (Range { start: 0, end: 8 }, 4),
821                (Range { start: 0, end: 8 }, 4),
822                (Range { start: 0, end: 9 }, 5),
823            ],
824        )?;
825
826        assert_frame_ranges(
827            &window_frame,
828            vec![
829                Range { start: 0, end: 1 },
830                Range { start: 0, end: 2 },
831                Range { start: 0, end: 3 },
832                Range { start: 0, end: 4 },
833                Range { start: 0, end: 5 },
834                Range { start: 0, end: 6 },
835                Range { start: 0, end: 7 },
836                Range { start: 0, end: 8 },
837                Range { start: 0, end: 9 },
838            ],
839        )?;
840
841        Ok(())
842    }
843
844    #[test]
845    fn test_window_frame_group_boundaries() -> Result<()> {
846        let window_frame = Arc::new(WindowFrame::new_bounds(
847            WindowFrameUnits::Groups,
848            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
849            WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
850        ));
851        assert_group_ranges(
852            &window_frame,
853            vec![
854                (Range { start: 0, end: 2 }, 0),
855                (Range { start: 0, end: 4 }, 1),
856                (Range { start: 1, end: 5 }, 2),
857                (Range { start: 1, end: 5 }, 2),
858                (Range { start: 2, end: 8 }, 3),
859                (Range { start: 4, end: 9 }, 4),
860                (Range { start: 4, end: 9 }, 4),
861                (Range { start: 4, end: 9 }, 4),
862                (Range { start: 5, end: 9 }, 5),
863            ],
864        )
865    }
866
867    #[test]
868    fn test_window_frame_group_boundaries_both_following() -> Result<()> {
869        let window_frame = Arc::new(WindowFrame::new_bounds(
870            WindowFrameUnits::Groups,
871            WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
872            WindowFrameBound::Following(ScalarValue::UInt64(Some(2))),
873        ));
874        assert_group_ranges(
875            &window_frame,
876            vec![
877                (Range::<usize> { start: 1, end: 4 }, 0),
878                (Range::<usize> { start: 2, end: 5 }, 1),
879                (Range::<usize> { start: 4, end: 8 }, 2),
880                (Range::<usize> { start: 4, end: 8 }, 2),
881                (Range::<usize> { start: 5, end: 9 }, 3),
882                (Range::<usize> { start: 8, end: 9 }, 4),
883                (Range::<usize> { start: 8, end: 9 }, 4),
884                (Range::<usize> { start: 8, end: 9 }, 4),
885                (Range::<usize> { start: 9, end: 9 }, 5),
886            ],
887        )
888    }
889
890    #[test]
891    fn test_window_frame_group_boundaries_both_preceding() -> Result<()> {
892        let window_frame = Arc::new(WindowFrame::new_bounds(
893            WindowFrameUnits::Groups,
894            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2))),
895            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
896        ));
897        assert_group_ranges(
898            &window_frame,
899            vec![
900                (Range::<usize> { start: 0, end: 0 }, 0),
901                (Range::<usize> { start: 0, end: 1 }, 1),
902                (Range::<usize> { start: 0, end: 2 }, 2),
903                (Range::<usize> { start: 0, end: 2 }, 2),
904                (Range::<usize> { start: 1, end: 4 }, 3),
905                (Range::<usize> { start: 2, end: 5 }, 4),
906                (Range::<usize> { start: 2, end: 5 }, 4),
907                (Range::<usize> { start: 2, end: 5 }, 4),
908                (Range::<usize> { start: 4, end: 8 }, 5),
909            ],
910        )
911    }
912}