use std::collections::HashMap;
/// Single-pass summary statistics over a stream of `f64` values.
///
/// Apart from the reservoir sample (at most `reservoir_capacity` values), all
/// state is fixed-size. Tracked quantities:
/// - count, sum, mean and variance (Welford's online update),
/// - minimum and maximum,
/// - the fraction of zero and of negative values,
/// - a histogram of leading significant digits (for Benford's-law checks),
/// - a reservoir sample (Algorithm R) used to estimate percentiles.
#[derive(Debug, Clone)]
pub struct StreamingNumericStats {
    /// Number of values added.
    count: u64,
    /// Running mean.
    mean: f64,
    /// Running sum of squared deviations from the mean (Welford's `M2`).
    m2: f64,
    /// Smallest non-NaN value seen; `+inf` sentinel until one arrives.
    min: f64,
    /// Largest non-NaN value seen; `-inf` sentinel until one arrives.
    max: f64,
    /// Running sum of all values.
    sum: f64,
    /// Values comparing equal to `0.0` (so `-0.0` is included).
    zero_count: u64,
    /// Values strictly below zero.
    negative_count: u64,
    /// Occurrences of leading digits 1..=9; index `d - 1` holds digit `d`.
    benford_counts: [u64; 9],
    /// Random sample of at most `reservoir_capacity` values.
    reservoir: Vec<f64>,
    /// Maximum number of values kept in `reservoir`.
    reservoir_capacity: usize,
    /// Values offered to the reservoir so far (the `n` of Algorithm R).
    total_seen: u64,
}

impl StreamingNumericStats {
    /// Creates an empty accumulator whose reservoir keeps at most
    /// `reservoir_capacity` sampled values.
    pub fn new(reservoir_capacity: usize) -> Self {
        Self {
            count: 0,
            mean: 0.0,
            m2: 0.0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
            sum: 0.0,
            zero_count: 0,
            negative_count: 0,
            benford_counts: [0; 9],
            reservoir: Vec::with_capacity(reservoir_capacity),
            reservoir_capacity,
            total_seen: 0,
        }
    }

    /// Adds one value to every running statistic.
    ///
    /// A NaN propagates into `sum`, `mean` and `variance` and may enter the
    /// reservoir. It never becomes the min/max and is not counted as zero,
    /// as negative, or in the leading-digit histogram.
    pub fn add(&mut self, value: f64) {
        self.count += 1;
        self.total_seen += 1;
        self.sum += value;

        // Comparisons with NaN are false, so NaN never replaces min/max.
        if value < self.min {
            self.min = value;
        }
        if value > self.max {
            self.max = value;
        }

        // Welford's update: numerically stable running mean and M2.
        let delta = value - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (value - self.mean);

        if value == 0.0 {
            self.zero_count += 1;
        }
        if value < 0.0 {
            self.negative_count += 1;
        }

        if let Some(digit) = Self::leading_digit(value) {
            self.benford_counts[digit - 1] += 1;
        }

        self.offer_to_reservoir(value);
    }

    /// Adds every value of `values`, in order.
    pub fn add_batch(&mut self, values: &[f64]) {
        for &value in values {
            self.add(value);
        }
    }

    /// Number of values added.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Sum of all values added (0.0 when empty).
    pub fn sum(&self) -> f64 {
        self.sum
    }

    /// Running mean (0.0 when empty).
    pub fn mean(&self) -> f64 {
        self.mean
    }

    /// Population variance: `M2 / count`. Returns 0.0 for fewer than two values.
    pub fn variance(&self) -> f64 {
        if self.count < 2 {
            0.0
        } else {
            self.m2 / self.count as f64
        }
    }

    /// Square root of [`Self::variance`].
    pub fn std_dev(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Smallest value seen. Returns 0.0 when no non-NaN value has been added.
    ///
    /// Infinite inputs are reported as-is.
    pub fn min(&self) -> f64 {
        if self.has_extrema() {
            self.min
        } else {
            0.0
        }
    }

    /// Largest value seen. Returns 0.0 when no non-NaN value has been added.
    ///
    /// Infinite inputs are reported as-is.
    pub fn max(&self) -> f64 {
        if self.has_extrema() {
            self.max
        } else {
            0.0
        }
    }

    /// Fraction of added values equal to zero (0.0 when empty).
    pub fn zero_rate(&self) -> f64 {
        self.fraction_of_count(self.zero_count)
    }

    /// Fraction of added values below zero (0.0 when empty).
    pub fn negative_rate(&self) -> f64 {
        self.fraction_of_count(self.negative_count)
    }

    /// Relative frequency of leading digits 1..=9 (index `d - 1` is digit `d`).
    ///
    /// Returns all zeros when no value contributed a leading digit.
    pub fn benford_distribution(&self) -> [f64; 9] {
        let total: u64 = self.benford_counts.iter().sum();
        if total == 0 {
            return [0.0; 9];
        }
        let total = total as f64;
        self.benford_counts.map(|c| c as f64 / total)
    }

    /// Estimates percentiles from the reservoir sample.
    ///
    /// The sample is sorted with `f64::total_cmp`, and the element at index
    /// `round(p / 100 * (len - 1))` is taken. Every percentile is 0.0 when the
    /// reservoir is empty.
    pub fn percentiles(&self) -> crate::models::Percentiles {
        let mut sorted = self.reservoir.clone();
        // Keys that are equal under `total_cmp` are bit-identical, so an
        // unstable sort yields the same sequence as a stable one.
        sorted.sort_unstable_by(f64::total_cmp);
        let at = |p: f64| -> f64 {
            if sorted.is_empty() {
                return 0.0;
            }
            let last = sorted.len() - 1;
            let idx = (p / 100.0 * last as f64).round() as usize;
            sorted[idx.min(last)]
        };
        crate::models::Percentiles {
            p1: at(1.0),
            p5: at(5.0),
            p10: at(10.0),
            p25: at(25.0),
            p50: at(50.0),
            p75: at(75.0),
            p90: at(90.0),
            p95: at(95.0),
            p99: at(99.0),
        }
    }

    /// `true` once at least one non-NaN value has been added.
    ///
    /// Until then `min`/`max` still hold their `+inf`/`-inf` sentinels, so
    /// `min > max`. Checking `is_infinite()` instead would misreport genuine
    /// infinite inputs as 0.0.
    fn has_extrema(&self) -> bool {
        self.min <= self.max
    }

    /// `n / count`, or 0.0 when nothing has been added.
    fn fraction_of_count(&self, n: u64) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            n as f64 / self.count as f64
        }
    }

    /// Returns the first significant decimal digit (1..=9) of `|value|`.
    ///
    /// Returns `None` for zero, NaN and infinities.
    fn leading_digit(value: f64) -> Option<usize> {
        let magnitude = value.abs();
        if magnitude == 0.0 || !magnitude.is_finite() {
            return None;
        }
        // Shortest round-trip scientific notation (`1.5e-20`, `3e300`) always
        // begins with the leading significant digit, whatever the magnitude.
        // Fixed-point output such as `{:.15}` would render every value below
        // 1e-15 as all zeros and build very long strings for huge values.
        format!("{magnitude:e}")
            .chars()
            .next()
            .and_then(|c| c.to_digit(10))
            .filter(|d| (1..=9).contains(d))
            .map(|d| d as usize)
    }

    /// Offers `value` to the reservoir (Algorithm R).
    ///
    /// The first `reservoir_capacity` values are kept outright. After that,
    /// the n-th value overwrites a random slot with probability
    /// `reservoir_capacity / n`.
    fn offer_to_reservoir(&mut self, value: f64) {
        if self.reservoir.len() < self.reservoir_capacity {
            self.reservoir.push(value);
            return;
        }
        // `total_seen >= 1` here because `add` increments it first. The modulo
        // adds a slight bias, negligible while `total_seen` is far below 2^64.
        let j = rand::random::<u64>() % self.total_seen;
        if j < self.reservoir_capacity as u64 {
            self.reservoir[j as usize] = value;
        }
    }
}
/// Single-pass frequency counts for a stream of string categories, holding at
/// most `max_categories` map entries.
///
/// New categories are tracked until `max_categories` distinct values are held.
/// After that, values of unseen categories go to an overflow tally. Once that
/// tally exceeds `max_categories`, tracked categories whose count is at or
/// below `overflow / max_categories` are evicted to make room for newcomers,
/// and their counts are discarded. Frequencies are therefore approximate once
/// eviction has happened.
#[derive(Debug, Clone)]
pub struct StreamingCategoricalStats {
    /// Number of non-empty values added, whether tracked or not.
    count: u64,
    /// Per-category counts for the categories currently tracked.
    frequencies: HashMap<String, u64>,
    /// Upper bound on the number of tracked categories.
    max_categories: usize,
    /// Values of untracked categories seen since the last eviction pass.
    other_count: u64,
}

impl StreamingCategoricalStats {
    /// Creates an empty accumulator tracking at most `max_categories` distinct
    /// values.
    ///
    /// A capacity of 0 tracks no categories but still counts values.
    pub fn new(max_categories: usize) -> Self {
        Self {
            count: 0,
            frequencies: HashMap::new(),
            max_categories,
            other_count: 0,
        }
    }

    /// Records one occurrence of `value`; empty strings are ignored.
    pub fn add(&mut self, value: String) {
        if value.is_empty() {
            return;
        }
        self.count += 1;
        if let Some(count) = self.frequencies.get_mut(value.as_str()) {
            *count += 1;
        } else if self.frequencies.len() < self.max_categories {
            self.frequencies.insert(value, 1);
        } else {
            self.other_count += 1;
            if self.other_count > self.max_categories as u64 {
                self.prune_low_frequency();
            }
        }
    }

    /// Evicts tracked categories whose count is `<= other_count / max_categories`
    /// and resets the overflow tally.
    fn prune_low_frequency(&mut self) {
        // Clamp the divisor to 1 to avoid an integer divide-by-zero panic when
        // `max_categories == 0`. In that case nothing is tracked, so `retain`
        // is a no-op anyway.
        let divisor = self.max_categories.max(1) as u64;
        let threshold = self.other_count / divisor;
        self.frequencies.retain(|_, count| *count > threshold);
        self.other_count = 0;
    }

    /// Number of non-empty values added, including untracked ones.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Number of distinct categories currently tracked (at most `max_categories`).
    pub fn cardinality(&self) -> u64 {
        self.frequencies.len() as u64
    }

    /// Returns up to `limit` tracked categories with their counts, most frequent
    /// first.
    ///
    /// Ties are broken by ascending key, so the result does not depend on
    /// hash-map iteration order.
    pub fn top_values(&self, limit: usize) -> Vec<(String, u64)> {
        let mut values: Vec<(&String, &u64)> = self.frequencies.iter().collect();
        values.sort_unstable_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0)));
        values
            .into_iter()
            .take(limit)
            .map(|(k, &v)| (k.clone(), v))
            .collect()
    }

    /// Shannon entropy, in nats (natural log), of the tracked categories.
    ///
    /// Probabilities are taken relative to [`Self::count`]. Overflow and
    /// evicted values contribute no term. Returns 0.0 when nothing has been
    /// added.
    pub fn entropy(&self) -> f64 {
        if self.count == 0 {
            return 0.0;
        }
        let total = self.count as f64;
        -self
            .frequencies
            .values()
            .filter(|&&c| c > 0)
            .map(|&c| {
                let p = c as f64 / total;
                p * p.ln()
            })
            .sum::<f64>()
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Feeding 1..=100 must yield the textbook count, mean and range.
    #[test]
    fn test_streaming_numeric_stats() {
        let mut numeric = StreamingNumericStats::new(1000);
        for n in 1..=100_u32 {
            numeric.add(f64::from(n));
        }

        let expected_mean = 50.5;
        assert_eq!(numeric.count(), 100);
        assert!((numeric.mean() - expected_mean).abs() < 0.001);
        assert_eq!(numeric.min(), 1.0);
        assert_eq!(numeric.max(), 100.0);
    }

    /// Three categories with distinct frequencies; the most common must lead.
    #[test]
    fn test_streaming_categorical_stats() {
        let mut categorical = StreamingCategoricalStats::new(100);
        for &(label, repeats) in &[("A", 50), ("B", 30), ("C", 20)] {
            (0..repeats).for_each(|_| categorical.add(label.to_string()));
        }

        assert_eq!(categorical.count(), 100);
        assert_eq!(categorical.cardinality(), 3);
        let leaders = categorical.top_values(3);
        assert_eq!(leaders[0], ("A".to_string(), 50));
    }
}