Skip to main content

synapse_pingora/profiler/
distribution.rs

1//! Online statistics using Welford's algorithm and P-square percentiles.
2//!
3//! Provides streaming statistical analysis without storing all samples:
4//! - Mean and variance tracking via Welford's algorithm
5//! - Approximate percentiles (p50, p95, p99) via P-square algorithm
6//!
7//! ## Performance
8//! - Update: O(1) time and space
9//! - Z-score calculation: O(1)
10//! - Memory: ~130 bytes per Distribution
11
12use serde::{Deserialize, Serialize};
13
14// ============================================================================
15// PercentilesTracker - P-square algorithm for streaming percentiles
16// ============================================================================
17
18/// P-square algorithm for dynamic percentile estimation.
19///
20/// Maintains 5 markers for min, p50, p95, p99, max with O(1) updates.
21/// Memory: ~56 bytes
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct PercentilesTracker {
24    /// Marker heights (actual values)
25    q: [f64; 5],
26    /// Marker positions (counts)
27    n: [f64; 5],
28    /// Desired marker positions
29    n_prime: [f64; 5],
30    /// Initialization buffer (first 5 samples)
31    init_count: u8,
32    init_buffer: [f64; 5],
33}
34
35impl Default for PercentilesTracker {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl PercentilesTracker {
42    /// Create a new tracker for p50, p95, p99.
43    ///
44    /// Increments for desired positions are constants: 0, 0.5, 0.95, 0.99, 1.0
45    /// These are used inline in the update method rather than stored.
46    pub fn new() -> Self {
47        Self {
48            q: [0.0; 5],
49            n: [1.0, 2.0, 3.0, 4.0, 5.0],
50            // Desired positions for min, p50, p95, p99, max
51            n_prime: [1.0, 2.0, 3.0, 4.0, 5.0],
52            init_count: 0,
53            init_buffer: [0.0; 5],
54        }
55    }
56
57    /// Update with a new sample.
58    #[inline]
59    pub fn update(&mut self, value: f64) {
60        // SECURITY: Ignore non-finite values to prevent marker corruption (WAF-P0-2)
61        if !value.is_finite() {
62            return;
63        }
64
65        // Initialization phase: collect first 5 samples
66        if self.init_count < 5 {
67            self.init_buffer[self.init_count as usize] = value;
68            self.init_count += 1;
69            if self.init_count == 5 {
70                // Sort and initialize markers
71                self.init_buffer.sort_by(|a, b| a.partial_cmp(b).unwrap());
72                self.q = self.init_buffer;
73            }
74            return;
75        }
76
77        // Find cell k where value belongs
78        let k = if value < self.q[0] {
79            self.q[0] = value;
80            0
81        } else if value < self.q[1] {
82            0
83        } else if value < self.q[2] {
84            1
85        } else if value < self.q[3] {
86            2
87        } else if value < self.q[4] {
88            3
89        } else {
90            self.q[4] = value;
91            3
92        };
93
94        // Increment positions for markers > k
95        for i in (k + 1)..5 {
96            self.n[i] += 1.0;
97        }
98
99        // Update desired positions
100        let total = self.n[4];
101        self.n_prime[1] = 1.0 + 0.5 * total;
102        self.n_prime[2] = 1.0 + 0.95 * total;
103        self.n_prime[3] = 1.0 + 0.99 * total;
104        self.n_prime[4] = total;
105
106        // Adjust marker heights if needed (P-square adjustment)
107        for i in 1..4 {
108            let d = self.n_prime[i] - self.n[i];
109            if (d >= 1.0 && self.n[i + 1] - self.n[i] > 1.0)
110                || (d <= -1.0 && self.n[i - 1] - self.n[i] < -1.0)
111            {
112                let d_sign = if d >= 0.0 { 1.0 } else { -1.0 };
113                // Parabolic adjustment
114                let qi = self.q[i];
115                let qip1 = self.q[i + 1];
116                let qim1 = self.q[i - 1];
117                let ni = self.n[i];
118                let nip1 = self.n[i + 1];
119                let nim1 = self.n[i - 1];
120
121                let q_new = qi
122                    + d_sign / (nip1 - nim1)
123                        * ((ni - nim1 + d_sign) * (qip1 - qi) / (nip1 - ni)
124                            + (nip1 - ni - d_sign) * (qi - qim1) / (ni - nim1));
125
126                // Check bounds and use linear if parabolic fails
127                if qim1 < q_new && q_new < qip1 {
128                    self.q[i] = q_new;
129                } else {
130                    // Linear adjustment
131                    let idx = if d_sign >= 0.0 { i + 1 } else { i - 1 };
132                    self.q[i] = qi + d_sign * (self.q[idx] - qi) / (self.n[idx] - ni);
133                }
134                self.n[i] += d_sign;
135            }
136        }
137    }
138
139    /// Get percentiles (p50, p95, p99).
140    #[inline]
141    pub fn get(&self) -> (f64, f64, f64) {
142        if self.init_count < 5 {
143            // Not enough data
144            return (0.0, 0.0, 0.0);
145        }
146        (self.q[1], self.q[2], self.q[3])
147    }
148
149    /// Get minimum value seen.
150    #[inline]
151    pub fn min(&self) -> f64 {
152        if self.init_count < 5 {
153            return 0.0;
154        }
155        self.q[0]
156    }
157
158    /// Get maximum value seen.
159    #[inline]
160    pub fn max(&self) -> f64 {
161        if self.init_count < 5 {
162            return 0.0;
163        }
164        self.q[4]
165    }
166
167    /// Check if tracker has enough data for meaningful percentiles.
168    #[inline]
169    pub fn is_initialized(&self) -> bool {
170        self.init_count >= 5
171    }
172}
173
174// ============================================================================
175// Distribution - Online statistics with Welford's algorithm
176// ============================================================================
177
178/// Online statistics calculator using Welford's algorithm.
179///
180/// Tracks mean, variance, and approximate percentiles without storing all samples.
181/// Memory: ~130 bytes
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct Distribution {
184    /// Sample count
185    count: u32,
186    /// Running mean
187    mean: f64,
188    /// Running M2 (sum of squared differences from mean)
189    /// Variance = M2 / count
190    m2: f64,
191    /// Approximate percentiles using P-square algorithm
192    percentiles: PercentilesTracker,
193}
194
195impl Default for Distribution {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201impl Distribution {
202    /// Create a new empty distribution.
203    #[inline]
204    pub fn new() -> Self {
205        Self {
206            count: 0,
207            mean: 0.0,
208            m2: 0.0,
209            percentiles: PercentilesTracker::new(),
210        }
211    }
212
213    /// Add a sample using Welford's online algorithm.
214    /// O(1) time and space.
215    #[inline]
216    pub fn update(&mut self, value: f64) {
217        // SECURITY: Ignore non-finite values to prevent statistical poisoning (WAF-P0-1)
218        if !value.is_finite() {
219            return;
220        }
221
222        self.count += 1;
223        let delta = value - self.mean;
224        self.mean += delta / self.count as f64;
225        let delta2 = value - self.mean;
226        self.m2 += delta * delta2;
227
228        self.percentiles.update(value);
229    }
230
231    /// Get sample count.
232    #[inline]
233    pub fn count(&self) -> u32 {
234        self.count
235    }
236
237    /// Get the mean.
238    #[inline]
239    pub fn mean(&self) -> f64 {
240        self.mean
241    }
242
243    /// Compute standard deviation.
244    #[inline]
245    pub fn stddev(&self) -> f64 {
246        if self.count < 2 {
247            return 0.0;
248        }
249        (self.m2 / self.count as f64).sqrt()
250    }
251
252    /// Compute variance.
253    #[inline]
254    pub fn variance(&self) -> f64 {
255        if self.count < 2 {
256            return 0.0;
257        }
258        self.m2 / self.count as f64
259    }
260
261    /// Calculate z-score for a value.
262    /// Returns how many standard deviations from the mean.
263    #[inline]
264    pub fn z_score(&self, value: f64) -> f64 {
265        let std = self.stddev();
266        if std < 0.01 {
267            // Avoid division by zero or near-zero
268            return 0.0;
269        }
270        (value - self.mean) / std
271    }
272
273    /// Get approximate percentiles (p50, p95, p99).
274    #[inline]
275    pub fn percentiles(&self) -> (f64, f64, f64) {
276        self.percentiles.get()
277    }
278
279    /// Get minimum value seen.
280    #[inline]
281    pub fn min(&self) -> f64 {
282        self.percentiles.min()
283    }
284
285    /// Get maximum value seen.
286    #[inline]
287    pub fn max(&self) -> f64 {
288        self.percentiles.max()
289    }
290
291    /// Check if distribution has enough data for statistical analysis.
292    #[inline]
293    pub fn is_valid(&self) -> bool {
294        self.count >= 5
295    }
296}
297
298// ============================================================================
299// Tests
300// ============================================================================
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305
306    #[test]
307    fn test_distribution_welford() {
308        let mut d = Distribution::new();
309        for v in [10.0, 20.0, 30.0, 40.0, 50.0] {
310            d.update(v);
311        }
312        assert!((d.mean() - 30.0).abs() < 0.01);
313        // Population stddev of [10,20,30,40,50] is ~14.14
314        assert!((d.stddev() - 14.14).abs() < 0.5);
315    }
316
317    #[test]
318    fn test_distribution_z_score() {
319        let mut d = Distribution::new();
320        // Create distribution with mean=100, stddev≈10
321        for v in [90.0, 95.0, 100.0, 105.0, 110.0] {
322            d.update(v);
323        }
324
325        // Value at mean should have z-score ~0
326        assert!(d.z_score(100.0).abs() < 0.1);
327
328        // Value 2 stddev above mean
329        let z = d.z_score(100.0 + 2.0 * d.stddev());
330        assert!((z - 2.0).abs() < 0.1);
331    }
332
333    #[test]
334    fn test_distribution_empty() {
335        let d = Distribution::new();
336        assert_eq!(d.count(), 0);
337        assert_eq!(d.mean(), 0.0);
338        assert_eq!(d.stddev(), 0.0);
339        assert_eq!(d.variance(), 0.0);
340        assert!(!d.is_valid());
341    }
342
343    #[test]
344    fn test_distribution_single_value() {
345        let mut d = Distribution::new();
346        d.update(42.0);
347        assert_eq!(d.count(), 1);
348        assert_eq!(d.mean(), 42.0);
349        assert_eq!(d.stddev(), 0.0); // Need 2+ samples for stddev
350    }
351
352    #[test]
353    fn test_distribution_variance() {
354        let mut d = Distribution::new();
355        // Uniform distribution from 0 to 10: variance = (10-0)^2 / 12 ≈ 8.33
356        for v in [0.0, 2.0, 4.0, 6.0, 8.0, 10.0] {
357            d.update(v);
358        }
359        // Mean should be 5.0
360        assert!((d.mean() - 5.0).abs() < 0.01);
361        // Variance should be roughly 11.67 (sample variance)
362        assert!(d.variance() > 0.0);
363    }
364
365    #[test]
366    fn test_percentiles_tracker() {
367        let mut pt = PercentilesTracker::new();
368
369        // Add 100 samples (1 to 100)
370        for i in 1..=100 {
371            pt.update(i as f64);
372        }
373
374        let (p50, p95, p99) = pt.get();
375
376        // p50 should be around 50
377        assert!((p50 - 50.0).abs() < 5.0);
378        // p95 should be around 95
379        assert!((p95 - 95.0).abs() < 5.0);
380        // p99 should be around 99
381        assert!((p99 - 99.0).abs() < 3.0);
382    }
383
384    #[test]
385    fn test_percentiles_tracker_not_initialized() {
386        let mut pt = PercentilesTracker::new();
387        assert!(!pt.is_initialized());
388
389        pt.update(1.0);
390        pt.update(2.0);
391        assert!(!pt.is_initialized());
392
393        let (p50, p95, p99) = pt.get();
394        assert_eq!(p50, 0.0);
395        assert_eq!(p95, 0.0);
396        assert_eq!(p99, 0.0);
397    }
398
399    #[test]
400    fn test_percentiles_tracker_min_max() {
401        let mut pt = PercentilesTracker::new();
402        for i in 1..=10 {
403            pt.update(i as f64);
404        }
405
406        assert_eq!(pt.min(), 1.0);
407        assert_eq!(pt.max(), 10.0);
408    }
409
410    #[test]
411    fn test_distribution_z_score_edge_cases() {
412        let mut d = Distribution::new();
413        // All same values - stddev is 0
414        for _ in 0..10 {
415            d.update(50.0);
416        }
417
418        // Z-score should be 0 when stddev is 0 (avoids division by zero)
419        assert_eq!(d.z_score(100.0), 0.0);
420    }
421
422    #[test]
423    fn test_distribution_negative_values() {
424        let mut d = Distribution::new();
425        for v in [-10.0, -5.0, 0.0, 5.0, 10.0] {
426            d.update(v);
427        }
428        assert!((d.mean() - 0.0).abs() < 0.01);
429        assert!(d.stddev() > 0.0);
430    }
431
432    #[test]
433    fn test_distribution_large_values() {
434        let mut d = Distribution::new();
435        for v in [1e9, 1e9 + 1.0, 1e9 + 2.0, 1e9 + 3.0, 1e9 + 4.0] {
436            d.update(v);
437        }
438        assert!((d.mean() - (1e9 + 2.0)).abs() < 0.01);
439    }
440
441    #[test]
442    fn test_percentiles_with_outliers() {
443        let mut pt = PercentilesTracker::new();
444
445        // Add normal values
446        for i in 1..=95 {
447            pt.update(i as f64);
448        }
449        // Add outliers
450        for _ in 0..5 {
451            pt.update(1000.0);
452        }
453
454        let (p50, p95, p99) = pt.get();
455
456        // p50 should still be around 50 (median is robust to outliers)
457        assert!((p50 - 50.0).abs() < 10.0);
458    }
459
460    #[test]
461    fn test_distribution_nan_stability() {
462        let mut d = Distribution::new();
463        d.update(10.0);
464        d.update(f64::NAN);
465        d.update(f64::INFINITY);
466        d.update(20.0);
467
468        assert_eq!(d.count(), 2);
469        assert!((d.mean() - 15.0).abs() < 0.01);
470        assert!(d.stddev().is_finite());
471    }
472
473    #[test]
474    fn test_percentiles_nan_stability() {
475        let mut pt = PercentilesTracker::new();
476        for _ in 0..10 {
477            pt.update(10.0);
478            pt.update(f64::NAN);
479        }
480
481        let (p50, p95, p99) = pt.get();
482        assert!(p50.is_finite());
483        assert!(p95.is_finite());
484        assert!(p99.is_finite());
485    }
486}