use cjc_repro::kahan::KahanAccumulatorF64;
use cjc_repro::kahan_sum_f64;
use std::collections::BTreeSet;
pub fn agg_sum_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| kahan_sum_f64(&data[start..end]))
.collect()
}
pub fn agg_mean_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| {
let n = end - start;
if n == 0 {
return f64::NAN;
}
kahan_sum_f64(&data[start..end]) / n as f64
})
.collect()
}
pub fn agg_count(segments: &[(usize, usize)]) -> Vec<i64> {
segments
.iter()
.map(|&(start, end)| (end - start) as i64)
.collect()
}
pub fn agg_min_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| {
if start == end {
return f64::NAN;
}
data[start..end]
.iter()
.cloned()
.fold(f64::INFINITY, |a, b| if b.is_nan() || b < a { b } else { a })
})
.collect()
}
pub fn agg_max_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| {
if start == end {
return f64::NAN;
}
data[start..end]
.iter()
.cloned()
.fold(f64::NEG_INFINITY, |a, b| if b.is_nan() || b > a { b } else { a })
})
.collect()
}
pub fn agg_var_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| welford_variance(&data[start..end]))
.collect()
}
pub fn agg_sd_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| {
let var = welford_variance(&data[start..end]);
if var.is_nan() { f64::NAN } else { var.sqrt() }
})
.collect()
}
pub fn agg_median_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| {
let n = end - start;
if n == 0 {
return f64::NAN;
}
let mut buf: Vec<f64> = data[start..end].to_vec();
buf.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
if n % 2 == 1 {
buf[n / 2]
} else {
(buf[n / 2 - 1] + buf[n / 2]) / 2.0
}
})
.collect()
}
pub fn agg_quantile_f64(data: &[f64], p: f64, segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| {
let n = end - start;
if n == 0 {
return f64::NAN;
}
let mut buf: Vec<f64> = data[start..end].to_vec();
buf.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let p = p.clamp(0.0, 1.0);
let idx = p * (n - 1) as f64;
let lo = idx.floor() as usize;
let hi = idx.ceil() as usize;
if lo == hi {
buf[lo]
} else {
let frac = idx - lo as f64;
buf[lo] * (1.0 - frac) + buf[hi] * frac
}
})
.collect()
}
pub fn agg_n_distinct_str(data: &[String], segments: &[(usize, usize)]) -> Vec<i64> {
segments
.iter()
.map(|&(start, end)| {
let set: BTreeSet<&String> = data[start..end].iter().collect();
set.len() as i64
})
.collect()
}
pub fn agg_n_distinct_i64(data: &[i64], segments: &[(usize, usize)]) -> Vec<i64> {
segments
.iter()
.map(|&(start, end)| {
let set: BTreeSet<&i64> = data[start..end].iter().collect();
set.len() as i64
})
.collect()
}
pub fn agg_first_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| {
if start == end { f64::NAN } else { data[start] }
})
.collect()
}
pub fn agg_last_f64(data: &[f64], segments: &[(usize, usize)]) -> Vec<f64> {
segments
.iter()
.map(|&(start, end)| {
if start == end { f64::NAN } else { data[end - 1] }
})
.collect()
}
pub fn agg_sum_i64(data: &[i64], segments: &[(usize, usize)]) -> Vec<i64> {
segments
.iter()
.map(|&(start, end)| {
data[start..end]
.iter()
.fold(0i64, |acc, &x| acc.wrapping_add(x))
})
.collect()
}
pub fn agg_min_i64(data: &[i64], segments: &[(usize, usize)]) -> Vec<i64> {
segments
.iter()
.map(|&(start, end)| {
if start == end {
i64::MAX
} else {
data[start..end].iter().cloned().min().unwrap()
}
})
.collect()
}
pub fn agg_max_i64(data: &[i64], segments: &[(usize, usize)]) -> Vec<i64> {
segments
.iter()
.map(|&(start, end)| {
if start == end {
i64::MIN
} else {
data[start..end].iter().cloned().max().unwrap()
}
})
.collect()
}
pub fn gather_agg_sum_f64(data: &[f64], groups: &[Vec<usize>]) -> Vec<f64> {
groups
.iter()
.map(|indices| {
let mut acc = KahanAccumulatorF64::new();
for &i in indices {
acc.add(data[i]);
}
acc.finalize()
})
.collect()
}
pub fn gather_agg_mean_f64(data: &[f64], groups: &[Vec<usize>]) -> Vec<f64> {
groups
.iter()
.map(|indices| {
if indices.is_empty() {
return f64::NAN;
}
let mut acc = KahanAccumulatorF64::new();
for &i in indices {
acc.add(data[i]);
}
acc.finalize() / indices.len() as f64
})
.collect()
}
pub fn gather_agg_var_f64(data: &[f64], groups: &[Vec<usize>]) -> Vec<f64> {
groups
.iter()
.map(|indices| {
let gathered: Vec<f64> = indices.iter().map(|&i| data[i]).collect();
welford_variance(&gathered)
})
.collect()
}
pub fn gather_agg_n_distinct_str(data: &[String], groups: &[Vec<usize>]) -> Vec<i64> {
groups
.iter()
.map(|indices| {
let set: BTreeSet<&String> = indices.iter().map(|&i| &data[i]).collect();
set.len() as i64
})
.collect()
}
fn welford_variance(values: &[f64]) -> f64 {
let n = values.len();
if n == 0 {
return f64::NAN;
}
if n == 1 {
return 0.0;
}
let mut mean = 0.0f64;
let mut m2 = 0.0f64;
for (i, &x) in values.iter().enumerate() {
let delta = x - mean;
mean += delta / (i + 1) as f64;
let delta2 = x - mean;
m2 += delta * delta2;
}
m2 / n as f64
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_welford_known() {
let data = vec![2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
let var = welford_variance(&data);
assert!((var - 4.0).abs() < 1e-12, "got {}", var);
}
#[test]
fn test_welford_empty() {
assert!(welford_variance(&[]).is_nan());
}
#[test]
fn test_welford_single() {
assert_eq!(welford_variance(&[42.0]), 0.0);
}
}