use arrow::array::ArrayRef;
use arrow::compute::kernels::sort::SortOptions;
use datafusion_common::utils::{
compare_rows, find_bisect_point, get_row_at_idx, search_in_slice,
};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
/// Per-partition context used to compute window frame boundaries.
///
/// There is one variant per [`WindowFrameUnits`] kind. Every variant borrows
/// the [`WindowFrame`] definition; the `Range` and `Groups` variants also own
/// the state object that performs their boundary computation.
#[derive(Debug)]
pub enum WindowFrameContext<'a> {
/// `ROWS` frames: boundaries are computed from the row index alone, so no
/// extra state is kept.
Rows(&'a Arc<WindowFrame>),
/// `RANGE` frames: boundaries are found by searching the ORDER BY columns.
Range {
window_frame: &'a Arc<WindowFrame>,
state: WindowFrameStateRange,
},
/// `GROUPS` frames: boundaries are expressed in peer groups, tracked
/// incrementally by `state`.
Groups {
window_frame: &'a Arc<WindowFrame>,
state: WindowFrameStateGroups,
},
}
impl<'a> WindowFrameContext<'a> {
pub fn new(window_frame: &'a Arc<WindowFrame>) -> Self {
match window_frame.units {
WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
WindowFrameUnits::Range => WindowFrameContext::Range {
window_frame,
state: WindowFrameStateRange::default(),
},
WindowFrameUnits::Groups => WindowFrameContext::Groups {
window_frame,
state: WindowFrameStateGroups::default(),
},
}
}
/// Computes the frame (as a half-open row range) for the row at `idx`.
///
/// Dispatches to the routine matching the frame units:
/// * `ROWS` only needs `length` and `idx`;
/// * `RANGE` additionally uses `range_columns`, `sort_options` and the
///   previously computed `last_range`;
/// * `GROUPS` uses `range_columns` and its own incremental state.
///
/// # Errors
/// Propagates any error produced by the underlying per-unit computation.
pub fn calculate_range(
    &mut self,
    range_columns: &[ArrayRef],
    sort_options: &[SortOptions],
    length: usize,
    idx: usize,
    last_range: &Range<usize>,
) -> Result<Range<usize>> {
    match self {
        Self::Rows(window_frame) => {
            Self::calculate_range_rows(window_frame, length, idx)
        }
        Self::Range {
            window_frame,
            state,
        } => state.calculate_range(
            window_frame,
            range_columns,
            sort_options,
            length,
            idx,
            last_range,
        ),
        Self::Groups {
            window_frame,
            state,
        } => state.calculate_range(window_frame, range_columns, length, idx),
    }
}
/// Computes the half-open row range of a `ROWS` frame for the row at `idx`
/// in a partition of `length` rows.
///
/// Both boundaries are clamped to `[0, length]`. Offsets are combined with
/// saturating arithmetic, so a very large `n PRECEDING` / `n FOLLOWING`
/// offset clamps to the partition boundary instead of overflowing `usize`.
///
/// # Errors
/// Returns `DataFusionError::Internal` when:
/// * the frame start is `UNBOUNDED FOLLOWING`,
/// * the frame end is `UNBOUNDED PRECEDING`,
/// * a bound carries a scalar that is not `UInt64`.
fn calculate_range_rows(
    window_frame: &Arc<WindowFrame>,
    length: usize,
    idx: usize,
) -> Result<Range<usize>> {
    let start = match window_frame.start_bound {
        // UNBOUNDED PRECEDING: the frame starts at the first row.
        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
        // `n PRECEDING`: clamp at the first row of the partition.
        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
            idx.saturating_sub(n as usize)
        }
        WindowFrameBound::CurrentRow => idx,
        WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
            return Err(DataFusionError::Internal(format!(
                "Frame start cannot be UNBOUNDED FOLLOWING '{window_frame:?}'"
            )))
        }
        // `n FOLLOWING`: saturate before clamping so that `idx + n` cannot
        // overflow for huge offsets.
        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
            min(idx.saturating_add(n as usize), length)
        }
        WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
            return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
        }
    };
    let end = match window_frame.end_bound {
        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
            return Err(DataFusionError::Internal(format!(
                "Frame end cannot be UNBOUNDED PRECEDING '{window_frame:?}'"
            )))
        }
        // `n PRECEDING`: the end is exclusive, hence the `+ 1`. When the
        // offset reaches before the partition, the end is 0.
        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
            if idx >= n as usize {
                idx - n as usize + 1
            } else {
                0
            }
        }
        WindowFrameBound::CurrentRow => idx + 1,
        // UNBOUNDED FOLLOWING: the frame ends at the last row.
        WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
        // `n FOLLOWING`: saturate before clamping (see the start bound).
        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => min(
            idx.saturating_add(n as usize).saturating_add(1),
            length,
        ),
        WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
            return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
        }
    };
    Ok(Range { start, end })
}
}
/// Boundary calculator for `RANGE` window frames.
///
/// The struct currently has no fields; its methods receive everything they
/// need (including the previously computed range) as arguments.
#[derive(Debug, Default)]
pub struct WindowFrameStateRange {}
impl WindowFrameStateRange {
/// Computes the half-open row range of a `RANGE` frame for the row at
/// `idx`.
///
/// Boundaries that need no search are resolved first:
/// * an unbounded (null) `PRECEDING` start is `0`, and an unbounded (null)
///   `FOLLOWING` end is `length`;
/// * with no ORDER BY columns, `CURRENT ROW` spans the whole partition
///   (`0` as start, `length` as end).
///
/// Every other boundary is located with `calculate_index_of_row`.
fn calculate_range(
    &mut self,
    window_frame: &Arc<WindowFrame>,
    range_columns: &[ArrayRef],
    sort_options: &[SortOptions],
    length: usize,
    idx: usize,
    last_range: &Range<usize>,
) -> Result<Range<usize>> {
    let start = match window_frame.start_bound {
        WindowFrameBound::Preceding(ref n) if n.is_null() => 0,
        WindowFrameBound::CurrentRow if range_columns.is_empty() => 0,
        WindowFrameBound::Preceding(ref n) => self
            .calculate_index_of_row::<true, true>(
                range_columns,
                sort_options,
                idx,
                Some(n),
                last_range,
                length,
            )?,
        WindowFrameBound::CurrentRow => self.calculate_index_of_row::<true, true>(
            range_columns,
            sort_options,
            idx,
            None,
            last_range,
            length,
        )?,
        WindowFrameBound::Following(ref n) => self
            .calculate_index_of_row::<true, false>(
                range_columns,
                sort_options,
                idx,
                Some(n),
                last_range,
                length,
            )?,
    };
    let end = match window_frame.end_bound {
        WindowFrameBound::Following(ref n) if n.is_null() => length,
        WindowFrameBound::CurrentRow if range_columns.is_empty() => length,
        WindowFrameBound::Preceding(ref n) => self
            .calculate_index_of_row::<false, true>(
                range_columns,
                sort_options,
                idx,
                Some(n),
                last_range,
                length,
            )?,
        WindowFrameBound::CurrentRow => self
            .calculate_index_of_row::<false, false>(
                range_columns,
                sort_options,
                idx,
                None,
                last_range,
                length,
            )?,
        WindowFrameBound::Following(ref n) => self
            .calculate_index_of_row::<false, false>(
                range_columns,
                sort_options,
                idx,
                Some(n),
                last_range,
                length,
            )?,
    };
    Ok(start..end)
}
/// Locates one boundary of a `RANGE` frame for the row at `idx`.
///
/// * `SIDE` selects the boundary: `true` searches for the frame start
///   (beginning at `last_range.start`, strict `<` comparison), `false`
///   searches for the frame end (beginning at `last_range.end`, `<=`
///   comparison).
/// * `SEARCH_SIDE` is `true` for a `PRECEDING` offset and `false` for a
///   `FOLLOWING` one; together with the sort direction of the first
///   ORDER BY column it decides whether `delta` is added or subtracted.
/// * `delta` is the frame offset; `None` means `CURRENT ROW`, in which case
///   the current row values themselves are the search target.
///
/// Null values are never shifted. For unsigned values smaller than `delta`
/// the subtraction is replaced by `value - value` (i.e. zero) so that it
/// cannot underflow.
///
/// # Errors
/// Returns `DataFusionError::Internal` if `delta` is given but
/// `sort_options` is empty, and propagates errors from row extraction,
/// scalar arithmetic, row comparison and the slice search.
fn calculate_index_of_row<const SIDE: bool, const SEARCH_SIDE: bool>(
    &mut self,
    range_columns: &[ArrayRef],
    sort_options: &[SortOptions],
    idx: usize,
    delta: Option<&ScalarValue>,
    last_range: &Range<usize>,
    length: usize,
) -> Result<usize> {
    let row = get_row_at_idx(range_columns, idx)?;
    let target = match delta {
        None => row,
        Some(delta) => {
            let descending = match sort_options.first() {
                Some(options) => options.descending,
                None => {
                    return Err(DataFusionError::Internal(
                        "Array is empty".to_string(),
                    ))
                }
            };
            let mut shifted = Vec::with_capacity(row.len());
            for value in &row {
                let moved = if value.is_null() {
                    value.clone()
                } else if SEARCH_SIDE == descending {
                    value.add(delta)?
                } else if value.is_unsigned() && value < delta {
                    // Yields a zero of the right type without underflowing.
                    value.sub(value)?
                } else {
                    value.sub(delta)?
                };
                shifted.push(moved);
            }
            shifted
        }
    };
    // The search resumes from the previous frame's boundary.
    let search_start = match SIDE {
        true => last_range.start,
        false => last_range.end,
    };
    let keep_going = |current: &[ScalarValue], wanted: &[ScalarValue]| {
        compare_rows(current, wanted, sort_options).map(|ordering| {
            if SIDE {
                ordering.is_lt()
            } else {
                ordering.is_le()
            }
        })
    };
    search_in_slice(range_columns, &target, keep_going, search_start, length)
}
}
/// Incremental state for computing `GROUPS` window frame boundaries.
///
/// The state tracks a sliding window of peer groups (runs of rows whose
/// ORDER BY values are equal) while rows are visited in order.
#[derive(Debug, Default)]
pub struct WindowFrameStateGroups {
// Index of the group the current row belongs to; incremented whenever a row
// with values different from the previous row is seen.
current_group_idx: u64,
// Groups currently inside the window frame, as (group values, index of the
// group's first row). New groups are pushed at the back; groups leaving the
// frame are popped from the front.
group_start_indices: VecDeque<(Vec<ScalarValue>, usize)>,
// Values of the row that started the current group; `None` until the first
// row has been processed.
previous_row_values: Option<Vec<ScalarValue>>,
// Set once no further group could be found in the data.
reached_end: bool,
// Group index just past the last group held in `group_start_indices`.
window_frame_end_idx: u64,
// Group index of the first group held in `group_start_indices`.
window_frame_start_idx: u64,
}
impl WindowFrameStateGroups {
/// Computes the half-open row range of a `GROUPS` frame for the row at
/// `idx`.
///
/// `CURRENT ROW` is handled as an offset of zero groups. The start bound
/// is always resolved before the end bound, because both update the shared
/// group state.
///
/// # Errors
/// * `DataFusionError::Execution` if there are no ORDER BY columns;
/// * `DataFusionError::Internal` if the start is `UNBOUNDED FOLLOWING`, the
///   end is `UNBOUNDED PRECEDING`, or a bound is not a `UInt64` scalar;
/// * any error propagated from the group search.
fn calculate_range(
    &mut self,
    window_frame: &Arc<WindowFrame>,
    range_columns: &[ArrayRef],
    length: usize,
    idx: usize,
) -> Result<Range<usize>> {
    if range_columns.is_empty() {
        return Err(DataFusionError::Execution(
            "GROUPS mode requires an ORDER BY clause".to_string(),
        ));
    }
    let non_uint_error =
        || DataFusionError::Internal("Groups should be Uint".to_string());

    let start = match window_frame.start_bound {
        WindowFrameBound::CurrentRow => self
            .calculate_index_of_group::<true, true>(range_columns, idx, 0, length)?,
        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
            .calculate_index_of_group::<true, true>(range_columns, idx, n, length)?,
        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
            .calculate_index_of_group::<true, false>(range_columns, idx, n, length)?,
        WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
            return Err(DataFusionError::Internal(format!(
                "Frame start cannot be UNBOUNDED FOLLOWING '{window_frame:?}'"
            )))
        }
        WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
            return Err(non_uint_error())
        }
    };

    let end = match window_frame.end_bound {
        WindowFrameBound::CurrentRow => self
            .calculate_index_of_group::<false, false>(range_columns, idx, 0, length)?,
        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
            .calculate_index_of_group::<false, false>(range_columns, idx, n, length)?,
        WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
            .calculate_index_of_group::<false, true>(range_columns, idx, n, length)?,
        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
            return Err(DataFusionError::Internal(format!(
                "Frame end cannot be UNBOUNDED PRECEDING '{window_frame:?}'"
            )))
        }
        WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
            return Err(non_uint_error())
        }
    };

    Ok(start..end)
}
/// Returns one boundary (a row index) of the `GROUPS` frame for the row at
/// `idx`, updating the sliding group window as needed.
///
/// * `BISECT_SIDE` is `true` when the frame start is requested and `false`
///   when the frame end is requested.
/// * `SEARCH_SIDE` is `true` for a `PRECEDING` offset and `false` for a
///   `FOLLOWING` one.
/// * `delta` is the offset in groups (`0` for `CURRENT ROW`).
/// * `length` is the number of rows in the partition.
///
/// # Errors
/// Propagates errors from scalar extraction and from the group search.
fn calculate_index_of_group<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
    &mut self,
    range_columns: &[ArrayRef],
    idx: usize,
    delta: u64,
    length: usize,
) -> Result<usize> {
    let current_row_values = range_columns
        .iter()
        .map(|col| ScalarValue::try_from_array(col, idx))
        .collect::<Result<Vec<ScalarValue>>>()?;
    if BISECT_SIDE {
        // Frame start requested: set up the group window on first use.
        if !self.initialized() {
            self.initialize::<SEARCH_SIDE>(delta, range_columns)?;
        }
    } else if !self.reached_end {
        // Frame end requested: grow the window until it covers the end
        // group of the current row (or the data runs out).
        self.extend_window_frame_if_necessary::<SEARCH_SIDE>(range_columns, delta)?;
    }
    // A group change is a row whose values differ from the remembered ones.
    let group_change = match &self.previous_row_values {
        None => false,
        Some(values) => &current_row_values != values,
    };
    if self.previous_row_values.is_none() || group_change {
        self.previous_row_values = Some(current_row_values);
    }
    if group_change {
        // Slide the window forward by one group on both ends.
        self.current_group_idx += 1;
        self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
        self.shift_one_group::<SEARCH_SIDE>(delta);
    }
    Ok(if self.group_start_indices.is_empty() {
        // No group in the window: it lies entirely after the data
        // (`length`) or has not started yet (`0`).
        if self.reached_end {
            length
        } else {
            0
        }
    } else if BISECT_SIDE {
        // Frame start: first row of the first group in the window.
        match self.group_start_indices.front() {
            Some(&(_, idx)) => idx,
            None => 0,
        }
    } else {
        // Frame end: first row of the last group in the window, unless the
        // data has been exhausted.
        match (self.reached_end, self.group_start_indices.back()) {
            (false, Some(&(_, idx))) => idx,
            _ => length,
        }
    })
}
/// Grows the group window until it reaches the frame-end group of the
/// current row, or until the data is exhausted.
///
/// The target end index is `current_group_idx + delta + 1` for `FOLLOWING`
/// (`SEARCH_SIDE == false`) and `current_group_idx - delta + 1` for
/// `PRECEDING`; a `PRECEDING` offset that reaches before the first group
/// yields `0`, in which case nothing is done. The `FOLLOWING` computation
/// saturates, so a huge `delta` cannot overflow `u64`.
///
/// # Errors
/// Propagates errors from the group search.
fn extend_window_frame_if_necessary<const SEARCH_SIDE: bool>(
    &mut self,
    range_columns: &[ArrayRef],
    delta: u64,
) -> Result<()> {
    let current_window_frame_end_idx = if !SEARCH_SIDE {
        self.current_group_idx
            .saturating_add(delta)
            .saturating_add(1)
    } else if self.current_group_idx >= delta {
        self.current_group_idx - delta + 1
    } else {
        0
    };
    if current_window_frame_end_idx == 0 {
        return Ok(());
    }
    if self.group_start_indices.is_empty() {
        self.initialize_window_frame_start(range_columns)?;
    }
    // Each step either adds a group or sets `reached_end`, so the loop
    // terminates even when the target index is saturated.
    while !self.reached_end
        && self.window_frame_end_idx <= current_window_frame_end_idx
    {
        self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
    }
    Ok(())
}
/// Sets up the group window for the first frame-start request.
///
/// For `FOLLOWING` (`SEARCH_SIDE == false`) the window starts `delta`
/// groups after the current group. For `PRECEDING` it starts `delta`
/// groups before, and nothing is initialized while that position lies
/// before the first group. The `FOLLOWING` computation saturates, so a
/// huge `delta` cannot overflow `u64`.
///
/// # Errors
/// Propagates errors from `initialize_window_frame_start`.
fn initialize<const SEARCH_SIDE: bool>(
    &mut self,
    delta: u64,
    range_columns: &[ArrayRef],
) -> Result<()> {
    if !SEARCH_SIDE {
        self.window_frame_start_idx = self.current_group_idx.saturating_add(delta);
        self.initialize_window_frame_start(range_columns)
    } else if self.current_group_idx >= delta {
        self.window_frame_start_idx = self.current_group_idx - delta;
        self.initialize_window_frame_start(range_columns)
    } else {
        Ok(())
    }
}
fn initialize_window_frame_start(
&mut self,
range_columns: &[ArrayRef],
) -> Result<()> {
let mut group_values = range_columns
.iter()
.map(|col| ScalarValue::try_from_array(col, 0))
.collect::<Result<Vec<ScalarValue>>>()?;
let mut start_idx: usize = 0;
for _ in 0..self.window_frame_start_idx {
let next_group_and_start_index =
WindowFrameStateGroups::find_next_group_and_start_index(
range_columns,
&group_values,
start_idx,
)?;
if let Some(entry) = next_group_and_start_index {
(group_values, start_idx) = entry;
} else {
self.window_frame_end_idx = self.window_frame_start_idx;
self.reached_end = true;
return Ok(());
}
}
self.group_start_indices
.push_back((group_values, start_idx));
self.window_frame_end_idx = self.window_frame_start_idx + 1;
Ok(())
}
/// Reports whether the group window has been set up: either it already
/// holds at least one group, or the end of the data has been reached.
fn initialized(&self) -> bool {
    let has_groups = !self.group_start_indices.is_empty();
    has_groups || self.reached_end
}
fn advance_one_group<const SEARCH_SIDE: bool>(
&mut self,
range_columns: &[ArrayRef],
) -> Result<()> {
let last_group_values = self.group_start_indices.back();
let last_group_values = if let Some(values) = last_group_values {
values
} else {
return Ok(());
};
let next_group_and_start_index =
WindowFrameStateGroups::find_next_group_and_start_index(
range_columns,
&last_group_values.0,
last_group_values.1,
)?;
if let Some(entry) = next_group_and_start_index {
self.group_start_indices.push_back(entry);
self.window_frame_end_idx += 1;
} else {
self.reached_end = true;
}
Ok(())
}
/// Drops the first group of the window when the frame start of the current
/// group has moved past it.
///
/// The desired start index is `current_group_idx + delta` for `FOLLOWING`
/// (`SEARCH_SIDE == false`) and `current_group_idx - delta`, floored at
/// `0`, for `PRECEDING`. The `FOLLOWING` computation saturates, so a huge
/// `delta` cannot overflow `u64`.
fn shift_one_group<const SEARCH_SIDE: bool>(&mut self, delta: u64) {
    let current_window_frame_start_idx = if !SEARCH_SIDE {
        self.current_group_idx.saturating_add(delta)
    } else {
        self.current_group_idx.saturating_sub(delta)
    };
    // The increment cannot overflow: it only runs while the stored index is
    // strictly below a `u64` value.
    if current_window_frame_start_idx > self.window_frame_start_idx {
        self.group_start_indices.pop_front();
        self.window_frame_start_idx += 1;
    }
}
/// Finds the group that follows the group of the row at `idx`.
///
/// `current_row_values` must be the values of the row at `idx`. The search
/// first probes forward with doubling steps while the probed row still
/// equals `current_row_values`, then bisects the last probed interval to
/// find the first row that differs.
///
/// Returns the values and the first row index of the next group, or `None`
/// when the current group extends to the end of the data.
///
/// # Errors
/// Returns `DataFusionError::Internal` if `range_columns` is empty, and
/// propagates errors from scalar extraction and from the bisection.
fn find_next_group_and_start_index(
    range_columns: &[ArrayRef],
    current_row_values: &[ScalarValue],
    idx: usize,
) -> Result<Option<(Vec<ScalarValue>, usize)>> {
    let data_size = match range_columns.first() {
        Some(column) => column.len(),
        None => {
            return Err(DataFusionError::Internal(
                "Column array shouldn't be empty".to_string(),
            ))
        }
    };
    // Materializes the values of all columns at a given row.
    let row_at = |row: usize| -> Result<Vec<ScalarValue>> {
        range_columns
            .iter()
            .map(|column| ScalarValue::try_from_array(column, row))
            .collect()
    };

    // Doubling-step probe: `lower` is the last row known to be in the
    // current group, `upper` the next row to probe.
    let mut stride: usize = 1;
    let mut lower = idx;
    let mut upper = idx + stride;
    while upper < data_size && row_at(upper)? == current_row_values {
        lower = upper;
        stride *= 2;
        upper += stride;
    }

    // Bisect within the last probed interval for the first differing row.
    let boundary = find_bisect_point(
        range_columns,
        current_row_values,
        |candidate, wanted| Ok(candidate == wanted),
        lower,
        min(upper, data_size),
    )?;

    if boundary == data_size {
        Ok(None)
    } else {
        Ok(Some((row_at(boundary)?, boundary)))
    }
}
}
#[cfg(test)]
mod tests {
use arrow::array::Float64Array;
use datafusion_common::ScalarValue;
use std::sync::Arc;
use crate::from_slice::FromSlice;
use super::*;
/// Shared fixture for the `GROUPS` frame tests.
struct TestData {
// Input columns handed to the functions under test.
arrays: Vec<ArrayRef>,
// For each row, the index of the first row of the group it belongs to.
group_indices: [usize; 6],
// Number of distinct groups in `arrays`.
num_groups: usize,
// Number of rows in `arrays`.
num_rows: usize,
// For each of the first five rows, the index of the first row of the next
// group.
next_group_indices: [usize; 5],
}
/// Builds the fixture: four `Float64` columns of six rows forming five
/// groups (rows 2 and 3 hold identical values in every column).
fn test_data() -> TestData {
    // One inner array per column, one entry per row.
    const COLUMNS: [[f64; 6]; 4] = [
        [5.0, 7.0, 8.0, 8.0, 9.0, 10.0],
        [2.0, 3.0, 3.0, 3.0, 4.0, 5.0],
        [5.0, 7.0, 8.0, 8.0, 10.0, 11.0],
        [15.0, 13.0, 8.0, 8.0, 5.0, 0.0],
    ];
    let arrays = COLUMNS
        .iter()
        .map(|column| Arc::new(Float64Array::from_slice(*column)) as ArrayRef)
        .collect();
    TestData {
        arrays,
        group_indices: [0, 1, 2, 2, 4, 5],
        num_groups: 5,
        num_rows: 6,
        next_group_indices: [1, 2, 4, 4, 5],
    }
}
/// Checks that `find_next_group_and_start_index` returns the expected next
/// group for every row that has one, and `None` for the last row.
#[test]
fn test_find_next_group_and_start_index() {
    let test_data = test_data();
    // Rows that are followed by another group.
    for (current_idx, next_idx) in test_data.next_group_indices.iter().enumerate() {
        let current_row_values = test_data
            .arrays
            .iter()
            .map(|col| ScalarValue::try_from_array(col, current_idx))
            .collect::<Result<Vec<ScalarValue>>>()
            .unwrap();
        let next_row_values = test_data
            .arrays
            .iter()
            .map(|col| ScalarValue::try_from_array(col, *next_idx))
            .collect::<Result<Vec<ScalarValue>>>()
            .unwrap();
        let res = WindowFrameStateGroups::find_next_group_and_start_index(
            &test_data.arrays,
            &current_row_values,
            current_idx,
        )
        .unwrap();
        assert_eq!(res, Some((next_row_values, *next_idx)));
    }
    // The last row belongs to the last group, so there is no next group.
    let current_idx = test_data.num_rows - 1;
    let current_row_values = test_data
        .arrays
        .iter()
        .map(|col| ScalarValue::try_from_array(col, current_idx))
        .collect::<Result<Vec<ScalarValue>>>()
        .unwrap();
    let res = WindowFrameStateGroups::find_next_group_and_start_index(
        &test_data.arrays,
        &current_row_values,
        current_idx,
    )
    .unwrap();
    assert_eq!(res, None);
}
/// `10 PRECEDING` on both bounds with only five groups: the frame never
/// starts, so every boundary is 0 and the state stays untouched.
#[test]
fn test_window_frame_groups_preceding_delta_greater_than_partition_size() {
    const START: bool = true;
    const END: bool = false;
    const PRECEDING: bool = true;
    const DELTA: u64 = 10;

    // The state must still look freshly default-constructed.
    fn assert_untouched(state: &WindowFrameStateGroups) {
        assert_eq!(state.window_frame_start_idx, 0);
        assert_eq!(state.window_frame_end_idx, 0);
        assert!(!state.reached_end);
        assert_eq!(state.group_start_indices.len(), 0);
    }

    let fixture = test_data();
    let columns = &fixture.arrays;
    let row_count = fixture.num_rows;
    let mut state = WindowFrameStateGroups::default();

    state.initialize::<PRECEDING>(DELTA, columns).unwrap();
    assert_untouched(&state);

    state
        .extend_window_frame_if_necessary::<PRECEDING>(columns, DELTA)
        .unwrap();
    assert_untouched(&state);

    for idx in 0..row_count {
        let start = state
            .calculate_index_of_group::<START, PRECEDING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(start, 0);
        let end = state
            .calculate_index_of_group::<END, PRECEDING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(end, 0);
    }
}
/// `10 FOLLOWING` on both bounds with only five groups: the frame lies
/// entirely past the data, so every boundary equals the row count.
#[test]
fn test_window_frame_groups_following_delta_greater_than_partition_size() {
    const START: bool = true;
    const END: bool = false;
    const FOLLOWING: bool = false;
    const DELTA: u64 = 10;

    // The state must report an empty window positioned at DELTA with the
    // data exhausted.
    fn assert_exhausted(state: &WindowFrameStateGroups) {
        assert_eq!(state.window_frame_start_idx, DELTA);
        assert_eq!(state.window_frame_end_idx, DELTA);
        assert!(state.reached_end);
        assert_eq!(state.group_start_indices.len(), 0);
    }

    let fixture = test_data();
    let columns = &fixture.arrays;
    let row_count = fixture.num_rows;
    let mut state = WindowFrameStateGroups::default();

    state.initialize::<FOLLOWING>(DELTA, columns).unwrap();
    assert_exhausted(&state);

    state
        .extend_window_frame_if_necessary::<FOLLOWING>(columns, DELTA)
        .unwrap();
    assert_exhausted(&state);

    for idx in 0..row_count {
        let start = state
            .calculate_index_of_group::<START, FOLLOWING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(start, row_count);
        let end = state
            .calculate_index_of_group::<END, FOLLOWING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(end, row_count);
    }
}
/// `10 PRECEDING` to `10 FOLLOWING` with only five groups: the frame always
/// covers the whole partition.
#[test]
fn test_window_frame_groups_preceding_and_following_delta_greater_than_partition_size(
) {
    const START: bool = true;
    const END: bool = false;
    const FOLLOWING: bool = false;
    const PRECEDING: bool = true;
    const DELTA: u64 = 10;

    let fixture = test_data();
    let columns = &fixture.arrays;
    let row_count = fixture.num_rows;
    let group_count = fixture.num_groups;
    let mut state = WindowFrameStateGroups::default();

    // The start lies before the first group: nothing is initialized yet.
    state.initialize::<PRECEDING>(DELTA, columns).unwrap();
    assert_eq!(state.window_frame_start_idx, 0);
    assert_eq!(state.window_frame_end_idx, 0);
    assert!(!state.reached_end);
    assert_eq!(state.group_start_indices.len(), 0);

    // Extending towards the end pulls in every group and exhausts the data.
    state
        .extend_window_frame_if_necessary::<FOLLOWING>(columns, DELTA)
        .unwrap();
    assert_eq!(state.window_frame_start_idx, 0);
    assert_eq!(state.window_frame_end_idx, group_count as u64);
    assert!(state.reached_end);
    assert_eq!(state.group_start_indices.len(), group_count);

    for idx in 0..row_count {
        let start = state
            .calculate_index_of_group::<START, PRECEDING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(start, 0);
        let end = state
            .calculate_index_of_group::<END, FOLLOWING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(end, row_count);
    }
}
/// `1 PRECEDING` to `1 FOLLOWING`: the frame spans the previous, current
/// and next group of every row.
#[test]
fn test_window_frame_groups_preceding_and_following_1() {
    const START: bool = true;
    const END: bool = false;
    const FOLLOWING: bool = false;
    const PRECEDING: bool = true;
    const DELTA: u64 = 1;

    let fixture = test_data();
    let columns = &fixture.arrays;
    let row_count = fixture.num_rows;
    let mut state = WindowFrameStateGroups::default();

    // The start lies before the first group: nothing is initialized yet.
    state.initialize::<PRECEDING>(DELTA, columns).unwrap();
    assert_eq!(state.window_frame_start_idx, 0);
    assert_eq!(state.window_frame_end_idx, 0);
    assert!(!state.reached_end);
    assert_eq!(state.group_start_indices.len(), 0);

    state
        .extend_window_frame_if_necessary::<FOLLOWING>(columns, DELTA)
        .unwrap();
    assert_eq!(state.window_frame_start_idx, 0);
    assert_eq!(state.window_frame_end_idx, 2 * DELTA + 1);
    assert!(!state.reached_end);
    assert_eq!(state.group_start_indices.len(), 2 * (DELTA as usize) + 1);

    // Start row of the group after the given row, or the row count when
    // there is no such group.
    let next_group = |row: usize| {
        if row >= fixture.num_groups {
            fixture.num_rows
        } else {
            fixture.next_group_indices[row]
        }
    };

    for idx in 0..row_count {
        let expected_start_row = if idx >= DELTA as usize {
            fixture.group_indices[idx] - DELTA as usize
        } else {
            0
        };
        let start = state
            .calculate_index_of_group::<START, PRECEDING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(start, fixture.group_indices[expected_start_row]);

        // Step over the current group, then over DELTA more groups.
        let expected_end = (0..DELTA).fold(next_group(idx), |row, _| next_group(row));
        let end = state
            .calculate_index_of_group::<END, FOLLOWING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(end, expected_end);
    }
}
/// `1 PRECEDING` start bound: the frame start is the first row of the
/// previous group (or row 0 for the first group).
#[test]
fn test_window_frame_groups_preceding_1_and_unbounded_following() {
    const START: bool = true;
    const PRECEDING: bool = true;
    const DELTA: u64 = 1;

    let fixture = test_data();
    let columns = &fixture.arrays;
    let row_count = fixture.num_rows;
    let mut state = WindowFrameStateGroups::default();

    // The start lies before the first group: nothing is initialized yet.
    state.initialize::<PRECEDING>(DELTA, columns).unwrap();
    assert_eq!(state.window_frame_start_idx, 0);
    assert_eq!(state.window_frame_end_idx, 0);
    assert!(!state.reached_end);
    assert_eq!(state.group_start_indices.len(), 0);

    for idx in 0..row_count {
        let expected_start_row = if idx >= DELTA as usize {
            fixture.group_indices[idx] - DELTA as usize
        } else {
            0
        };
        let start = state
            .calculate_index_of_group::<START, PRECEDING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(start, fixture.group_indices[expected_start_row]);
    }
}
/// `CURRENT ROW` start bound (offset 0): the frame start is the first row
/// of the current row's own group.
#[test]
fn test_window_frame_groups_current_and_unbounded_following() {
    const START: bool = true;
    const PRECEDING: bool = true;
    const DELTA: u64 = 0;

    let fixture = test_data();
    let columns = &fixture.arrays;
    let row_count = fixture.num_rows;
    let mut state = WindowFrameStateGroups::default();

    // With a zero offset the first group is recorded immediately.
    state.initialize::<PRECEDING>(DELTA, columns).unwrap();
    assert_eq!(state.window_frame_start_idx, 0);
    assert_eq!(state.window_frame_end_idx, 1);
    assert!(!state.reached_end);
    assert_eq!(state.group_start_indices.len(), 1);

    for idx in 0..row_count {
        let start = state
            .calculate_index_of_group::<START, PRECEDING>(columns, idx, DELTA, row_count)
            .unwrap();
        assert_eq!(start, fixture.group_indices[idx]);
    }
}
}