Skip to main content

netpulse/stats/
engine.rs

1// src/stats/engine.rs — Statistics Engine
2//
3// Computes network quality metrics from a snapshot of ProbeResults:
4// - RTT: min, max, mean, p50, p90, p95, p99
5// - Jitter: mean absolute deviation of successive RTT differences (RFC 3393)
6// - Packet loss percentage
7// - Burst loss: detection of consecutive packet loss runs
8// - Reorder detection: packets arriving out of sequence
9
10use crate::probers::ProbeResult;
11use serde::Serialize;
12
13/// A complete statistical snapshot for one target at one point in time.
14#[derive(Debug, Clone, Serialize)]
15pub struct StatsSnapshot {
16    pub target: String,
17    pub sample_count: usize,
18    pub loss_count: usize,
19    pub loss_pct: f64,
20
21    // RTT stats in microseconds (None if no successful probes)
22    pub rtt_min_us: Option<u64>,
23    pub rtt_max_us: Option<u64>,
24    pub rtt_mean_us: Option<f64>,
25    pub rtt_p50_us: Option<u64>,
26    pub rtt_p90_us: Option<u64>,
27    pub rtt_p95_us: Option<u64>,
28    pub rtt_p99_us: Option<u64>,
29
30    /// Jitter in microseconds (RFC 3393 — mean of |RTT[n] - RTT[n-1]|)
31    pub jitter_us: Option<f64>,
32
33    /// Maximum consecutive-loss streak in the sample window
34    pub max_burst_loss: usize,
35
36    /// Number of out-of-order arrivals detected
37    pub reorder_count: usize,
38}
39
40/// The stats engine: stateless, operates on owned snapshots.
41pub struct StatsEngine;
42
43impl StatsEngine {
44    /// Compute a full StatsSnapshot from a Vec of ProbeResults.
45    pub fn compute(target: &str, samples: Vec<ProbeResult>) -> StatsSnapshot {
46        let sample_count = samples.len();
47
48        if sample_count == 0 {
49            return StatsSnapshot {
50                target: target.to_string(),
51                sample_count: 0,
52                loss_count: 0,
53                loss_pct: 0.0,
54                rtt_min_us: None,
55                rtt_max_us: None,
56                rtt_mean_us: None,
57                rtt_p50_us: None,
58                rtt_p90_us: None,
59                rtt_p95_us: None,
60                rtt_p99_us: None,
61                jitter_us: None,
62                max_burst_loss: 0,
63                reorder_count: 0,
64            };
65        }
66
67        let loss_count = samples.iter().filter(|s| s.is_loss()).count();
68        let loss_pct = (loss_count as f64 / sample_count as f64) * 100.0;
69
70        // Collect successful RTTs
71        let mut rtts: Vec<u64> = samples.iter().filter_map(|s| s.rtt_us).collect();
72
73        let (rtt_min_us, rtt_max_us, rtt_mean_us, rtt_p50_us, rtt_p90_us, rtt_p95_us, rtt_p99_us) =
74            if rtts.is_empty() {
75                (None, None, None, None, None, None, None)
76            } else {
77                rtts.sort_unstable();
78                let min = *rtts.first().unwrap();
79                let max = *rtts.last().unwrap();
80                let mean = rtts.iter().sum::<u64>() as f64 / rtts.len() as f64;
81
82                let p50 = percentile(&rtts, 50.0);
83                let p90 = percentile(&rtts, 90.0);
84                let p95 = percentile(&rtts, 95.0);
85                let p99 = percentile(&rtts, 99.0);
86
87                (
88                    Some(min),
89                    Some(max),
90                    Some(mean),
91                    Some(p50),
92                    Some(p90),
93                    Some(p95),
94                    Some(p99),
95                )
96            };
97
98        // Jitter: mean absolute deviation of successive RTT differences (RFC 3393)
99        // Only computed on the ordered sample stream (not the sorted RTT list)
100        let ordered_rtts: Vec<u64> = samples.iter().filter_map(|s| s.rtt_us).collect();
101        let jitter_us = if ordered_rtts.len() >= 2 {
102            let diffs: Vec<f64> = ordered_rtts
103                .windows(2)
104                .map(|w| (w[1] as f64 - w[0] as f64).abs())
105                .collect();
106            Some(diffs.iter().sum::<f64>() / diffs.len() as f64)
107        } else {
108            None
109        };
110
111        // Burst loss: find the longest streak of consecutive losses
112        let max_burst_loss = {
113            let mut max_streak = 0usize;
114            let mut current_streak = 0usize;
115            for s in &samples {
116                if s.is_loss() {
117                    current_streak += 1;
118                    max_streak = max_streak.max(current_streak);
119                } else {
120                    current_streak = 0;
121                }
122            }
123            max_streak
124        };
125
126        // Reorder detection: count packets arriving with a seq lower than max seen
127        let reorder_count = {
128            let mut max_seq_seen: u64 = 0;
129            let mut reorders = 0usize;
130            for s in &samples {
131                if s.seq < max_seq_seen {
132                    reorders += 1;
133                } else {
134                    max_seq_seen = s.seq;
135                }
136            }
137            reorders
138        };
139
140        StatsSnapshot {
141            target: target.to_string(),
142            sample_count,
143            loss_count,
144            loss_pct,
145            rtt_min_us,
146            rtt_max_us,
147            rtt_mean_us,
148            rtt_p50_us,
149            rtt_p90_us,
150            rtt_p95_us,
151            rtt_p99_us,
152            jitter_us,
153            max_burst_loss,
154            reorder_count,
155        }
156    }
157}
158
159/// Compute a percentile value from a sorted slice using linear interpolation.
160fn percentile(sorted: &[u64], pct: f64) -> u64 {
161    if sorted.is_empty() {
162        return 0;
163    }
164    if sorted.len() == 1 {
165        return sorted[0];
166    }
167
168    let rank = pct / 100.0 * (sorted.len() - 1) as f64;
169    let lower = rank.floor() as usize;
170    let upper = rank.ceil() as usize;
171
172    if lower == upper {
173        return sorted[lower];
174    }
175
176    let frac = rank - lower as f64;
177    (sorted[lower] as f64 + frac * (sorted[upper] as f64 - sorted[lower] as f64)) as u64
178}
179
180#[cfg(test)]
181mod tests {
182    use super::*;
183    use crate::probers::ProbeResult;
184    use chrono::Utc;
185
186    fn make_sample(rtt_us: Option<u64>, seq: u64) -> ProbeResult {
187        ProbeResult {
188            target: "8.8.8.8".to_string(),
189            rtt_us,
190            timestamp: Utc::now(),
191            seq,
192            responder_ip: None,
193        }
194    }
195
196    #[test]
197    fn test_percentiles_known_values() {
198        // RTTs: 10, 20, 30, 40, 50, 60, 70, 80, 90, 100 ms (in µs)
199        let samples: Vec<ProbeResult> =
200            (1..=10).map(|i| make_sample(Some(i * 10_000), i)).collect();
201
202        let snap = StatsEngine::compute("test", samples);
203        assert_eq!(snap.rtt_min_us, Some(10_000));
204        assert_eq!(snap.rtt_max_us, Some(100_000));
205        // With 10 samples [10k..100k], rank = 0.5 * 9 = 4.5 → interpolates to 55_000
206        assert_eq!(snap.rtt_p50_us, Some(55_000));
207        assert_eq!(snap.loss_pct, 0.0);
208    }
209
210    #[test]
211    fn test_loss_counting() {
212        let mut samples: Vec<ProbeResult> = (0..8).map(|i| make_sample(Some(10_000), i)).collect();
213        samples.push(make_sample(None, 8));
214        samples.push(make_sample(None, 9));
215
216        let snap = StatsEngine::compute("test", samples);
217        assert_eq!(snap.loss_count, 2);
218        assert!((snap.loss_pct - 20.0).abs() < 0.001);
219    }
220
221    #[test]
222    fn test_burst_loss() {
223        let samples = vec![
224            make_sample(Some(10_000), 0),
225            make_sample(None, 1),
226            make_sample(None, 2),
227            make_sample(None, 3),
228            make_sample(Some(10_000), 4),
229        ];
230        let snap = StatsEngine::compute("test", samples);
231        assert_eq!(snap.max_burst_loss, 3);
232    }
233
234    #[test]
235    fn test_jitter() {
236        // RTTs: 10ms, 20ms, 15ms → diffs: 10, 5 → jitter = 7.5ms
237        let samples = vec![
238            make_sample(Some(10_000), 0),
239            make_sample(Some(20_000), 1),
240            make_sample(Some(15_000), 2),
241        ];
242        let snap = StatsEngine::compute("test", samples);
243        let jitter = snap.jitter_us.unwrap();
244        assert!((jitter - 7_500.0).abs() < 1.0, "jitter was {}", jitter);
245    }
246
247    #[test]
248    fn test_empty_samples() {
249        let snap = StatsEngine::compute("test", vec![]);
250        assert_eq!(snap.sample_count, 0);
251        assert!(snap.rtt_min_us.is_none());
252    }
253}