Skip to main content

prettyping_rs/
stats.rs

1use crate::app::AppEvent;
2use crate::ring_buffer::RingBuffer;
3
/// Integer summary of a set of round-trip-time samples, in whole milliseconds.
///
/// All fields are zero when the summary was built from no samples.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IntegerStats {
    /// Number of RTT samples the summary covers.
    pub count: u64,
    /// Smallest observed RTT.
    pub min_ms: u32,
    /// Mean RTT, produced by integer division (fraction discarded).
    pub avg_ms: u32,
    /// Largest observed RTT.
    pub max_ms: u32,
    /// Population standard deviation, rounded to the nearest millisecond.
    pub stddev_ms: u32,
}
12
/// Probe-loss counters together with the derived loss percentage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LossStats {
    /// Number of probes counted as lost.
    pub lost: u64,
    /// Number of probes the loss is measured against.
    pub total: u64,
    /// `lost * 100 / total` using integer division; `0` when `total` is zero.
    pub percent: u32,
}
19
/// Point-in-time view of the statistics accumulated over the whole run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GlobalStatsSnapshot {
    /// Loss measured as timed-out probes against probes sent.
    pub loss: LossStats,
    /// RTT summary over every accepted reply.
    ///
    /// Note: `stddev_ms` is computed from the recent window only, because the
    /// full RTT history is not retained.
    pub rtt: IntegerStats,
    /// RTT of the most recently accepted reply; `0` before any reply is accepted.
    pub last_rtt_ms: u32,
}
26
/// Point-in-time view of the statistics over the recent-sample window only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecentStatsSnapshot {
    /// Loss measured as timeouts against all samples currently in the window.
    pub loss: LossStats,
    /// RTT summary over the replies currently in the window.
    pub rtt: IntegerStats,
}
32
/// Combined snapshot returned by `Stats::snapshot`.
///
/// Each half is `None` when the caller did not request it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StatsSnapshot {
    /// Whole-run statistics, present when `include_global` was `true`.
    pub global: Option<GlobalStatsSnapshot>,
    /// Recent-window statistics, present when `include_recent` was `true`.
    pub recent: Option<RecentStatsSnapshot>,
}
38
/// Running accumulator of probe statistics, updated one `AppEvent` at a time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stats {
    // Incremented for every `ProbeSent` event.
    total_sent: u64,
    // Incremented for every accepted (non-duplicate, non-late) reply.
    total_received: u64,
    // Incremented for every `ProbeTimeout` event.
    total_lost: u64,
    // Saturating sum of all accepted RTTs; divided by `total_received` for the mean.
    total_rtt_sum: u64,
    // `None` until the first accepted reply.
    min_rtt_ms: Option<u32>,
    // `None` until the first accepted reply.
    max_rtt_ms: Option<u32>,
    // RTT of the latest accepted reply; stays 0 until one arrives.
    last_rtt_ms: u32,
    // Window of the latest replies/timeouts, created with the capacity given to `new`.
    recent: RingBuffer<RecentSample>,
}
50
/// One entry of the recent-sample window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RecentSample {
    /// An accepted reply, carrying its RTT in milliseconds.
    Reply(u32),
    /// A probe that timed out.
    Timeout,
}
56
57impl Stats {
58    #[must_use]
59    pub fn new(last: u32) -> Self {
60        let capacity = usize::try_from(last).unwrap_or(usize::MAX);
61        Self {
62            total_sent: 0,
63            total_received: 0,
64            total_lost: 0,
65            total_rtt_sum: 0,
66            min_rtt_ms: None,
67            max_rtt_ms: None,
68            last_rtt_ms: 0,
69            recent: RingBuffer::with_capacity(capacity),
70        }
71    }
72
73    pub fn apply(&mut self, event: &AppEvent) {
74        match event {
75            AppEvent::ProbeSent { .. } => {
76                self.total_sent = self.total_sent.saturating_add(1);
77            }
78            AppEvent::ProbeReply {
79                rtt_ms,
80                duplicate,
81                late,
82                ..
83            } => {
84                // Duplicate replies and late replies should not mutate statistics.
85                // Late replies already missed the configured timeout and are tracked
86                // separately at the app-report layer.
87                if *duplicate || *late {
88                    return;
89                }
90                let value = u32::try_from(*rtt_ms).unwrap_or(u32::MAX);
91                self.total_received = self.total_received.saturating_add(1);
92                self.total_rtt_sum = self.total_rtt_sum.saturating_add(u64::from(value));
93                self.last_rtt_ms = value;
94
95                self.min_rtt_ms = Some(match self.min_rtt_ms {
96                    Some(current) => current.min(value),
97                    None => value,
98                });
99
100                self.max_rtt_ms = Some(match self.max_rtt_ms {
101                    Some(current) => current.max(value),
102                    None => value,
103                });
104
105                let _ = self.recent.push(RecentSample::Reply(value));
106            }
107            AppEvent::ProbeTimeout { .. } => {
108                self.total_lost = self.total_lost.saturating_add(1);
109                let _ = self.recent.push(RecentSample::Timeout);
110            }
111            _ => {}
112        }
113    }
114
115    #[must_use]
116    pub fn snapshot(&self, include_global: bool, include_recent: bool) -> StatsSnapshot {
117        let global = include_global.then(|| self.global_snapshot());
118        let recent = include_recent.then(|| self.recent_snapshot());
119        StatsSnapshot { global, recent }
120    }
121
122    #[must_use]
123    pub fn global_snapshot(&self) -> GlobalStatsSnapshot {
124        let loss = loss_stats(self.total_lost, self.total_sent);
125        let (min_ms, avg_ms, max_ms, stddev_ms) = if self.total_received == 0 {
126            (0, 0, 0, 0)
127        } else {
128            let min = self.min_rtt_ms.unwrap_or(0);
129            let max = self.max_rtt_ms.unwrap_or(0);
130            let avg = div_u64_to_u32(self.total_rtt_sum, self.total_received);
131            // We don't track full history of RTTs, so compute stddev over the recent window.
132            let stddev = standard_deviation_ms(recent_rtt_values(self.recent.iter()));
133            (min, avg, max, stddev)
134        };
135
136        GlobalStatsSnapshot {
137            loss,
138            rtt: IntegerStats {
139                count: self.total_received,
140                min_ms,
141                avg_ms,
142                max_ms,
143                stddev_ms,
144            },
145            last_rtt_ms: self.last_rtt_ms,
146        }
147    }
148
149    #[must_use]
150    pub fn recent_snapshot(&self) -> RecentStatsSnapshot {
151        let (lost_sum, total, recent_rtt_values) = fold_recent(self.recent.iter());
152        let loss = loss_stats(lost_sum, total);
153
154        let rtt = integer_stats_from_values(&recent_rtt_values);
155
156        RecentStatsSnapshot { loss, rtt }
157    }
158}
159
160fn fold_recent<'a, I>(samples: I) -> (u64, u64, Vec<u32>)
161where
162    I: IntoIterator<Item = &'a RecentSample>,
163{
164    let mut lost_sum = 0u64;
165    let mut total = 0u64;
166    let mut rtt_values = Vec::new();
167
168    for sample in samples {
169        total = total.saturating_add(1);
170        match sample {
171            RecentSample::Reply(value) => rtt_values.push(*value),
172            RecentSample::Timeout => lost_sum = lost_sum.saturating_add(1),
173        }
174    }
175
176    (lost_sum, total, rtt_values)
177}
178
179fn recent_rtt_values<'a, I>(samples: I) -> impl Iterator<Item = u32> + 'a
180where
181    I: IntoIterator<Item = &'a RecentSample> + 'a,
182{
183    samples.into_iter().filter_map(|sample| match sample {
184        RecentSample::Reply(value) => Some(*value),
185        RecentSample::Timeout => None,
186    })
187}
188
189fn integer_stats_from_values(values: &[u32]) -> IntegerStats {
190    if values.is_empty() {
191        return IntegerStats {
192            count: 0,
193            min_ms: 0,
194            avg_ms: 0,
195            max_ms: 0,
196            stddev_ms: 0,
197        };
198    }
199
200    let mut min = u32::MAX;
201    let mut max = 0u32;
202    let mut sum = 0u64;
203    for value in values {
204        min = min.min(*value);
205        max = max.max(*value);
206        sum = sum.saturating_add(u64::from(*value));
207    }
208
209    let count = u64::try_from(values.len()).unwrap_or(u64::MAX);
210    let avg = div_u64_to_u32(sum, count);
211    let stddev = standard_deviation_ms(values.iter().copied());
212
213    IntegerStats {
214        count,
215        min_ms: min,
216        avg_ms: avg,
217        max_ms: max,
218        stddev_ms: stddev,
219    }
220}
221
222fn loss_stats(lost: u64, total: u64) -> LossStats {
223    let percent = if total == 0 {
224        0
225    } else {
226        div_u64_to_u32(lost.saturating_mul(100), total)
227    };
228    LossStats {
229        lost,
230        total,
231        percent,
232    }
233}
234
/// Population standard deviation of `values`, rounded to the nearest whole
/// millisecond.
///
/// Returns `0` for an empty input or a non-finite result. The running mean and
/// sum of squared deviations are maintained with Welford's online update for
/// numerical stability.
fn standard_deviation_ms<I>(values: I) -> u32
where
    I: IntoIterator<Item = u32>,
{
    let (count, _mean, sum_sq_dev) = values.into_iter().fold(
        (0u64, 0.0f64, 0.0f64),
        |(count, mean, sum_sq_dev), sample| {
            let count = count + 1;
            let x = f64::from(sample);
            // Deviation from the mean before and after folding in this sample.
            let before = x - mean;
            let mean = mean + before / count as f64;
            let after = x - mean;
            (count, mean, sum_sq_dev + before * after)
        },
    );

    if count == 0 {
        return 0;
    }

    let sigma = (sum_sq_dev / count as f64).sqrt();
    if sigma.is_finite() {
        u32::try_from(sigma.round() as u64).unwrap_or(u32::MAX)
    } else {
        0
    }
}
267
/// Truncating `numerator / denominator`, narrowed to `u32`.
///
/// Division by zero yields `0`; a quotient beyond `u32::MAX` is clamped to
/// `u32::MAX`.
fn div_u64_to_u32(numerator: u64, denominator: u64) -> u32 {
    numerator
        .checked_div(denominator)
        .map_or(0, |quotient| u32::try_from(quotient).unwrap_or(u32::MAX))
}
275
// Unit tests driving `Stats` through `AppEvent` sequences and checking the
// resulting global and recent snapshots.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::app::AppEvent;

    use super::Stats;

    // Four probes (reply 9 ms, reply 20 ms, timeout, reply 51 ms) with a
    // recent window of capacity 3.
    #[test]
    fn tracks_integer_stats_with_recent_window() {
        let mut stats = Stats::new(3);

        stats.apply(&AppEvent::ProbeSent {
            seq: 1,
            at: Duration::ZERO,
        });

        stats.apply(&AppEvent::ProbeReply {
            seq: 1,
            sent_at: Duration::ZERO,
            received_at: Duration::from_millis(9),
            rtt_ms: 9,
            duplicate: false,
            late: false,
        });
        stats.apply(&AppEvent::ProbeSent {
            seq: 2,
            at: Duration::ZERO,
        });
        stats.apply(&AppEvent::ProbeReply {
            seq: 2,
            sent_at: Duration::ZERO,
            received_at: Duration::from_millis(20),
            rtt_ms: 20,
            duplicate: false,
            late: false,
        });
        stats.apply(&AppEvent::ProbeSent {
            seq: 3,
            at: Duration::ZERO,
        });
        stats.apply(&AppEvent::ProbeTimeout {
            seq: 3,
            sent_at: Duration::ZERO,
            deadline: Duration::from_millis(100),
        });
        stats.apply(&AppEvent::ProbeSent {
            seq: 4,
            at: Duration::ZERO,
        });
        stats.apply(&AppEvent::ProbeReply {
            seq: 4,
            sent_at: Duration::ZERO,
            received_at: Duration::from_millis(51),
            rtt_ms: 51,
            duplicate: false,
            late: false,
        });

        // Global view: 1 timeout out of 4 sent; the mean (9 + 20 + 51) / 3 is
        // truncated to 26.
        let global = stats.global_snapshot();
        assert_eq!(global.loss.lost, 1);
        assert_eq!(global.loss.total, 4);
        assert_eq!(global.loss.percent, 25);
        assert_eq!(global.rtt.count, 3);
        assert_eq!(global.rtt.min_ms, 9);
        assert_eq!(global.rtt.avg_ms, 26);
        assert_eq!(global.rtt.max_ms, 51);
        assert_eq!(global.last_rtt_ms, 51);

        // Recent view: only the last three samples are expected to remain
        // (20 ms, timeout, 51 ms); the mean (20 + 51) / 2 is truncated to 35.
        let recent = stats.recent_snapshot();
        assert_eq!(recent.loss.lost, 1);
        assert_eq!(recent.loss.total, 3);
        assert_eq!(recent.loss.percent, 33);
        assert_eq!(recent.rtt.count, 2);
        assert_eq!(recent.rtt.min_ms, 20);
        assert_eq!(recent.rtt.avg_ms, 35);
        assert_eq!(recent.rtt.max_ms, 51);
    }

    // A reply flagged `duplicate` must not alter counts, the mean, or the
    // last RTT.
    #[test]
    fn duplicate_replies_do_not_skew_stats() {
        let mut stats = Stats::new(5);

        stats.apply(&AppEvent::ProbeSent {
            seq: 1,
            at: Duration::ZERO,
        });

        stats.apply(&AppEvent::ProbeReply {
            seq: 1,
            sent_at: Duration::ZERO,
            received_at: Duration::from_millis(10),
            rtt_ms: 10,
            duplicate: false,
            late: false,
        });

        // Same sequence number again, flagged as a duplicate.
        stats.apply(&AppEvent::ProbeReply {
            seq: 1,
            sent_at: Duration::ZERO,
            received_at: Duration::from_millis(12),
            rtt_ms: 12,
            duplicate: true,
            late: false,
        });

        let global = stats.global_snapshot();
        assert_eq!(global.loss.total, 1);
        assert_eq!(global.loss.lost, 0);
        assert_eq!(global.loss.percent, 0);
        assert_eq!(global.rtt.count, 1);
        assert_eq!(global.rtt.avg_ms, 10);
        assert_eq!(global.last_rtt_ms, 10);
    }

    // A reply flagged `late`, arriving after its probe already timed out,
    // must leave the probe counted as lost and add no RTT sample.
    #[test]
    fn late_replies_do_not_change_global_loss_denominator() {
        let mut stats = Stats::new(5);

        stats.apply(&AppEvent::ProbeSent {
            seq: 1,
            at: Duration::ZERO,
        });
        stats.apply(&AppEvent::ProbeTimeout {
            seq: 1,
            sent_at: Duration::ZERO,
            deadline: Duration::from_millis(1_000),
        });
        stats.apply(&AppEvent::ProbeReply {
            seq: 1,
            sent_at: Duration::ZERO,
            received_at: Duration::from_millis(1_500),
            rtt_ms: 1_500,
            duplicate: false,
            late: true,
        });

        let global = stats.global_snapshot();
        assert_eq!(global.loss.lost, 1);
        assert_eq!(global.loss.total, 1);
        assert_eq!(global.loss.percent, 100);
        assert_eq!(global.rtt.count, 0);
    }
}