// scirs2_core/streaming_stats.rs

//! Online / streaming statistics via Welford's numerically stable algorithm.
//!
//! [`StreamingStats`] accumulates count, mean, variance (Bessel-corrected),
//! minimum, and maximum in a single pass over the data without storing the
//! full dataset.  Two independent accumulators can be merged in O(1) to
//! support parallel computation over disjoint partitions.
//!
//! This complements the generic [`crate::numeric::stability::WelfordVariance`]
//! by:
//!
//! * fixing the element type to `f64` for ergonomic use in pipelines,
//! * tracking `min` / `max` alongside the central moments, and
//! * providing a `merge` method suitable for map-reduce patterns.
//!
//! # Example
//!
//! ```rust
//! use scirs2_core::streaming_stats::StreamingStats;
//!
//! let mut stats = StreamingStats::new();
//! for x in [1.0_f64, 2.0, 3.0, 4.0, 5.0] {
//!     stats.update(x);
//! }
//! assert!((stats.mean() - 3.0).abs() < 1e-12);
//! assert!((stats.variance() - 2.5).abs() < 1e-12); // sample variance
//! assert_eq!(stats.min(), 1.0);
//! assert_eq!(stats.max(), 5.0);
//! ```

// ──────────────────────────────────────────────────────────────────────────────
// StreamingStats
// ──────────────────────────────────────────────────────────────────────────────

/// Online statistics accumulator based on Welford's one-pass algorithm.
///
/// All operations are O(1) per data point (or O(n) for a batch update).
/// After processing all data the accumulator exposes:
///
/// * `count()` – number of samples seen
/// * `mean()` – arithmetic mean
/// * `variance()` – Bessel-corrected sample variance
/// * `std_dev()` – sample standard deviation
/// * `min()` / `max()` – extremes
///
/// The `merge` method combines two accumulators in O(1) using Chan's
/// parallel formula, enabling map-reduce over partitioned data.
#[derive(Debug, Clone)]
pub struct StreamingStats {
    /// Number of non-NaN observations accumulated so far.
    count: u64,
    /// Running arithmetic mean (0.0 while empty).
    mean: f64,
    /// Running sum of squared deviations from the current mean (M₂).
    m2: f64,
    /// Smallest observation seen; `f64::INFINITY` while empty.
    min: f64,
    /// Largest observation seen; `f64::NEG_INFINITY` while empty.
    max: f64,
}

impl Default for StreamingStats {
    /// An empty accumulator, identical to [`StreamingStats::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl StreamingStats {
    /// Create a new, empty accumulator.
    pub fn new() -> Self {
        Self {
            count: 0,
            mean: 0.0,
            m2: 0.0,
            // Identity elements for min/max: the first real observation
            // always replaces them.
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    /// Incorporate one more observation using Welford's update rule.
    ///
    /// `NaN` values are silently ignored.
    pub fn update(&mut self, value: f64) {
        if value.is_nan() {
            return;
        }
        self.count += 1;
        // Welford: `delta` uses the OLD mean, `delta2` the NEW mean; their
        // product is the numerically stable increment to M₂.
        let delta = value - self.mean;
        self.mean += delta / self.count as f64;
        let delta2 = value - self.mean;
        self.m2 += delta * delta2;

        if value < self.min {
            self.min = value;
        }
        if value > self.max {
            self.max = value;
        }
    }

    /// Process a batch of values, equivalent to calling [`update`](Self::update)
    /// for each element but potentially more cache-friendly.
    pub fn update_batch(&mut self, values: &[f64]) {
        for &v in values {
            self.update(v);
        }
    }

    // ── Accessors ────────────────────────────────────────────────────────────

    /// Number of non-NaN observations accumulated so far.
    #[inline]
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Arithmetic mean of all accumulated observations.
    ///
    /// Returns `0.0` when `count == 0`.
    #[inline]
    pub fn mean(&self) -> f64 {
        self.mean
    }

    /// Bessel-corrected sample variance (divides by `count - 1`).
    ///
    /// Returns `0.0` when `count <= 1`.
    pub fn variance(&self) -> f64 {
        if self.count <= 1 {
            0.0
        } else {
            self.m2 / (self.count - 1) as f64
        }
    }

    /// Population variance (divides by `count`).
    ///
    /// Returns `0.0` when `count == 0`.
    pub fn population_variance(&self) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            self.m2 / self.count as f64
        }
    }

    /// Sample standard deviation (square root of the sample variance).
    ///
    /// Returns `0.0` when `count <= 1`.
    pub fn std_dev(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Minimum value seen so far.
    ///
    /// Returns `f64::INFINITY` when `count == 0`.
    #[inline]
    pub fn min(&self) -> f64 {
        self.min
    }

    /// Maximum value seen so far.
    ///
    /// Returns `f64::NEG_INFINITY` when `count == 0`.
    #[inline]
    pub fn max(&self) -> f64 {
        self.max
    }

    /// Range (max − min).
    ///
    /// Returns `0.0` when empty (and, arithmetically, when a single value
    /// has been seen, since then `max == min`).
    pub fn range(&self) -> f64 {
        if self.count == 0 {
            // Avoid INFINITY - NEG_INFINITY = INFINITY (or NaN-like surprises)
            // on the empty accumulator.
            0.0
        } else {
            self.max - self.min
        }
    }

    // ── Parallel merge ────────────────────────────────────────────────────────

    /// Merge two accumulators into one using Chan's parallel update formula.
    ///
    /// The result is mathematically equivalent to having accumulated all
    /// observations from both sources into a single accumulator.
    ///
    /// This enables map-reduce patterns:
    ///
    /// ```rust
    /// use scirs2_core::streaming_stats::StreamingStats;
    ///
    /// let mut a = StreamingStats::new();
    /// a.update_batch(&[1.0, 2.0, 3.0]);
    ///
    /// let mut b = StreamingStats::new();
    /// b.update_batch(&[4.0, 5.0, 6.0]);
    ///
    /// let combined = a.merge(b);
    /// assert!((combined.mean() - 3.5).abs() < 1e-12);
    /// ```
    pub fn merge(self, other: Self) -> Self {
        // Short-circuit the empty sides so the formula below never divides
        // by a zero combined count and the ±∞ min/max sentinels survive.
        if self.count == 0 {
            return other;
        }
        if other.count == 0 {
            return self;
        }

        let combined_count = self.count + other.count;
        let delta = other.mean - self.mean;
        // Count-weighted mean is more stable than `self.mean + delta * nb/n`
        // when the two partitions have very different sizes.
        let combined_mean = (self.mean * self.count as f64 + other.mean * other.count as f64)
            / combined_count as f64;
        // Chan's parallel variance update:
        // M₂ = M₂ₐ + M₂ᵦ + δ² · nₐnᵦ / (nₐ + nᵦ)
        let combined_m2 = self.m2
            + other.m2
            + delta * delta * (self.count as f64 * other.count as f64) / combined_count as f64;

        Self {
            count: combined_count,
            mean: combined_mean,
            m2: combined_m2,
            min: self.min.min(other.min),
            max: self.max.max(other.max),
        }
    }

    /// Convenience: merge any collection of accumulators into one.
    ///
    /// Accepts anything iterable over `StreamingStats` — a `Vec` (as
    /// before, so existing callers are unaffected), an array, or an
    /// iterator produced by a map stage.
    ///
    /// Returns `StreamingStats::new()` when the input yields no items.
    pub fn merge_all<I>(stats: I) -> Self
    where
        I: IntoIterator<Item = Self>,
    {
        stats.into_iter().fold(Self::new(), Self::merge)
    }
}

// ──────────────────────────────────────────────────────────────────────────────
// Tests
// ──────────────────────────────────────────────────────────────────────────────

// Unit tests: construction, single-pass updates, min/max tracking, NaN
// handling, parallel merging, and numerical stability under large offsets.
#[cfg(test)]
mod tests {
    use super::*;

    /// True when `a` and `b` differ by less than `tol` in absolute terms.
    fn close(a: f64, b: f64, tol: f64) -> bool {
        (a - b).abs() < tol
    }

    // ── Basic update ─────────────────────────────────────────────────────────

    #[test]
    fn test_empty_accumulator() {
        let acc = StreamingStats::new();
        assert_eq!(acc.count(), 0);
        assert_eq!(acc.mean(), 0.0);
        assert_eq!(acc.variance(), 0.0);
        assert_eq!(acc.std_dev(), 0.0);
        assert_eq!(acc.min(), f64::INFINITY);
        assert_eq!(acc.max(), f64::NEG_INFINITY);
    }

    #[test]
    fn test_single_element() {
        let mut acc = StreamingStats::new();
        acc.update(7.0);
        assert_eq!(acc.count(), 1);
        assert!(close(acc.mean(), 7.0, 1e-12));
        // Bessel correction divides by n - 1 = 0, defined to yield 0.
        assert_eq!(acc.variance(), 0.0);
        assert_eq!(acc.min(), 7.0);
        assert_eq!(acc.max(), 7.0);
    }

    #[test]
    fn test_two_elements_mean() {
        let mut acc = StreamingStats::new();
        acc.update(3.0);
        acc.update(7.0);
        assert!(close(acc.mean(), 5.0, 1e-12));
    }

    #[test]
    fn test_known_variance() {
        // Sample variance of [1, 2, 3, 4, 5] is 2.5.
        let mut acc = StreamingStats::new();
        [1.0_f64, 2.0, 3.0, 4.0, 5.0]
            .iter()
            .for_each(|&x| acc.update(x));
        assert_eq!(acc.count(), 5);
        assert!(close(acc.mean(), 3.0, 1e-12));
        assert!(close(acc.variance(), 2.5, 1e-12));
        assert_eq!(acc.min(), 1.0);
        assert_eq!(acc.max(), 5.0);
    }

    #[test]
    fn test_std_dev_matches_variance_sqrt() {
        let mut acc = StreamingStats::new();
        acc.update_batch(&[2.0, 4.0, 6.0, 8.0]);
        assert!(close(acc.std_dev(), acc.variance().sqrt(), 1e-12));
    }

    #[test]
    fn test_update_batch_equals_sequential() {
        let data = [1.1_f64, 2.2, 3.3, 4.4, 5.5];

        let mut one_by_one = StreamingStats::new();
        data.iter().for_each(|&v| one_by_one.update(v));

        let mut batched = StreamingStats::new();
        batched.update_batch(&data);

        assert_eq!(one_by_one.count(), batched.count());
        assert!(close(one_by_one.mean(), batched.mean(), 1e-14));
        assert!(close(one_by_one.variance(), batched.variance(), 1e-14));
    }

    #[test]
    fn test_min_max_tracking() {
        let mut acc = StreamingStats::new();
        acc.update_batch(&[-5.0, 3.0, 1.0, -1.0, 4.0, 0.0]);
        assert_eq!(acc.min(), -5.0);
        assert_eq!(acc.max(), 4.0);
    }

    #[test]
    fn test_range() {
        let mut acc = StreamingStats::new();
        acc.update_batch(&[10.0, 20.0, 30.0]);
        assert!(close(acc.range(), 20.0, 1e-12));
    }

    #[test]
    fn test_nan_ignored() {
        let mut acc = StreamingStats::new();
        for v in [1.0, f64::NAN, 3.0] {
            acc.update(v);
        }
        assert_eq!(acc.count(), 2);
        assert!(close(acc.mean(), 2.0, 1e-12));
    }

    // ── Merge ─────────────────────────────────────────────────────────────────

    #[test]
    fn test_merge_two_halves_equals_full() {
        let values: Vec<f64> = (1..=10).map(f64::from).collect();

        let mut full = StreamingStats::new();
        full.update_batch(&values);

        let (lo, hi) = values.split_at(5);
        let mut left = StreamingStats::new();
        left.update_batch(lo);
        let mut right = StreamingStats::new();
        right.update_batch(hi);
        let merged = left.merge(right);

        assert_eq!(merged.count(), full.count());
        assert!(close(merged.mean(), full.mean(), 1e-12));
        assert!(close(merged.variance(), full.variance(), 1e-12));
        assert_eq!(merged.min(), full.min());
        assert_eq!(merged.max(), full.max());
    }

    #[test]
    fn test_merge_with_empty_left() {
        let mut populated = StreamingStats::new();
        populated.update_batch(&[1.0, 2.0, 3.0]);
        let merged = StreamingStats::new().merge(populated.clone());
        assert_eq!(merged.count(), populated.count());
        assert!(close(merged.mean(), populated.mean(), 1e-12));
    }

    #[test]
    fn test_merge_with_empty_right() {
        let mut populated = StreamingStats::new();
        populated.update_batch(&[1.0, 2.0, 3.0]);
        let merged = populated.clone().merge(StreamingStats::new());
        assert_eq!(merged.count(), populated.count());
        assert!(close(merged.mean(), populated.mean(), 1e-12));
    }

    #[test]
    fn test_merge_all_empty_slice() {
        assert_eq!(StreamingStats::merge_all(vec![]).count(), 0);
    }

    #[test]
    fn test_merge_all_matches_sequential() {
        let partitions = [
            vec![1.0, 2.0, 3.0],
            vec![4.0, 5.0],
            vec![6.0, 7.0, 8.0, 9.0],
        ];

        // Reference: one accumulator fed every value in order.
        let mut full = StreamingStats::new();
        for part in &partitions {
            full.update_batch(part);
        }

        // Map stage: one accumulator per partition.
        let parts: Vec<StreamingStats> = partitions
            .iter()
            .map(|p| {
                let mut acc = StreamingStats::new();
                acc.update_batch(p);
                acc
            })
            .collect();

        // Reduce stage must agree with the reference.
        let merged = StreamingStats::merge_all(parts);
        assert_eq!(merged.count(), full.count());
        assert!(close(merged.mean(), full.mean(), 1e-12));
        assert!(close(merged.variance(), full.variance(), 1e-10));
    }

    // ── Numerical stability ───────────────────────────────────────────────────

    #[test]
    fn test_large_offset_stability() {
        // Welford stays stable when all samples share a huge common offset
        // and differ only in the low-order digits.
        let base = 1_000_000_000.0_f64;
        let mut acc = StreamingStats::new();
        for d in [0.1, 0.2, 0.3, 0.4, 0.5] {
            acc.update(base + d);
        }
        assert!(close(acc.mean(), base + 0.3, 1e-6));
    }

    #[test]
    fn test_population_variance() {
        // Population variance of [1, 2, 3, 4, 5] is 2.0.
        let mut acc = StreamingStats::new();
        acc.update_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);
        assert!(close(acc.population_variance(), 2.0, 1e-12));
    }
}