// scirs2_core/streaming_stats.rs — numerically stable streaming statistics.
/// Streaming (online) statistics accumulator using Welford's algorithm
/// for a numerically stable running mean and variance.
///
/// NaN samples passed to `update` are ignored (see `update`).
#[derive(Debug, Clone)]
pub struct StreamingStats {
    // Number of (non-NaN) samples observed so far.
    count: u64,
    // Running mean of the observed samples.
    mean: f64,
    // Welford's M2 term: running sum of squared deviations from the mean.
    // Sample variance = m2 / (count - 1).
    m2: f64,
    // Smallest sample seen; +inf until the first update.
    min: f64,
    // Largest sample seen; -inf until the first update.
    max: f64,
}
56
/// `Default` yields an empty accumulator, identical to [`StreamingStats::new`].
impl Default for StreamingStats {
    fn default() -> Self {
        Self::new()
    }
}
62
63impl StreamingStats {
64 pub fn new() -> Self {
66 Self {
67 count: 0,
68 mean: 0.0,
69 m2: 0.0,
70 min: f64::INFINITY,
71 max: f64::NEG_INFINITY,
72 }
73 }
74
75 pub fn update(&mut self, value: f64) {
79 if value.is_nan() {
80 return;
81 }
82 self.count += 1;
83 let delta = value - self.mean;
84 self.mean += delta / self.count as f64;
85 let delta2 = value - self.mean;
86 self.m2 += delta * delta2;
87
88 if value < self.min {
89 self.min = value;
90 }
91 if value > self.max {
92 self.max = value;
93 }
94 }
95
96 pub fn update_batch(&mut self, values: &[f64]) {
99 for &v in values {
100 self.update(v);
101 }
102 }
103
104 #[inline]
108 pub fn count(&self) -> u64 {
109 self.count
110 }
111
112 #[inline]
116 pub fn mean(&self) -> f64 {
117 self.mean
118 }
119
120 pub fn variance(&self) -> f64 {
124 if self.count <= 1 {
125 0.0
126 } else {
127 self.m2 / (self.count - 1) as f64
128 }
129 }
130
131 pub fn population_variance(&self) -> f64 {
135 if self.count == 0 {
136 0.0
137 } else {
138 self.m2 / self.count as f64
139 }
140 }
141
142 pub fn std_dev(&self) -> f64 {
146 self.variance().sqrt()
147 }
148
149 #[inline]
153 pub fn min(&self) -> f64 {
154 self.min
155 }
156
157 #[inline]
161 pub fn max(&self) -> f64 {
162 self.max
163 }
164
165 pub fn range(&self) -> f64 {
169 if self.count == 0 {
170 0.0
171 } else {
172 self.max - self.min
173 }
174 }
175
176 pub fn merge(self, other: Self) -> Self {
198 if self.count == 0 {
199 return other;
200 }
201 if other.count == 0 {
202 return self;
203 }
204
205 let combined_count = self.count + other.count;
206 let delta = other.mean - self.mean;
207 let combined_mean = (self.mean * self.count as f64 + other.mean * other.count as f64)
208 / combined_count as f64;
209 let combined_m2 = self.m2
211 + other.m2
212 + delta * delta * (self.count as f64 * other.count as f64) / combined_count as f64;
213
214 Self {
215 count: combined_count,
216 mean: combined_mean,
217 m2: combined_m2,
218 min: self.min.min(other.min),
219 max: self.max.max(other.max),
220 }
221 }
222
223 pub fn merge_all(stats: Vec<Self>) -> Self {
227 stats.into_iter().fold(Self::new(), |acc, s| acc.merge(s))
228 }
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh accumulator reports neutral values (and inf/-inf extrema).
    #[test]
    fn test_empty_accumulator() {
        let s = StreamingStats::new();
        assert_eq!(s.count(), 0);
        assert_eq!(s.mean(), 0.0);
        assert_eq!(s.variance(), 0.0);
        assert_eq!(s.std_dev(), 0.0);
        assert_eq!(s.min(), f64::INFINITY);
        assert_eq!(s.max(), f64::NEG_INFINITY);
    }

    #[test]
    fn test_single_element() {
        let mut s = StreamingStats::new();
        s.update(7.0);
        assert_eq!(s.count(), 1);
        assert!((s.mean() - 7.0).abs() < 1e-12);
        assert_eq!(s.variance(), 0.0);
        assert_eq!(s.min(), 7.0);
        assert_eq!(s.max(), 7.0);
    }

    #[test]
    fn test_two_elements_mean() {
        let mut s = StreamingStats::new();
        s.update(3.0);
        s.update(7.0);
        assert!((s.mean() - 5.0).abs() < 1e-12);
    }

    // 1..=5 has mean 3 and sample variance 2.5.
    #[test]
    fn test_known_variance() {
        let mut s = StreamingStats::new();
        for x in [1.0_f64, 2.0, 3.0, 4.0, 5.0] {
            s.update(x);
        }
        assert_eq!(s.count(), 5);
        assert!((s.mean() - 3.0).abs() < 1e-12);
        assert!((s.variance() - 2.5).abs() < 1e-12);
        assert_eq!(s.min(), 1.0);
        assert_eq!(s.max(), 5.0);
    }

    #[test]
    fn test_std_dev_matches_variance_sqrt() {
        let mut s = StreamingStats::new();
        for x in [2.0_f64, 4.0, 6.0, 8.0] {
            s.update(x);
        }
        assert!((s.std_dev() - s.variance().sqrt()).abs() < 1e-12);
    }

    // Batch updating must be indistinguishable from sequential updates.
    #[test]
    fn test_update_batch_equals_sequential() {
        let values = [1.1, 2.2, 3.3, 4.4, 5.5_f64];

        let mut seq = StreamingStats::new();
        for &v in &values {
            seq.update(v);
        }

        let mut batch = StreamingStats::new();
        batch.update_batch(&values);

        assert_eq!(seq.count(), batch.count());
        assert!((seq.mean() - batch.mean()).abs() < 1e-14);
        assert!((seq.variance() - batch.variance()).abs() < 1e-14);
    }

    #[test]
    fn test_min_max_tracking() {
        let mut s = StreamingStats::new();
        let values = [-5.0_f64, 3.0, 1.0, -1.0, 4.0, 0.0];
        s.update_batch(&values);
        assert_eq!(s.min(), -5.0);
        assert_eq!(s.max(), 4.0);
    }

    #[test]
    fn test_range() {
        let mut s = StreamingStats::new();
        s.update_batch(&[10.0, 20.0, 30.0]);
        assert!((s.range() - 20.0).abs() < 1e-12);
    }

    // NaN samples are skipped entirely: not counted, no effect on the mean.
    #[test]
    fn test_nan_ignored() {
        let mut s = StreamingStats::new();
        s.update(1.0);
        s.update(f64::NAN);
        s.update(3.0);
        assert_eq!(s.count(), 2);
        assert!((s.mean() - 2.0).abs() < 1e-12);
    }

    // Merging two disjoint halves must reproduce the single-pass result.
    #[test]
    fn test_merge_two_halves_equals_full() {
        let values: Vec<f64> = (1..=10).map(|x| x as f64).collect();

        let mut full = StreamingStats::new();
        full.update_batch(&values);

        let mut a = StreamingStats::new();
        a.update_batch(&values[..5]);
        let mut b = StreamingStats::new();
        b.update_batch(&values[5..]);
        let merged = a.merge(b);

        assert_eq!(merged.count(), full.count());
        assert!((merged.mean() - full.mean()).abs() < 1e-12);
        assert!((merged.variance() - full.variance()).abs() < 1e-12);
        assert_eq!(merged.min(), full.min());
        assert_eq!(merged.max(), full.max());
    }

    #[test]
    fn test_merge_with_empty_left() {
        let empty = StreamingStats::new();
        let mut s = StreamingStats::new();
        s.update_batch(&[1.0, 2.0, 3.0]);
        let merged = empty.merge(s.clone());
        assert_eq!(merged.count(), s.count());
        assert!((merged.mean() - s.mean()).abs() < 1e-12);
    }

    #[test]
    fn test_merge_with_empty_right() {
        let mut s = StreamingStats::new();
        s.update_batch(&[1.0, 2.0, 3.0]);
        let empty = StreamingStats::new();
        let merged = s.clone().merge(empty);
        assert_eq!(merged.count(), s.count());
        assert!((merged.mean() - s.mean()).abs() < 1e-12);
    }

    #[test]
    fn test_merge_all_empty_slice() {
        let merged = StreamingStats::merge_all(vec![]);
        assert_eq!(merged.count(), 0);
    }

    #[test]
    fn test_merge_all_matches_sequential() {
        let partitions: Vec<Vec<f64>> = vec![
            vec![1.0, 2.0, 3.0],
            vec![4.0, 5.0],
            vec![6.0, 7.0, 8.0, 9.0],
        ];

        let all_values: Vec<f64> = partitions.iter().flatten().copied().collect();
        let mut full = StreamingStats::new();
        full.update_batch(&all_values);

        let parts: Vec<StreamingStats> = partitions
            .iter()
            .map(|p| {
                let mut s = StreamingStats::new();
                s.update_batch(p);
                s
            })
            .collect();

        let merged = StreamingStats::merge_all(parts);
        assert_eq!(merged.count(), full.count());
        assert!((merged.mean() - full.mean()).abs() < 1e-12);
        assert!((merged.variance() - full.variance()).abs() < 1e-10);
    }

    // Values clustered near 1e9: a naive sum-of-squares accumulator loses
    // essentially all precision here; Welford's update must keep both the
    // mean AND the variance accurate. (Previously only the mean was
    // checked, which misses the point of the algorithm.)
    #[test]
    fn test_large_offset_stability() {
        let base = 1_000_000_000.0_f64;
        let deltas = [0.1, 0.2, 0.3, 0.4, 0.5_f64];
        let mut s = StreamingStats::new();
        for &d in &deltas {
            s.update(base + d);
        }
        let expected_mean = base + 0.3;
        assert!((s.mean() - expected_mean).abs() < 1e-6);
        // Variance is offset-invariant: sample variance of the deltas is 0.025.
        assert!((s.variance() - 0.025).abs() < 1e-4);
    }

    // 1..=5 has population variance 2.0 (M2 = 10, n = 5).
    #[test]
    fn test_population_variance() {
        let mut s = StreamingStats::new();
        for x in [1.0_f64, 2.0, 3.0, 4.0, 5.0] {
            s.update(x);
        }
        assert!((s.population_variance() - 2.0).abs() < 1e-12);
    }
}