use std::sync::Arc;
use oxigdal_query::executor::filter::Value;
use oxigdal_query::executor::scan::{ColumnData, DataType, Field, RecordBatch, Schema};
use oxigdal_query::executor::window::{
OrderKey, WindowFunction, WindowSpec, evaluate_window, evaluate_window_batch,
};
/// Wraps column-major test data in the `(row_count, accessor)` pair that the
/// tests hand to `evaluate_window`.
///
/// The row count is the length of the first column (0 when there are no
/// columns). The accessor takes `(row, col)` and returns a clone of that cell;
/// it indexes directly, so an out-of-range row or column panics.
fn table(columns: Vec<Vec<Value>>) -> (usize, impl Fn(usize, usize) -> Value) {
    let num_rows = match columns.first() {
        Some(first_column) => first_column.len(),
        None => 0,
    };
    // The closure takes ownership of the data so it can outlive this call.
    let cell_at = move |row: usize, col: usize| -> Value {
        let column = &columns[col];
        column[row].clone()
    };
    (num_rows, cell_at)
}
/// ROW_NUMBER over one unpartitioned column ordered ascending: every input row
/// is labelled with its 1-based position in the sorted order.
#[test]
fn test_row_number_sequential_within_partition() {
    let values: Vec<Value> = [50, 10, 30, 20, 40]
        .iter()
        .copied()
        .map(Value::Int64)
        .collect();
    let (rows, value_at) = table(vec![values]);

    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);
    let out = evaluate_window(&WindowFunction::RowNumber, &spec, rows, 0, value_at)
        .expect("row_number should evaluate");

    // (input row index, expected row number), checked in input order.
    let expected = [(0usize, 5), (1, 1), (2, 3), (3, 2), (4, 4)];
    for &(row, want) in expected.iter() {
        assert_eq!(out[row], Value::Int64(want));
    }
}
/// ROW_NUMBER partitioned by column 0 and ordered by column 1: numbering
/// restarts at 1 inside each partition.
#[test]
fn test_row_number_resets_per_partition() {
    let groups: Vec<Value> = ["A", "B", "A", "B", "A"]
        .iter()
        .map(|&name| Value::String(name.to_string()))
        .collect();
    let scores: Vec<Value> = [30, 10, 10, 20, 20]
        .iter()
        .copied()
        .map(Value::Int64)
        .collect();
    let (rows, value_at) = table(vec![groups, scores]);

    let spec = WindowSpec::new(vec![0], vec![OrderKey::asc(1)]);
    let out = evaluate_window(&WindowFunction::RowNumber, &spec, rows, 1, value_at)
        .expect("row_number should evaluate");

    // Partition "A" is rows 2, 4, 0 in score order; partition "B" is rows 1, 3.
    let expected = [(2usize, 1), (4, 2), (0, 3), (1, 1), (3, 2)];
    for &(row, want) in expected.iter() {
        assert_eq!(out[row], Value::Int64(want));
    }
}
/// RANK gives tied rows the same rank and leaves a gap afterwards
/// (10, 10, 20, 30 -> 1, 1, 3, 4).
#[test]
fn test_rank_ties_share_rank_with_gap() {
    let input: Vec<Value> = vec![10, 10, 20, 30]
        .into_iter()
        .map(Value::Int64)
        .collect();
    let (rows, value_at) = table(vec![input]);
    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);

    let out =
        evaluate_window(&WindowFunction::Rank, &spec, rows, 0, value_at).expect("rank should eval");

    let expected_ranks = [1, 1, 3, 4];
    for (row, &rank) in expected_ranks.iter().enumerate() {
        assert_eq!(out[row], Value::Int64(rank));
    }
}
/// DENSE_RANK gives tied rows the same rank without skipping the next one
/// (10, 10, 20, 30 -> 1, 1, 2, 3).
#[test]
fn test_dense_rank_ties_no_gap() {
    let input: Vec<Value> = vec![10, 10, 20, 30]
        .into_iter()
        .map(Value::Int64)
        .collect();
    let (rows, value_at) = table(vec![input]);
    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);

    let out = evaluate_window(&WindowFunction::DenseRank, &spec, rows, 0, value_at)
        .expect("dense_rank should eval");

    let expected_ranks = [1, 1, 2, 3];
    for (row, &rank) in expected_ranks.iter().enumerate() {
        assert_eq!(out[row], Value::Int64(rank));
    }
}
/// Default LAG yields the value of the preceding row in window order, and
/// `Null` for the first row, which has no predecessor.
#[test]
fn test_lag_returns_previous_row_value() {
    let series: Vec<Value> = (1..=3).map(|k| Value::Int64(k * 10)).collect();
    let (rows, value_at) = table(vec![series]);
    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);

    let out =
        evaluate_window(&WindowFunction::lag(), &spec, rows, 0, value_at).expect("lag should eval");

    let expected = vec![Value::Null, Value::Int64(10), Value::Int64(20)];
    for (row, want) in expected.into_iter().enumerate() {
        assert_eq!(out[row], want);
    }
}
/// LAG with an explicit default: the first row, which has no predecessor,
/// receives the supplied default (-1) instead of `Null`.
#[test]
fn test_lag_first_row_returns_default() {
    let series: Vec<Value> = [10, 20, 30].iter().copied().map(Value::Int64).collect();
    let (rows, value_at) = table(vec![series]);

    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);
    let lag_with_default = WindowFunction::lag_offset_default(1, Value::Int64(-1));
    let out =
        evaluate_window(&lag_with_default, &spec, rows, 0, value_at).expect("lag should eval");

    let expected = [-1, 10, 20];
    for (row, &want) in expected.iter().enumerate() {
        assert_eq!(out[row], Value::Int64(want));
    }
}
/// Default LEAD yields the value of the following row in window order, and
/// `Null` for the last row, which has no successor.
#[test]
fn test_lead_returns_next_row_value() {
    let series: Vec<Value> = (1..=3).map(|k| Value::Int64(k * 10)).collect();
    let (rows, value_at) = table(vec![series]);
    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);

    let out = evaluate_window(&WindowFunction::lead(), &spec, rows, 0, value_at)
        .expect("lead should eval");

    let expected = vec![Value::Int64(20), Value::Int64(30), Value::Null];
    for (row, want) in expected.into_iter().enumerate() {
        assert_eq!(out[row], want);
    }
}
/// With no explicit default, LEAD on the final row yields `Null`.
#[test]
fn test_lead_last_row_returns_null_default() {
    let pair = vec![Value::Int64(7), Value::Int64(9)];
    let (rows, value_at) = table(vec![pair]);
    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);
    let lead = WindowFunction::lead();

    let out = evaluate_window(&lead, &spec, rows, 0, value_at).expect("lead should eval");

    let expected = vec![Value::Int64(9), Value::Null];
    for (row, want) in expected.into_iter().enumerate() {
        assert_eq!(out[row], want);
    }
}
/// LAG with offset 2 looks two rows back; the first two rows have no such row
/// and yield `Null`.
#[test]
fn test_lag_offset_2() {
    let series: Vec<Value> = (1..=4).map(|k| Value::Int64(k * 10)).collect();
    let (rows, value_at) = table(vec![series]);

    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);
    let lag_two = WindowFunction::lag_offset(2);
    let out = evaluate_window(&lag_two, &spec, rows, 0, value_at).expect("lag should eval");

    let expected = vec![
        Value::Null,
        Value::Null,
        Value::Int64(10),
        Value::Int64(20),
    ];
    for (row, want) in expected.into_iter().enumerate() {
        assert_eq!(out[row], want);
    }
}
/// ROW_NUMBER partitioned by columns 0 and 1 together and ordered by column 2:
/// rows share a partition only when both key columns match.
#[test]
fn test_partition_by_two_keys() {
    let letters: Vec<Value> = ["A", "A", "A", "B", "A"]
        .iter()
        .map(|&letter| Value::String(letter.to_string()))
        .collect();
    let buckets: Vec<Value> = [1, 1, 2, 1, 1].iter().copied().map(Value::Int64).collect();
    let measures: Vec<Value> = [5, 3, 9, 8, 4].iter().copied().map(Value::Int64).collect();
    let (rows, value_at) = table(vec![letters, buckets, measures]);

    let spec = WindowSpec::new(vec![0, 1], vec![OrderKey::asc(2)]);
    let out = evaluate_window(&WindowFunction::RowNumber, &spec, rows, 2, value_at)
        .expect("row_number should eval");

    // ("A", 1) holds rows 1, 4, 0 in measure order; ("A", 2) and ("B", 1) are
    // single-row partitions (rows 2 and 3).
    let expected = [(1usize, 1), (4, 2), (0, 3), (2, 1), (3, 1)];
    for &(row, want) in expected.iter() {
        assert_eq!(out[row], Value::Int64(want));
    }
}
/// ROW_NUMBER with a descending order key: the largest value is numbered 1.
#[test]
fn test_order_by_desc() {
    let values: Vec<Value> = vec![10, 30, 20].into_iter().map(Value::Int64).collect();
    let (rows, value_at) = table(vec![values]);

    let descending = WindowSpec::ordered(vec![OrderKey::desc(0)]);
    let out = evaluate_window(&WindowFunction::RowNumber, &descending, rows, 0, value_at)
        .expect("row_number should eval");

    // Rows listed from largest to smallest input value.
    let expected = [(1usize, 1), (2, 2), (0, 3)];
    for &(row, want) in expected.iter() {
        assert_eq!(out[row], Value::Int64(want));
    }
}
/// Evaluating a window function over zero rows succeeds and produces an empty
/// result, for both ROW_NUMBER and RANK.
#[test]
fn test_empty_input_returns_empty() {
    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);
    // Capture-free accessor; it is `Copy`, so it can be passed by value twice.
    let null_cell = |_row: usize, _col: usize| Value::Null;

    let cases = vec![
        (WindowFunction::RowNumber, "empty input should eval"),
        (WindowFunction::Rank, "empty rank"),
    ];
    for (func, why) in cases {
        let out = evaluate_window(&func, &spec, 0, 0, null_cell).expect(why);
        assert!(out.is_empty());
    }
}
/// FIRST_VALUE, LAST_VALUE and NTH_VALUE over an ascending-ordered column: every
/// row sees the same first (10), last (30) and second (20) value, and an
/// out-of-range N yields `Null`.
#[test]
fn test_first_last_nth_value() {
    let values: Vec<Value> = [30, 10, 20].iter().copied().map(Value::Int64).collect();
    let (rows, value_at) = table(vec![values]);
    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);

    // The accessor is passed by reference so it can be reused across calls.
    let run = |func: &WindowFunction, why: &str| {
        evaluate_window(func, &spec, rows, 0, &value_at).expect(why)
    };

    let first = run(&WindowFunction::FirstValue, "first_value should eval");
    for row in 0..3 {
        assert_eq!(first[row], Value::Int64(10));
    }

    let last = run(&WindowFunction::LastValue, "last_value should eval");
    for row in 0..3 {
        assert_eq!(last[row], Value::Int64(30));
    }

    let nth = run(&WindowFunction::nth_value(2), "nth_value should eval");
    for row in 0..3 {
        assert_eq!(nth[row], Value::Int64(20));
    }

    let nth_oob = run(&WindowFunction::nth_value(99), "nth_value oob should eval");
    assert_eq!(nth_oob[0], Value::Null);
}
/// With a default spec (no ORDER BY), RANK and DENSE_RANK give every row rank 1,
/// while ROW_NUMBER still counts 1, 2, 3.
#[test]
fn test_rank_no_order_by_all_tie() {
    let sevens = vec![Value::Int64(7); 3];
    let (rows, value_at) = table(vec![sevens]);
    let spec = WindowSpec::default();
    let all_ones = vec![Value::Int64(1); 3];

    let rank =
        evaluate_window(&WindowFunction::Rank, &spec, rows, 0, &value_at).expect("rank eval");
    assert_eq!(rank, all_ones);

    let dense =
        evaluate_window(&WindowFunction::DenseRank, &spec, rows, 0, &value_at).expect("dense eval");
    assert_eq!(dense, all_ones);

    let rn = evaluate_window(&WindowFunction::RowNumber, &spec, rows, 0, &value_at)
        .expect("row_number eval");
    let counting: Vec<Value> = (1..=3).map(Value::Int64).collect();
    assert_eq!(rn, counting);
}
/// `evaluate_window_batch` on a `RecordBatch`: ROW_NUMBER partitioned by the
/// string column and ordered by the integer column.
#[test]
fn test_evaluate_window_batch_record_batch() {
    let fields = vec![
        Field::new("grp".to_string(), DataType::String, true),
        Field::new("val".to_string(), DataType::Int64, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    let groups = ColumnData::String(
        ["A", "A", "B"]
            .iter()
            .map(|&name| Some(name.to_string()))
            .collect(),
    );
    let values = ColumnData::Int64([20, 10, 5].iter().copied().map(Some).collect());
    let batch = RecordBatch::new(schema, vec![groups, values], 3).expect("batch should build");

    let spec = WindowSpec::new(vec![0], vec![OrderKey::asc(1)]);
    let out = evaluate_window_batch(&WindowFunction::RowNumber, &spec, 1, &batch)
        .expect("batch window should eval");

    // Group "A": row 1 (10) precedes row 0 (20). Group "B": row 2 alone.
    let expected = [(1usize, 1), (0, 2), (2, 1)];
    for &(row, want) in expected.iter() {
        assert_eq!(out[row], Value::Int64(want));
    }
}
/// An order key that names column 5 of a one-column batch must produce an
/// error, not a successful result.
#[test]
fn test_evaluate_window_batch_out_of_bounds_column() {
    let only_field = Field::new("val".to_string(), DataType::Int64, true);
    let schema = Arc::new(Schema::new(vec![only_field]));
    let data = vec![ColumnData::Int64(vec![Some(1), Some(2)])];
    let batch = RecordBatch::new(schema, data, 2).expect("batch should build");

    // Column index 5 does not exist in the batch built above.
    let bad_spec = WindowSpec::ordered(vec![OrderKey::asc(5)]);
    let outcome = evaluate_window_batch(&WindowFunction::RowNumber, &bad_spec, 0, &batch);

    assert!(outcome.is_err());
}
/// Under an ascending order key, a `Null` value is numbered after every
/// non-null value.
#[test]
fn test_null_ordering_places_nulls_last() {
    let with_null = vec![Value::Int64(20), Value::Null, Value::Int64(10)];
    let (rows, value_at) = table(vec![with_null]);

    let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);
    let out = evaluate_window(&WindowFunction::RowNumber, &spec, rows, 0, value_at)
        .expect("row_number should eval");

    // Row 2 (10) first, row 0 (20) second, the null row 1 last.
    let expected = [(2usize, 1), (0, 2), (1, 3)];
    for &(row, want) in expected.iter() {
        assert_eq!(out[row], Value::Int64(want));
    }
}