1use crate::app::AppEvent;
2use crate::ring_buffer::RingBuffer;
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq)]
5pub struct IntegerStats {
6 pub count: u64,
7 pub min_ms: u32,
8 pub avg_ms: u32,
9 pub max_ms: u32,
10 pub stddev_ms: u32,
11}
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub struct LossStats {
15 pub lost: u64,
16 pub total: u64,
17 pub percent: u32,
18}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub struct GlobalStatsSnapshot {
22 pub loss: LossStats,
23 pub rtt: IntegerStats,
24 pub last_rtt_ms: u32,
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub struct RecentStatsSnapshot {
29 pub loss: LossStats,
30 pub rtt: IntegerStats,
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct StatsSnapshot {
35 pub global: Option<GlobalStatsSnapshot>,
36 pub recent: Option<RecentStatsSnapshot>,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct Stats {
41 total_sent: u64,
42 total_received: u64,
43 total_lost: u64,
44 total_rtt_sum: u64,
45 min_rtt_ms: Option<u32>,
46 max_rtt_ms: Option<u32>,
47 last_rtt_ms: u32,
48 recent: RingBuffer<RecentSample>,
49}
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52enum RecentSample {
53 Reply(u32),
54 Timeout,
55}
56
57impl Stats {
58 #[must_use]
59 pub fn new(last: u32) -> Self {
60 let capacity = usize::try_from(last).unwrap_or(usize::MAX);
61 Self {
62 total_sent: 0,
63 total_received: 0,
64 total_lost: 0,
65 total_rtt_sum: 0,
66 min_rtt_ms: None,
67 max_rtt_ms: None,
68 last_rtt_ms: 0,
69 recent: RingBuffer::with_capacity(capacity),
70 }
71 }
72
73 pub fn apply(&mut self, event: &AppEvent) {
74 match event {
75 AppEvent::ProbeSent { .. } => {
76 self.total_sent = self.total_sent.saturating_add(1);
77 }
78 AppEvent::ProbeReply {
79 rtt_ms,
80 duplicate,
81 late,
82 ..
83 } => {
84 if *duplicate || *late {
88 return;
89 }
90 let value = u32::try_from(*rtt_ms).unwrap_or(u32::MAX);
91 self.total_received = self.total_received.saturating_add(1);
92 self.total_rtt_sum = self.total_rtt_sum.saturating_add(u64::from(value));
93 self.last_rtt_ms = value;
94
95 self.min_rtt_ms = Some(match self.min_rtt_ms {
96 Some(current) => current.min(value),
97 None => value,
98 });
99
100 self.max_rtt_ms = Some(match self.max_rtt_ms {
101 Some(current) => current.max(value),
102 None => value,
103 });
104
105 let _ = self.recent.push(RecentSample::Reply(value));
106 }
107 AppEvent::ProbeTimeout { .. } => {
108 self.total_lost = self.total_lost.saturating_add(1);
109 let _ = self.recent.push(RecentSample::Timeout);
110 }
111 _ => {}
112 }
113 }
114
115 #[must_use]
116 pub fn snapshot(&self, include_global: bool, include_recent: bool) -> StatsSnapshot {
117 let global = include_global.then(|| self.global_snapshot());
118 let recent = include_recent.then(|| self.recent_snapshot());
119 StatsSnapshot { global, recent }
120 }
121
122 #[must_use]
123 pub fn global_snapshot(&self) -> GlobalStatsSnapshot {
124 let loss = loss_stats(self.total_lost, self.total_sent);
125 let (min_ms, avg_ms, max_ms, stddev_ms) = if self.total_received == 0 {
126 (0, 0, 0, 0)
127 } else {
128 let min = self.min_rtt_ms.unwrap_or(0);
129 let max = self.max_rtt_ms.unwrap_or(0);
130 let avg = div_u64_to_u32(self.total_rtt_sum, self.total_received);
131 let stddev = standard_deviation_ms(recent_rtt_values(self.recent.iter()));
133 (min, avg, max, stddev)
134 };
135
136 GlobalStatsSnapshot {
137 loss,
138 rtt: IntegerStats {
139 count: self.total_received,
140 min_ms,
141 avg_ms,
142 max_ms,
143 stddev_ms,
144 },
145 last_rtt_ms: self.last_rtt_ms,
146 }
147 }
148
149 #[must_use]
150 pub fn recent_snapshot(&self) -> RecentStatsSnapshot {
151 let (lost_sum, total, recent_rtt_values) = fold_recent(self.recent.iter());
152 let loss = loss_stats(lost_sum, total);
153
154 let rtt = integer_stats_from_values(&recent_rtt_values);
155
156 RecentStatsSnapshot { loss, rtt }
157 }
158}
159
160fn fold_recent<'a, I>(samples: I) -> (u64, u64, Vec<u32>)
161where
162 I: IntoIterator<Item = &'a RecentSample>,
163{
164 let mut lost_sum = 0u64;
165 let mut total = 0u64;
166 let mut rtt_values = Vec::new();
167
168 for sample in samples {
169 total = total.saturating_add(1);
170 match sample {
171 RecentSample::Reply(value) => rtt_values.push(*value),
172 RecentSample::Timeout => lost_sum = lost_sum.saturating_add(1),
173 }
174 }
175
176 (lost_sum, total, rtt_values)
177}
178
179fn recent_rtt_values<'a, I>(samples: I) -> impl Iterator<Item = u32> + 'a
180where
181 I: IntoIterator<Item = &'a RecentSample> + 'a,
182{
183 samples.into_iter().filter_map(|sample| match sample {
184 RecentSample::Reply(value) => Some(*value),
185 RecentSample::Timeout => None,
186 })
187}
188
189fn integer_stats_from_values(values: &[u32]) -> IntegerStats {
190 if values.is_empty() {
191 return IntegerStats {
192 count: 0,
193 min_ms: 0,
194 avg_ms: 0,
195 max_ms: 0,
196 stddev_ms: 0,
197 };
198 }
199
200 let mut min = u32::MAX;
201 let mut max = 0u32;
202 let mut sum = 0u64;
203 for value in values {
204 min = min.min(*value);
205 max = max.max(*value);
206 sum = sum.saturating_add(u64::from(*value));
207 }
208
209 let count = u64::try_from(values.len()).unwrap_or(u64::MAX);
210 let avg = div_u64_to_u32(sum, count);
211 let stddev = standard_deviation_ms(values.iter().copied());
212
213 IntegerStats {
214 count,
215 min_ms: min,
216 avg_ms: avg,
217 max_ms: max,
218 stddev_ms: stddev,
219 }
220}
221
222fn loss_stats(lost: u64, total: u64) -> LossStats {
223 let percent = if total == 0 {
224 0
225 } else {
226 div_u64_to_u32(lost.saturating_mul(100), total)
227 };
228 LossStats {
229 lost,
230 total,
231 percent,
232 }
233}
234
235fn standard_deviation_ms<I>(values: I) -> u32
236where
237 I: IntoIterator<Item = u32>,
238{
239 let mut n: u64 = 0;
242 let mut mean: f64 = 0.0;
243 let mut m2: f64 = 0.0;
244
245 for value in values {
246 n += 1;
247 let x = f64::from(value);
248 let delta = x - mean;
249 mean += delta / n as f64;
250 let delta2 = x - mean;
251 m2 += delta * delta2;
252 }
253
254 if n == 0 {
255 return 0;
256 }
257
258 let variance = m2 / n as f64;
259 let stddev = variance.sqrt();
260
261 if !stddev.is_finite() {
262 return 0;
263 }
264
265 u32::try_from(stddev.round() as u64).unwrap_or(u32::MAX)
266}
267
268fn div_u64_to_u32(numerator: u64, denominator: u64) -> u32 {
269 if denominator == 0 {
270 return 0;
271 }
272 let quotient = numerator / denominator;
273 u32::try_from(quotient).unwrap_or(u32::MAX)
274}
275
276#[cfg(test)]
277mod tests {
278 use std::time::Duration;
279
280 use crate::app::AppEvent;
281
282 use super::Stats;
283
284 #[test]
285 fn tracks_integer_stats_with_recent_window() {
286 let mut stats = Stats::new(3);
287
288 stats.apply(&AppEvent::ProbeSent {
289 seq: 1,
290 at: Duration::ZERO,
291 });
292
293 stats.apply(&AppEvent::ProbeReply {
294 seq: 1,
295 sent_at: Duration::ZERO,
296 received_at: Duration::from_millis(9),
297 rtt_ms: 9,
298 duplicate: false,
299 late: false,
300 });
301 stats.apply(&AppEvent::ProbeSent {
302 seq: 2,
303 at: Duration::ZERO,
304 });
305 stats.apply(&AppEvent::ProbeReply {
306 seq: 2,
307 sent_at: Duration::ZERO,
308 received_at: Duration::from_millis(20),
309 rtt_ms: 20,
310 duplicate: false,
311 late: false,
312 });
313 stats.apply(&AppEvent::ProbeSent {
314 seq: 3,
315 at: Duration::ZERO,
316 });
317 stats.apply(&AppEvent::ProbeTimeout {
318 seq: 3,
319 sent_at: Duration::ZERO,
320 deadline: Duration::from_millis(100),
321 });
322 stats.apply(&AppEvent::ProbeSent {
323 seq: 4,
324 at: Duration::ZERO,
325 });
326 stats.apply(&AppEvent::ProbeReply {
327 seq: 4,
328 sent_at: Duration::ZERO,
329 received_at: Duration::from_millis(51),
330 rtt_ms: 51,
331 duplicate: false,
332 late: false,
333 });
334
335 let global = stats.global_snapshot();
336 assert_eq!(global.loss.lost, 1);
337 assert_eq!(global.loss.total, 4);
338 assert_eq!(global.loss.percent, 25);
339 assert_eq!(global.rtt.count, 3);
340 assert_eq!(global.rtt.min_ms, 9);
341 assert_eq!(global.rtt.avg_ms, 26);
342 assert_eq!(global.rtt.max_ms, 51);
343 assert_eq!(global.last_rtt_ms, 51);
344
345 let recent = stats.recent_snapshot();
346 assert_eq!(recent.loss.lost, 1);
347 assert_eq!(recent.loss.total, 3);
348 assert_eq!(recent.loss.percent, 33);
349 assert_eq!(recent.rtt.count, 2);
350 assert_eq!(recent.rtt.min_ms, 20);
351 assert_eq!(recent.rtt.avg_ms, 35);
352 assert_eq!(recent.rtt.max_ms, 51);
353 }
354
355 #[test]
356 fn duplicate_replies_do_not_skew_stats() {
357 let mut stats = Stats::new(5);
358
359 stats.apply(&AppEvent::ProbeSent {
360 seq: 1,
361 at: Duration::ZERO,
362 });
363
364 stats.apply(&AppEvent::ProbeReply {
365 seq: 1,
366 sent_at: Duration::ZERO,
367 received_at: Duration::from_millis(10),
368 rtt_ms: 10,
369 duplicate: false,
370 late: false,
371 });
372
373 stats.apply(&AppEvent::ProbeReply {
374 seq: 1,
375 sent_at: Duration::ZERO,
376 received_at: Duration::from_millis(12),
377 rtt_ms: 12,
378 duplicate: true,
379 late: false,
380 });
381
382 let global = stats.global_snapshot();
383 assert_eq!(global.loss.total, 1);
384 assert_eq!(global.loss.lost, 0);
385 assert_eq!(global.loss.percent, 0);
386 assert_eq!(global.rtt.count, 1);
387 assert_eq!(global.rtt.avg_ms, 10);
388 assert_eq!(global.last_rtt_ms, 10);
389 }
390
391 #[test]
392 fn late_replies_do_not_change_global_loss_denominator() {
393 let mut stats = Stats::new(5);
394
395 stats.apply(&AppEvent::ProbeSent {
396 seq: 1,
397 at: Duration::ZERO,
398 });
399 stats.apply(&AppEvent::ProbeTimeout {
400 seq: 1,
401 sent_at: Duration::ZERO,
402 deadline: Duration::from_millis(1_000),
403 });
404 stats.apply(&AppEvent::ProbeReply {
405 seq: 1,
406 sent_at: Duration::ZERO,
407 received_at: Duration::from_millis(1_500),
408 rtt_ms: 1_500,
409 duplicate: false,
410 late: true,
411 });
412
413 let global = stats.global_snapshot();
414 assert_eq!(global.loss.lost, 1);
415 assert_eq!(global.loss.total, 1);
416 assert_eq!(global.loss.percent, 100);
417 assert_eq!(global.rtt.count, 0);
418 }
419}