use crate::expr::SqlExpr;
use super::frame::{build_peer_groups, evaluate_frame_bounds};
use super::helpers::{as_f64, get_field, set_window_col};
use super::running::running_aggregate;
use super::spec::{FrameBound, WindowFuncSpec};
/// Applies the aggregate window function described by `spec` to the rows
/// selected by `indices`.
///
/// The aggregated field is the first argument of `spec` when that argument is a
/// plain column reference; for any other argument shape (or no argument) the
/// field name `"*"` is used.
///
/// A `range` frame running from `UnboundedPreceding` to `CurrentRow` is handed
/// to `running_aggregate`; every other frame is evaluated by
/// `per_row_aggregate`.
pub(super) fn apply_aggregate_window(
    rows: &mut [(String, serde_json::Value)],
    indices: &[usize],
    spec: &WindowFuncSpec,
) {
    let field = match spec.args.first() {
        Some(SqlExpr::Column(name)) => name.as_str(),
        _ => "*",
    };

    let is_running_frame = matches!(
        (&spec.frame.start, &spec.frame.end),
        (FrameBound::UnboundedPreceding, FrameBound::CurrentRow)
    ) && spec.frame.mode == "range";

    if is_running_frame {
        running_aggregate(rows, indices, spec, field);
    } else {
        per_row_aggregate(rows, indices, spec, field);
    }
}
/// Evaluates the window function once per row by resolving that row's frame
/// bounds and aggregating over the resulting slice of the partition.
///
/// `indices` lists the positions in `rows` that make up the partition, in
/// window order. Each result is stored on its row under `spec.alias` via
/// `set_window_col`. An empty `indices` is a no-op.
fn per_row_aggregate(
    rows: &mut [(String, serde_json::Value)],
    indices: &[usize],
    spec: &WindowFuncSpec,
    field: &str,
) {
    if indices.is_empty() {
        return;
    }
    let len = indices.len();

    // Only the first ORDER BY column is consulted; without one every row gets
    // a NULL ordering value.
    let order_values: Vec<serde_json::Value> = match spec.order_by.first() {
        Some((col, _)) => indices
            .iter()
            .map(|&i| get_field(&rows[i].1, col.as_str()))
            .collect(),
        None => vec![serde_json::Value::Null; len],
    };

    // Peer groups are only built for `groups` frames; other modes receive an
    // empty list.
    let mut peers: Vec<usize> = Vec::new();
    if spec.frame.mode == "groups" {
        peers = build_peer_groups(&order_values);
    }

    // Numeric view of the aggregated field, one entry per partition row.
    let mut numeric: Vec<Option<f64>> = Vec::with_capacity(len);
    for &row_idx in indices {
        numeric.push(as_f64(&get_field(&rows[row_idx].1, field)));
    }

    // Every result is computed before any row is mutated, because
    // `aggregate_slice` reads `rows` while the write-back below modifies it.
    let mut computed: Vec<serde_json::Value> = Vec::with_capacity(len);
    for pos in 0..len {
        let (lo, hi) = evaluate_frame_bounds(&spec.frame, pos, len, &order_values, &peers);
        computed.push(aggregate_slice(&numeric, indices, rows, field, spec, lo, hi));
    }

    for (&row_idx, value) in indices.iter().zip(computed) {
        set_window_col(&mut rows[row_idx].1, &spec.alias, value);
    }
}
/// Computes the value of `spec.func_name` over the frame `start_idx..=end_idx`.
///
/// * `all_vals` – numeric view of the aggregated field, one entry per partition
///   row (`None` where the value could not be read as a number).
/// * `indices` – partition positions mapped to positions in `rows`.
/// * `rows` / `field` – used by `first_value` / `last_value` to fetch the raw
///   field value.
/// * `start_idx` / `end_idx` – inclusive frame bounds, expressed as positions
///   within the partition.
///
/// Returns `Null` for `avg`/`min`/`max` when the frame holds no numeric value,
/// for `first_value`/`last_value` when the frame is empty, and for any
/// unrecognised function name.
///
/// An inverted (`start_idx > end_idx`) or out-of-range frame is treated as an
/// empty frame instead of panicking on the slice index or underflowing the
/// row count.
fn aggregate_slice(
    all_vals: &[Option<f64>],
    indices: &[usize],
    rows: &[(String, serde_json::Value)],
    field: &str,
    spec: &WindowFuncSpec,
    start_idx: usize,
    end_idx: usize,
) -> serde_json::Value {
    // Checked range lookup: `get` yields `None` for an invalid range, which we
    // map to an empty frame.
    let frame: &[Option<f64>] = all_vals.get(start_idx..=end_idx).unwrap_or_default();
    let slice_count = frame.len();
    let slice_vals: Vec<f64> = frame.iter().copied().flatten().collect();

    // Raw field value of the partition row at `pos`; NULL when the frame is
    // empty or `pos` is outside the partition.
    let field_at = |pos: usize| -> serde_json::Value {
        if frame.is_empty() {
            return serde_json::Value::Null;
        }
        indices
            .get(pos)
            .map(|&i| get_field(&rows[i].1, field))
            .unwrap_or(serde_json::Value::Null)
    };

    match spec.func_name.as_str() {
        "sum" => {
            let rt = crate::simd_agg::ts_runtime();
            serde_json::json!((rt.sum_f64)(&slice_vals))
        }
        // NOTE(review): this counts every row in the frame, including rows whose
        // field is NULL or non-numeric — confirm whether COUNT(col) should skip
        // NULLs.
        "count" => serde_json::json!(slice_count),
        "avg" | "min" | "max" if slice_vals.is_empty() => serde_json::Value::Null,
        "avg" => {
            let rt = crate::simd_agg::ts_runtime();
            serde_json::json!((rt.sum_f64)(&slice_vals) / slice_vals.len() as f64)
        }
        "min" => {
            let rt = crate::simd_agg::ts_runtime();
            serde_json::json!((rt.min_f64)(&slice_vals))
        }
        "max" => {
            let rt = crate::simd_agg::ts_runtime();
            serde_json::json!((rt.max_f64)(&slice_vals))
        }
        "first_value" => field_at(start_idx),
        "last_value" => field_at(end_idx),
        _ => serde_json::Value::Null,
    }
}
#[cfg(test)]
mod tests {
    use super::super::spec::{FrameBound, WindowFrame, WindowFuncSpec};
    use super::apply_aggregate_window;
    use crate::expr::SqlExpr;
    use serde_json::json;

    type Row = (String, serde_json::Value);

    /// Builds `n` rows keyed "1".."n", each carrying `n` equal to its 1-based
    /// position.
    fn numbered(n: usize) -> Vec<Row> {
        let mut rows = Vec::with_capacity(n);
        for i in 1..=n {
            rows.push((i.to_string(), json!({ "n": i as i64 })));
        }
        rows
    }

    /// Builds rows from explicit `(key, n)` pairs; used for inputs with ties.
    fn keyed(entries: &[(&str, i64)]) -> Vec<Row> {
        entries
            .iter()
            .map(|&(key, n)| (key.to_string(), json!({ "n": n })))
            .collect()
    }

    /// Window spec ordered by `n` ascending, writing into the `result` column.
    /// A `field` of `"*"` produces a spec with no arguments.
    fn make_spec(func: &str, field: &str, frame: WindowFrame) -> WindowFuncSpec {
        let args = match field {
            "*" => Vec::new(),
            column => vec![SqlExpr::Column(column.into())],
        };
        WindowFuncSpec {
            alias: "result".into(),
            func_name: func.into(),
            args,
            partition_by: vec![],
            order_by: vec![("n".into(), true)],
            frame,
        }
    }

    /// Frame with the given mode (`"rows"`, `"range"` or `"groups"`) and bounds.
    fn frame(mode: &'static str, start: FrameBound, end: FrameBound) -> WindowFrame {
        WindowFrame {
            mode: mode.into(),
            start,
            end,
        }
    }

    /// Runs `sum(n)` over all rows as a single partition using `frame`.
    fn sum_over(rows: &mut [Row], frame: WindowFrame) {
        let indices: Vec<usize> = (0..rows.len()).collect();
        let spec = make_spec("sum", "n", frame);
        apply_aggregate_window(rows, &indices, &spec);
    }

    /// Asserts the `result` column of each listed `(row index, value)` pair.
    fn assert_results(rows: &[Row], expected: &[(usize, f64)]) {
        for &(idx, want) in expected {
            assert_eq!(rows[idx].1["result"], json!(want), "row {}", idx);
        }
    }

    #[test]
    fn rows_1_preceding_1_following_sum() {
        let mut rows = numbered(5);
        sum_over(
            &mut rows,
            frame("rows", FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        assert_results(&rows, &[(0, 3.0), (1, 6.0), (2, 9.0), (3, 12.0), (4, 9.0)]);
    }

    #[test]
    fn rows_unbounded_preceding_current_sum() {
        let mut rows = numbered(5);
        sum_over(
            &mut rows,
            frame(
                "rows",
                FrameBound::UnboundedPreceding,
                FrameBound::CurrentRow,
            ),
        );
        assert_results(&rows, &[(0, 1.0), (1, 3.0), (2, 6.0), (3, 10.0), (4, 15.0)]);
    }

    #[test]
    fn rows_current_unbounded_following_sum() {
        let mut rows = numbered(5);
        sum_over(
            &mut rows,
            frame(
                "rows",
                FrameBound::CurrentRow,
                FrameBound::UnboundedFollowing,
            ),
        );
        assert_results(&rows, &[(0, 15.0), (1, 14.0), (4, 5.0)]);
    }

    #[test]
    fn range_unbounded_preceding_current_row_with_ties() {
        let mut rows = keyed(&[("a", 1), ("b", 1), ("c", 2), ("d", 3)]);
        sum_over(
            &mut rows,
            frame(
                "range",
                FrameBound::UnboundedPreceding,
                FrameBound::CurrentRow,
            ),
        );
        // Tied rows share a frame, so both `n = 1` rows see the sum of the pair.
        assert_results(&rows, &[(0, 2.0), (1, 2.0), (2, 4.0), (3, 7.0)]);
    }

    #[test]
    fn groups_1_preceding_1_following_sum() {
        let mut rows = keyed(&[("a", 1), ("b", 1), ("c", 2), ("d", 3), ("e", 3)]);
        sum_over(
            &mut rows,
            frame("groups", FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        assert_results(&rows, &[(0, 4.0), (1, 4.0), (2, 10.0), (3, 8.0), (4, 8.0)]);
    }
}