use super::helpers::{as_f64, get_field, order_keys_equal, set_window_col};
use super::spec::WindowFuncSpec;
/// Computes a cumulative (running) window aggregate over one ordered partition.
///
/// `indices` lists positions into `rows` in the order they should be visited.
/// For every visited row the value of `field` is read with `get_field` and
/// folded into the running sum / count / min / max. Consecutive rows for which
/// `order_keys_equal` reports equality on `spec.order_by` are treated as one
/// peer group: the result is only materialised once the last row of the group
/// has been folded in, and that same result is then written to every row of
/// the group under `spec.alias` via `set_window_col`.
///
/// Behaviour per `spec.func_name`:
/// * `"sum"`: running sum of numeric values (`0.0` if none were seen so far).
/// * `"count"`: number of rows seen so far, numeric or not.
/// * `"avg"`: running sum divided by the number of numeric values, or null
///   when no numeric value has been seen.
/// * `"min"` / `"max"`: running extreme of numeric values, or null when none.
/// * `"first_value"`: `field` of the first row in `indices`.
/// * `"last_value"`: `field` of the last row of the current peer group.
/// * anything else: null.
///
/// An empty `indices` slice leaves `rows` untouched.
pub(super) fn running_aggregate(
    rows: &mut [(String, serde_json::Value)],
    indices: &[usize],
    spec: &WindowFuncSpec,
    field: &str,
) {
    let func = spec.func_name.as_str();
    // Only "count" lets non-numeric values contribute to the counter; for
    // every other function the counter tracks numeric values exclusively.
    let counts_non_numeric = spec.func_name == "count";

    let mut total = 0.0f64;
    let mut seen = 0u64;
    let mut lowest: Option<f64> = None;
    let mut highest: Option<f64> = None;
    // Position (within `indices`) of the first row of the open peer group.
    let mut group_begin = 0usize;

    for (pos, &row_idx) in indices.iter().enumerate() {
        let current = get_field(&rows[row_idx].1, field);
        match as_f64(&current) {
            Some(n) => {
                total += n;
                seen += 1;
                lowest = Some(match lowest {
                    Some(m) => m.min(n),
                    None => n,
                });
                highest = Some(match highest {
                    Some(m) => m.max(n),
                    None => n,
                });
            }
            None if counts_non_numeric => seen += 1,
            None => {}
        }

        // While the next row is still a peer of this one, keep accumulating;
        // peers must all observe the aggregate that includes the whole group.
        if let Some(&next_idx) = indices.get(pos + 1) {
            if order_keys_equal(rows, row_idx, next_idx, &spec.order_by) {
                continue;
            }
        }

        let outcome = match func {
            "sum" => serde_json::json!(total),
            "count" => serde_json::json!(seen),
            "avg" => match seen {
                0 => serde_json::Value::Null,
                _ => serde_json::json!(total / seen as f64),
            },
            "min" => match lowest {
                Some(m) => serde_json::json!(m),
                None => serde_json::Value::Null,
            },
            "max" => match highest {
                Some(m) => serde_json::json!(m),
                None => serde_json::Value::Null,
            },
            // The loop body only runs when `indices` is non-empty, so
            // indexing position 0 cannot go out of bounds here.
            "first_value" => get_field(&rows[indices[0]].1, field),
            "last_value" => get_field(&rows[row_idx].1, field),
            _ => serde_json::Value::Null,
        };

        // Flush the finished peer group: every member gets the same value.
        for &peer_idx in &indices[group_begin..=pos] {
            set_window_col(&mut rows[peer_idx].1, &spec.alias, outcome.clone());
        }
        group_begin = pos + 1;
    }
}