Skip to main content

flowstats/statistics/
moments.rs

1//! Running statistics (mean, variance, min, max)
2//!
3//! Computes streaming statistics using Welford's numerically stable online algorithm.
4//! Supports merging for distributed computation.
5
6use crate::math;
7use crate::traits::{MergeError, Sketch};
8
9/// Running statistics calculator using Welford's algorithm
10///
11/// Computes mean, variance, standard deviation, min, and max in a single pass
12/// with O(1) memory. Uses Welford's numerically stable algorithm to avoid
13/// catastrophic cancellation.
14///
15/// # Example
16///
17/// ```
18/// use flowstats::statistics::RunningStats;
19///
20/// let mut stats = RunningStats::new();
21///
22/// for value in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0] {
23///     stats.add(value);
24/// }
25///
26/// assert!((stats.mean() - 5.0).abs() < 0.001);
27/// assert!((stats.variance() - 4.0).abs() < 0.001);
28/// assert!((stats.stddev() - 2.0).abs() < 0.001);
29/// assert_eq!(stats.min(), Some(2.0));
30/// assert_eq!(stats.max(), Some(9.0));
31/// ```
32///
33/// # Distributed Usage
34///
35/// ```
36/// use flowstats::statistics::RunningStats;
37/// use flowstats::traits::Sketch;
38///
39/// let mut stats1 = RunningStats::new();
40/// let mut stats2 = RunningStats::new();
41///
42/// // Worker 1
43/// for v in [1.0, 2.0, 3.0] {
44///     stats1.add(v);
45/// }
46///
47/// // Worker 2
48/// for v in [4.0, 5.0, 6.0] {
49///     stats2.add(v);
50/// }
51///
52/// // Merge
53/// stats1.merge(&stats2).unwrap();
54/// assert!((stats1.mean() - 3.5).abs() < 0.001);
55/// ```
56#[derive(Clone, Debug)]
57pub struct RunningStats {
58    /// Number of values seen
59    count: u64,
60    /// Running mean
61    mean: f64,
62    /// Sum of squared differences from mean (M2 in Welford's algorithm)
63    m2: f64,
64    /// Minimum value
65    min: f64,
66    /// Maximum value
67    max: f64,
68}
69
70impl Default for RunningStats {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl RunningStats {
77    /// Create a new empty statistics accumulator
78    pub fn new() -> Self {
79        Self {
80            count: 0,
81            mean: 0.0,
82            m2: 0.0,
83            min: f64::INFINITY,
84            max: f64::NEG_INFINITY,
85        }
86    }
87
88    /// Add a value to the statistics
89    ///
90    /// Uses Welford's online algorithm for numerical stability.
91    /// NaN values are ignored to prevent poisoning the statistics.
92    pub fn add(&mut self, value: f64) {
93        // Ignore NaN to prevent poisoning statistics
94        if value.is_nan() {
95            return;
96        }
97
98        self.count += 1;
99
100        // Update min/max
101        if value < self.min {
102            self.min = value;
103        }
104        if value > self.max {
105            self.max = value;
106        }
107
108        // Welford's algorithm
109        let delta = value - self.mean;
110        self.mean += delta / self.count as f64;
111        let delta2 = value - self.mean;
112        self.m2 += delta * delta2;
113    }
114
115    /// Get the number of values
116    pub fn len(&self) -> u64 {
117        self.count
118    }
119
120    /// Check if empty
121    pub fn is_empty(&self) -> bool {
122        self.count == 0
123    }
124
125    /// Get the mean (average)
126    pub fn mean(&self) -> f64 {
127        if self.count == 0 {
128            0.0
129        } else {
130            self.mean
131        }
132    }
133
134    /// Get the population variance
135    ///
136    /// This is the variance assuming the data represents the entire population.
137    /// Use `sample_variance()` if the data is a sample.
138    pub fn variance(&self) -> f64 {
139        if self.count < 1 {
140            0.0
141        } else {
142            self.m2 / self.count as f64
143        }
144    }
145
146    /// Get the sample variance
147    ///
148    /// This is the unbiased variance estimator (Bessel's correction).
149    /// Use `variance()` for population variance.
150    pub fn sample_variance(&self) -> f64 {
151        if self.count < 2 {
152            0.0
153        } else {
154            self.m2 / (self.count - 1) as f64
155        }
156    }
157
158    /// Get the population standard deviation
159    pub fn stddev(&self) -> f64 {
160        math::sqrt(self.variance())
161    }
162
163    /// Get the sample standard deviation
164    pub fn sample_stddev(&self) -> f64 {
165        math::sqrt(self.sample_variance())
166    }
167
168    /// Get the minimum value
169    pub fn min(&self) -> Option<f64> {
170        if self.count == 0 {
171            None
172        } else {
173            Some(self.min)
174        }
175    }
176
177    /// Get the maximum value
178    pub fn max(&self) -> Option<f64> {
179        if self.count == 0 {
180            None
181        } else {
182            Some(self.max)
183        }
184    }
185
186    /// Get the range (max - min)
187    pub fn range(&self) -> Option<f64> {
188        if self.count == 0 {
189            None
190        } else {
191            Some(self.max - self.min)
192        }
193    }
194
195    /// Get the sum of all values
196    pub fn sum(&self) -> f64 {
197        self.mean * self.count as f64
198    }
199
200    /// Merge with another RunningStats using parallel algorithm
201    ///
202    /// Uses Chan et al.'s parallel algorithm for combining statistics.
203    pub fn merge_stats(&mut self, other: &Self) {
204        if other.count == 0 {
205            return;
206        }
207
208        if self.count == 0 {
209            *self = other.clone();
210            return;
211        }
212
213        let combined_count = self.count + other.count;
214        let delta = other.mean - self.mean;
215
216        // Combined mean
217        let combined_mean = self.mean + delta * (other.count as f64 / combined_count as f64);
218
219        // Combined M2 (Chan et al.'s parallel algorithm)
220        let combined_m2 = self.m2
221            + other.m2
222            + delta * delta * (self.count as f64 * other.count as f64 / combined_count as f64);
223
224        // Update
225        self.count = combined_count;
226        self.mean = combined_mean;
227        self.m2 = combined_m2;
228        self.min = self.min.min(other.min);
229        self.max = self.max.max(other.max);
230    }
231}
232
233impl Sketch for RunningStats {
234    type Item = f64;
235
236    fn update(&mut self, item: &Self::Item) {
237        self.add(*item);
238    }
239
240    fn merge(&mut self, other: &Self) -> Result<(), MergeError> {
241        self.merge_stats(other);
242        Ok(())
243    }
244
245    fn clear(&mut self) {
246        *self = Self::new();
247    }
248
249    fn size_bytes(&self) -> usize {
250        core::mem::size_of::<Self>()
251    }
252
253    fn count(&self) -> u64 {
254        self.count
255    }
256}
257
258#[cfg(test)]
259mod tests {
260    use super::*;
261
262    #[test]
263    fn test_basic() {
264        let mut stats = RunningStats::new();
265
266        stats.add(2.0);
267        stats.add(4.0);
268        stats.add(4.0);
269        stats.add(4.0);
270        stats.add(5.0);
271        stats.add(5.0);
272        stats.add(7.0);
273        stats.add(9.0);
274
275        assert_eq!(stats.len(), 8);
276        assert!((stats.mean() - 5.0).abs() < 0.001);
277        assert!((stats.variance() - 4.0).abs() < 0.001);
278        assert!((stats.stddev() - 2.0).abs() < 0.001);
279        assert_eq!(stats.min(), Some(2.0));
280        assert_eq!(stats.max(), Some(9.0));
281    }
282
283    #[test]
284    fn test_single_value() {
285        let mut stats = RunningStats::new();
286        stats.add(42.0);
287
288        assert_eq!(stats.len(), 1);
289        assert!((stats.mean() - 42.0).abs() < 0.001);
290        assert!((stats.variance() - 0.0).abs() < 0.001);
291        assert_eq!(stats.min(), Some(42.0));
292        assert_eq!(stats.max(), Some(42.0));
293    }
294
295    #[test]
296    fn test_empty() {
297        let stats = RunningStats::new();
298
299        assert!(stats.is_empty());
300        assert_eq!(stats.mean(), 0.0);
301        assert_eq!(stats.variance(), 0.0);
302        assert_eq!(stats.min(), None);
303        assert_eq!(stats.max(), None);
304    }
305
306    #[test]
307    fn test_merge() {
308        let mut stats1 = RunningStats::new();
309        let mut stats2 = RunningStats::new();
310
311        // Split data: [1,2,3] and [4,5,6]
312        for v in [1.0, 2.0, 3.0] {
313            stats1.add(v);
314        }
315        for v in [4.0, 5.0, 6.0] {
316            stats2.add(v);
317        }
318
319        stats1.merge(&stats2).unwrap();
320
321        assert_eq!(stats1.len(), 6);
322        assert!((stats1.mean() - 3.5).abs() < 0.001);
323        assert_eq!(stats1.min(), Some(1.0));
324        assert_eq!(stats1.max(), Some(6.0));
325        assert!((stats1.sum() - 21.0).abs() < 0.001);
326    }
327
328    #[test]
329    fn test_merge_empty() {
330        let mut stats1 = RunningStats::new();
331        let stats2 = RunningStats::new();
332
333        stats1.add(1.0);
334        stats1.add(2.0);
335
336        stats1.merge(&stats2).unwrap();
337
338        assert_eq!(stats1.len(), 2);
339        assert!((stats1.mean() - 1.5).abs() < 0.001);
340    }
341
342    #[test]
343    fn test_sample_variance() {
344        let mut stats = RunningStats::new();
345
346        // Dataset: [2, 4, 4, 4, 5, 5, 7, 9]
347        // Mean = 40/8 = 5.0
348        // Population variance = 32/8 = 4.0
349        // Sample variance = 32/7 ≈ 4.571 (Bessel's correction)
350        for v in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0] {
351            stats.add(v);
352        }
353
354        // Population variance = 4.0
355        assert!((stats.variance() - 4.0).abs() < 0.001);
356
357        // Sample variance = 4.0 * 8/7 ≈ 4.571
358        assert!((stats.sample_variance() - 4.571).abs() < 0.01);
359    }
360
361    #[test]
362    fn test_clear() {
363        let mut stats = RunningStats::new();
364
365        stats.add(1.0);
366        stats.add(2.0);
367        stats.add(3.0);
368
369        stats.clear();
370
371        assert!(stats.is_empty());
372        assert_eq!(stats.min(), None);
373    }
374
375    #[test]
376    fn test_numerical_stability() {
377        // Test with large values that could cause numerical issues
378        let mut stats = RunningStats::new();
379
380        let base = 1e12;
381        for i in 0..1000 {
382            stats.add(base + i as f64);
383        }
384
385        // Mean should be base + 499.5
386        let expected_mean = base + 499.5;
387        assert!(
388            (stats.mean() - expected_mean).abs() < 1.0,
389            "Mean: {} expected: {}",
390            stats.mean(),
391            expected_mean
392        );
393    }
394
395    #[test]
396    fn test_nan_ignored() {
397        let mut stats = RunningStats::new();
398
399        stats.add(1.0);
400        stats.add(f64::NAN);
401        stats.add(2.0);
402        stats.add(f64::NAN);
403        stats.add(3.0);
404
405        // NaNs should be ignored
406        assert_eq!(stats.len(), 3);
407        assert!((stats.mean() - 2.0).abs() < 0.001);
408        assert_eq!(stats.min(), Some(1.0));
409        assert_eq!(stats.max(), Some(3.0));
410
411        // Mean and variance should not be NaN
412        assert!(!stats.mean().is_nan());
413        assert!(!stats.variance().is_nan());
414    }
415
416    #[test]
417    fn test_infinity() {
418        let mut stats = RunningStats::new();
419
420        stats.add(1.0);
421        stats.add(f64::INFINITY);
422        stats.add(2.0);
423
424        // Infinity is a valid f64 value, should be included
425        assert_eq!(stats.len(), 3);
426        assert_eq!(stats.max(), Some(f64::INFINITY));
427    }
428}