1#![forbid(unsafe_code)]
4
5use std::{
6 collections::{HashMap, HashSet, VecDeque},
7 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
8};
9
10use irtt_client::{
11 ClientEvent, ClientTimestamp, OneWayDelaySample, ReceivedStatsSample, RttSample, ServerTiming,
12 SignedDuration,
13};
14
15const CONTINUOUS_SEQUENCE_LIMIT: usize = 4096;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub struct StatsConfig {
19 pub median: MedianMode,
20 pub rolling_count: Option<usize>,
21 pub rolling_time: Option<Duration>,
22}
23
24impl StatsConfig {
25 pub fn finite() -> Self {
26 Self {
27 median: MedianMode::ExactFinite,
28 rolling_count: None,
29 rolling_time: None,
30 }
31 }
32
33 pub fn continuous() -> Self {
38 Self {
39 median: MedianMode::Disabled,
40 rolling_count: None,
41 rolling_time: None,
42 }
43 }
44}
45
46impl Default for StatsConfig {
47 fn default() -> Self {
48 Self::finite()
49 }
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum MedianMode {
54 Disabled,
55 ExactFinite,
56}
57
58#[derive(Debug, Clone, PartialEq)]
59pub struct StatsCollector {
60 config: StatsConfig,
61 cumulative: CoreStats,
62 rolling_count: Option<VecDeque<WindowEvent>>,
63 rolling_time: Option<VecDeque<WindowEvent>>,
64}
65
66impl StatsCollector {
67 pub fn new(config: StatsConfig) -> Self {
68 Self {
69 config,
70 cumulative: CoreStats::new(config.median),
71 rolling_count: config.rolling_count.map(|_| VecDeque::new()),
72 rolling_time: config.rolling_time.map(|_| VecDeque::new()),
73 }
74 }
75
76 pub fn process(&mut self, event: &ClientEvent) -> EventStatsUpdate {
77 self.process_with_update(event)
78 }
79
80 pub fn process_with_update(&mut self, event: &ClientEvent) -> EventStatsUpdate {
81 let Some(window_event) = WindowEvent::from_client_event(event) else {
82 return EventStatsUpdate::default();
83 };
84
85 let update = self.cumulative.apply(window_event.clone());
86
87 if let (Some(limit), Some(window)) =
88 (self.config.rolling_count, self.rolling_count.as_mut())
89 {
90 window.push_back(window_event.clone());
91 while window.len() > limit {
92 window.pop_front();
93 }
94 }
95
96 if let (Some(duration), Some(window)) =
97 (self.config.rolling_time, self.rolling_time.as_mut())
98 {
99 let cutoff = window_event.at().checked_sub(duration);
100 window.push_back(window_event);
101 if let Some(cutoff) = cutoff {
102 while window.front().is_some_and(|event| event.at() < cutoff) {
103 window.pop_front();
104 }
105 }
106 }
107
108 update
109 }
110
111 pub fn cumulative(&self) -> CumulativeSnapshot {
112 self.cumulative.snapshot()
113 }
114
115 pub fn rolling_count(&self) -> Option<RollingSnapshot> {
116 self.rolling_count.as_ref().map(snapshot_window)
117 }
118
119 pub fn rolling_time(&self) -> Option<RollingSnapshot> {
120 self.rolling_time.as_ref().map(snapshot_window)
121 }
122
123 pub fn summary(&self) -> FiniteSummary {
124 self.cumulative()
125 }
126
127 #[cfg(test)]
128 fn retained_median_samples(&self) -> usize {
129 self.cumulative.retained_median_samples()
130 }
131
132 #[cfg(test)]
133 fn retained_sequence_samples(&self) -> usize {
134 self.cumulative.retained_sequence_samples()
135 }
136}
137
138pub type RollingSnapshot = CumulativeSnapshot;
139pub type FiniteSummary = CumulativeSnapshot;
140
141#[derive(Debug, Clone, PartialEq, Eq, Default)]
142pub struct EventStatsUpdate {
143 pub contributed_sample: bool,
144 pub ipdv_pairs: Vec<IpdvPairUpdate>,
145}
146
147#[derive(Debug, Clone, Copy, PartialEq, Eq)]
148pub struct IpdvPairUpdate {
149 pub previous_logical_seq: u64,
150 pub current_logical_seq: u64,
151 pub rtt_ipdv: Duration,
152 pub send_ipdv: Option<Duration>,
153 pub receive_ipdv: Option<Duration>,
154}
155
156#[derive(Debug, Clone, PartialEq)]
157pub struct CumulativeSnapshot {
158 pub events: EventCounts,
159 pub packets: PacketCounts,
160 pub loss: LossStats,
161 pub send_call: DurationStats,
162 pub timer_error: DurationStats,
163 pub rtt: RttStats,
164 pub ipdv: IpdvStats,
165 pub one_way_delay: OneWayDelayStats,
166 pub server_processing: ServerProcessingStats,
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
170pub struct EventCounts {
171 pub sent_events: u64,
172 pub echo_replies: u64,
173 pub late_unique_replies: u64,
174 pub duplicate_replies: u64,
175 pub loss_events: u64,
176 pub warning_events: u64,
177 pub untracked_late_replies: u64,
178}
179
180#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
181pub struct PacketCounts {
182 pub packets_sent: u64,
183 pub packets_received: u64,
184 pub unique_replies: u64,
185 pub duplicates: u64,
186 pub late_packets: u64,
187 pub bytes_sent: u64,
188 pub bytes_received: u64,
189 pub server_packets_received: Option<u64>,
190 pub server_received_window: Option<u64>,
191}
192
193#[derive(Debug, Clone, Copy, PartialEq)]
194pub struct LossStats {
195 pub lost_packets: u64,
196 pub unknown_loss_packets: u64,
197 pub upstream_loss_packets: Option<i128>,
198 pub downstream_loss_packets: Option<i128>,
199 pub packet_loss_percent: f64,
200 pub upstream_loss_percent: f64,
201 pub downstream_loss_percent: f64,
202 pub duplicate_percent: f64,
203 pub late_packets_percent: f64,
204}
205
206#[derive(Debug, Clone, PartialEq)]
207pub struct RttStats {
208 pub primary: SignedDurationStatsWithMedian,
209 pub raw: DurationStatsWithMedian,
210 pub adjusted: SignedDurationStatsWithMedian,
211}
212
213#[derive(Debug, Clone, PartialEq)]
214pub struct IpdvStats {
215 pub round_trip: DurationStatsWithMedian,
216 pub send: DurationStatsWithMedian,
217 pub receive: DurationStatsWithMedian,
218}
219
220#[derive(Debug, Clone, PartialEq)]
221pub struct OneWayDelayStats {
222 pub send_delay: DurationStatsWithMedian,
223 pub receive_delay: DurationStatsWithMedian,
224}
225
226#[derive(Debug, Clone, PartialEq)]
227pub struct ServerProcessingStats {
228 pub processing: DurationStats,
229}
230
231#[derive(Debug, Clone, Copy, PartialEq)]
232pub struct DurationStats {
233 pub count: u64,
234 pub total_ns: u128,
235 pub min_ns: Option<u64>,
236 pub max_ns: Option<u64>,
237 pub mean_ns: f64,
238 pub variance_ns2: f64,
239}
240
241impl DurationStats {
242 pub fn stddev_ns(&self) -> f64 {
243 self.variance_ns2.sqrt()
244 }
245}
246
247#[derive(Debug, Clone, Copy, PartialEq)]
248pub struct DurationStatsWithMedian {
249 pub stats: DurationStats,
250 pub median_ns: Option<f64>,
251}
252
253impl DurationStatsWithMedian {
254 pub fn stddev_ns(&self) -> f64 {
255 self.stats.stddev_ns()
256 }
257}
258
259#[derive(Debug, Clone, Copy, PartialEq)]
260pub struct SignedDurationStats {
261 pub count: u64,
262 pub total_ns: i128,
263 pub min_ns: Option<i128>,
264 pub max_ns: Option<i128>,
265 pub mean_ns: f64,
266 pub variance_ns2: f64,
267}
268
269impl SignedDurationStats {
270 pub fn stddev_ns(&self) -> f64 {
271 self.variance_ns2.sqrt()
272 }
273}
274
275#[derive(Debug, Clone, Copy, PartialEq)]
276pub struct SignedDurationStatsWithMedian {
277 pub stats: SignedDurationStats,
278 pub median_ns: Option<f64>,
279}
280
281impl SignedDurationStatsWithMedian {
282 pub fn stddev_ns(&self) -> f64 {
283 self.stats.stddev_ns()
284 }
285}
286
287#[derive(Debug, Clone, PartialEq)]
288struct CoreStats {
289 median: MedianMode,
290 sequence_limit: Option<usize>,
291 events: EventCounts,
292 packets: PacketCounts,
293 send_call: MetricU64,
294 timer_error: MetricU64,
295 rtt_primary: MetricI128,
296 rtt_raw: MetricU64,
297 rtt_adjusted: MetricI128,
298 ipdv_round_trip: MetricU64,
299 ipdv_send: MetricU64,
300 ipdv_receive: MetricU64,
301 send_delay: MetricU64,
302 receive_delay: MetricU64,
303 server_processing: MetricU64,
304 samples: HashMap<u64, UniqueSample>,
305 sample_order: VecDeque<u64>,
306 ipdv_pairs: HashSet<u64>,
307}
308
309impl CoreStats {
310 fn new(median: MedianMode) -> Self {
311 Self {
312 median,
313 sequence_limit: if median == MedianMode::ExactFinite {
314 None
315 } else {
316 Some(CONTINUOUS_SEQUENCE_LIMIT)
317 },
318 events: EventCounts::default(),
319 packets: PacketCounts::default(),
320 send_call: MetricU64::new(false),
321 timer_error: MetricU64::new(false),
322 rtt_primary: MetricI128::new(median == MedianMode::ExactFinite),
323 rtt_raw: MetricU64::new(median == MedianMode::ExactFinite),
324 rtt_adjusted: MetricI128::new(median == MedianMode::ExactFinite),
325 ipdv_round_trip: MetricU64::new(median == MedianMode::ExactFinite),
326 ipdv_send: MetricU64::new(median == MedianMode::ExactFinite),
327 ipdv_receive: MetricU64::new(median == MedianMode::ExactFinite),
328 send_delay: MetricU64::new(median == MedianMode::ExactFinite),
329 receive_delay: MetricU64::new(median == MedianMode::ExactFinite),
330 server_processing: MetricU64::new(false),
331 samples: HashMap::new(),
332 sample_order: VecDeque::new(),
333 ipdv_pairs: HashSet::new(),
334 }
335 }
336
337 fn apply(&mut self, event: WindowEvent) -> EventStatsUpdate {
338 let mut update = EventStatsUpdate::default();
339 match event {
340 WindowEvent::Sent {
341 bytes,
342 send_call_ns,
343 timer_error_ns,
344 ..
345 } => {
346 self.events.sent_events += 1;
347 self.packets.packets_sent += 1;
348 self.packets.bytes_sent = self.packets.bytes_sent.saturating_add(bytes as u64);
349 self.send_call.push(send_call_ns);
350 self.timer_error.push(timer_error_ns);
351 }
352 WindowEvent::UniqueReply {
353 is_late, sample, ..
354 } => {
355 let sample = *sample;
356 self.events.echo_replies += u64::from(!is_late);
357 self.events.late_unique_replies += u64::from(is_late);
358 self.packets.packets_received += 1;
359 self.packets.unique_replies += 1;
360 self.packets.late_packets += u64::from(is_late);
361 self.packets.bytes_received = self
362 .packets
363 .bytes_received
364 .saturating_add(sample.bytes as u64);
365 update.contributed_sample = true;
366 if let Some(count) = sample.received_count {
367 self.packets.server_packets_received = Some(u64::from(count));
368 }
369 if let Some(window) = sample.received_window {
370 self.packets.server_received_window = Some(window);
371 }
372
373 self.rtt_primary.push(sample.rtt_primary_ns);
374 self.rtt_raw.push(sample.rtt_raw_ns);
375 if let Some(adjusted) = sample.rtt_adjusted_ns {
376 self.rtt_adjusted.push(adjusted);
377 }
378 if let Some(processing) = sample.server_processing_ns {
379 self.server_processing.push(processing);
380 }
381 if let Some(delay) = sample.send_delay_ns {
382 self.send_delay.push(delay);
383 }
384 if let Some(delay) = sample.receive_delay_ns {
385 self.receive_delay.push(delay);
386 }
387
388 let seq = sample.logical_seq;
389 if self.samples.insert(seq, sample).is_none() {
390 self.sample_order.push_back(seq);
391 self.enforce_sequence_limit();
392 if let Some(pair) = self.try_ipdv_pair(seq) {
393 update.ipdv_pairs.push(pair);
394 }
395 if let Some(next) = seq.checked_add(1) {
396 if let Some(pair) = self.try_ipdv_pair(next) {
397 update.ipdv_pairs.push(pair);
398 }
399 }
400 }
401 }
402 WindowEvent::DuplicateReply { .. } => {
403 self.events.duplicate_replies += 1;
404 self.packets.packets_received += 1;
405 self.packets.duplicates += 1;
406 }
407 WindowEvent::Loss { .. } => {
408 self.events.loss_events += 1;
409 }
410 WindowEvent::Warning { .. } => {
411 self.events.warning_events += 1;
412 }
413 WindowEvent::UntrackedLate { .. } => {
414 self.events.untracked_late_replies += 1;
415 }
416 }
417 update
418 }
419
420 fn enforce_sequence_limit(&mut self) {
421 let Some(limit) = self.sequence_limit else {
422 return;
423 };
424 while self.samples.len() > limit {
425 let Some(seq) = self.sample_order.pop_front() else {
426 break;
427 };
428 if self.samples.remove(&seq).is_some() {
429 self.ipdv_pairs.remove(&seq);
430 if let Some(next) = seq.checked_add(1) {
431 self.ipdv_pairs.remove(&next);
432 }
433 }
434 }
435 }
436
437 fn try_ipdv_pair(&mut self, current_seq: u64) -> Option<IpdvPairUpdate> {
438 let previous_seq = current_seq.checked_sub(1)?;
439
440 if !self.ipdv_pairs.insert(current_seq) {
441 return None;
442 }
443
444 let Some(previous) = self.samples.get(&previous_seq) else {
445 self.ipdv_pairs.remove(¤t_seq);
446 return None;
447 };
448
449 let Some(current) = self.samples.get(¤t_seq) else {
450 self.ipdv_pairs.remove(¤t_seq);
451 return None;
452 };
453
454 let rtt_ipdv = abs_i128_to_u64(current.rtt_primary_ns - previous.rtt_primary_ns);
457 let send_ipdv = send_ipdv_ns(previous, current).map(abs_i128_to_u64);
458 let receive_ipdv = receive_ipdv_ns(previous, current).map(abs_i128_to_u64);
459
460 self.ipdv_round_trip.push(rtt_ipdv);
461
462 if let Some(value) = send_ipdv {
463 self.ipdv_send.push(value);
464 }
465
466 if let Some(value) = receive_ipdv {
467 self.ipdv_receive.push(value);
468 }
469
470 Some(IpdvPairUpdate {
471 previous_logical_seq: previous_seq,
472 current_logical_seq: current_seq,
473 rtt_ipdv: Duration::from_nanos(rtt_ipdv),
474 send_ipdv: send_ipdv.map(Duration::from_nanos),
475 receive_ipdv: receive_ipdv.map(Duration::from_nanos),
476 })
477 }
478
479 fn snapshot(&self) -> CumulativeSnapshot {
480 let packets = self.packets;
481 CumulativeSnapshot {
482 events: self.events,
483 packets,
484 loss: loss_stats(packets),
485 send_call: self.send_call.stats(),
486 timer_error: self.timer_error.stats(),
487 rtt: RttStats {
488 primary: self.rtt_primary.stats_with_median(),
489 raw: self.rtt_raw.stats_with_median(),
490 adjusted: self.rtt_adjusted.stats_with_median(),
491 },
492 ipdv: IpdvStats {
493 round_trip: self.ipdv_round_trip.stats_with_median(),
494 send: self.ipdv_send.stats_with_median(),
495 receive: self.ipdv_receive.stats_with_median(),
496 },
497 one_way_delay: OneWayDelayStats {
498 send_delay: self.send_delay.stats_with_median(),
499 receive_delay: self.receive_delay.stats_with_median(),
500 },
501 server_processing: ServerProcessingStats {
502 processing: self.server_processing.stats(),
503 },
504 }
505 }
506
507 #[cfg(test)]
508 fn retained_median_samples(&self) -> usize {
509 self.rtt_primary.retained_samples()
510 + self.rtt_raw.retained_samples()
511 + self.rtt_adjusted.retained_samples()
512 + self.ipdv_round_trip.retained_samples()
513 + self.ipdv_send.retained_samples()
514 + self.ipdv_receive.retained_samples()
515 + self.send_delay.retained_samples()
516 + self.receive_delay.retained_samples()
517 }
518
519 #[cfg(test)]
520 fn retained_sequence_samples(&self) -> usize {
521 self.samples.len()
522 }
523}
524
525#[derive(Debug, Clone, PartialEq)]
526enum WindowEvent {
527 Sent {
528 at: Instant,
529 bytes: usize,
530 send_call_ns: u64,
531 timer_error_ns: u64,
532 },
533 UniqueReply {
534 at: Instant,
535 is_late: bool,
536 sample: Box<UniqueSample>,
537 },
538 DuplicateReply {
539 at: Instant,
540 },
541 Loss {
542 at: Instant,
543 },
544 Warning {
545 at: Instant,
546 },
547 UntrackedLate {
548 at: Instant,
549 },
550}
551
552impl WindowEvent {
553 fn from_client_event(event: &ClientEvent) -> Option<Self> {
554 match event {
555 ClientEvent::EchoSent {
556 sent_at,
557 bytes,
558 send_call,
559 timer_error,
560 ..
561 } => Some(Self::Sent {
562 at: sent_at.mono,
563 bytes: *bytes,
564 send_call_ns: duration_ns_u64(*send_call),
565 timer_error_ns: duration_ns_u64(*timer_error),
566 }),
567 ClientEvent::EchoReply {
568 logical_seq,
569 sent_at,
570 received_at,
571 rtt,
572 server_timing,
573 one_way,
574 received_stats,
575 bytes,
576 ..
577 } => Some(Self::UniqueReply {
578 at: received_at.mono,
579 is_late: false,
580 sample: Box::new(UniqueSample::new(
581 *logical_seq,
582 *sent_at,
583 *received_at,
584 *rtt,
585 *server_timing,
586 *one_way,
587 *received_stats,
588 *bytes,
589 )),
590 }),
591 ClientEvent::LateReply {
592 logical_seq: Some(logical_seq),
593 sent_at: Some(sent_at),
594 received_at,
595 rtt: Some(rtt),
596 server_timing,
597 one_way,
598 received_stats,
599 bytes,
600 ..
601 } => Some(Self::UniqueReply {
602 at: received_at.mono,
603 is_late: true,
604 sample: Box::new(UniqueSample::new(
605 *logical_seq,
606 *sent_at,
607 *received_at,
608 *rtt,
609 *server_timing,
610 *one_way,
611 *received_stats,
612 *bytes,
613 )),
614 }),
615 ClientEvent::LateReply { received_at, .. } => Some(Self::UntrackedLate {
616 at: received_at.mono,
617 }),
618 ClientEvent::DuplicateReply { received_at, .. } => Some(Self::DuplicateReply {
619 at: received_at.mono,
620 }),
621 ClientEvent::EchoLoss { timeout_at, .. } => Some(Self::Loss { at: *timeout_at }),
622 ClientEvent::Warning { .. } => Some(Self::Warning { at: Instant::now() }),
623 ClientEvent::SessionStarted { .. }
624 | ClientEvent::NoTestCompleted { .. }
625 | ClientEvent::SessionClosed { .. } => None,
626 }
627 }
628
629 fn at(&self) -> Instant {
630 match self {
631 Self::Sent { at, .. }
632 | Self::UniqueReply { at, .. }
633 | Self::DuplicateReply { at }
634 | Self::Loss { at }
635 | Self::Warning { at }
636 | Self::UntrackedLate { at } => *at,
637 }
638 }
639}
640
641#[derive(Debug, Clone, PartialEq)]
642struct UniqueSample {
643 logical_seq: u64,
644 bytes: usize,
645 rtt_primary_ns: i128,
646 rtt_raw_ns: u64,
647 rtt_adjusted_ns: Option<i128>,
648 send_delay_ns: Option<u64>,
649 receive_delay_ns: Option<u64>,
650 server_processing_ns: Option<u64>,
651 received_count: Option<u32>,
652 received_window: Option<u64>,
653 client_send_mono: Instant,
654 client_receive_mono: Instant,
655 client_send_wall_ns: Option<i128>,
656 client_receive_wall_ns: Option<i128>,
657 server_receive_mono_ns: Option<i64>,
658 server_send_mono_ns: Option<i64>,
659 server_receive_wall_ns: Option<i64>,
660 server_send_wall_ns: Option<i64>,
661}
662
663impl UniqueSample {
664 #[allow(clippy::too_many_arguments)]
665 fn new(
666 logical_seq: u64,
667 sent_at: ClientTimestamp,
668 received_at: ClientTimestamp,
669 rtt: RttSample,
670 server_timing: Option<ServerTiming>,
671 one_way: Option<OneWayDelaySample>,
672 received_stats: Option<ReceivedStatsSample>,
673 bytes: usize,
674 ) -> Self {
675 Self {
676 logical_seq,
677 bytes,
678 rtt_primary_ns: signed_duration_ns(rtt.effective_signed),
679 rtt_raw_ns: duration_ns_u64(rtt.raw),
680 rtt_adjusted_ns: rtt.adjusted_signed.map(signed_duration_ns),
681 send_delay_ns: one_way
682 .and_then(|sample| sample.client_to_server)
683 .map(duration_ns_u64),
684 receive_delay_ns: one_way
685 .and_then(|sample| sample.server_to_client)
686 .map(duration_ns_u64),
687 server_processing_ns: server_timing
688 .and_then(|timing| timing.processing)
689 .map(duration_ns_u64),
690 received_count: received_stats.and_then(|stats| stats.count),
691 received_window: received_stats.and_then(|stats| stats.window),
692 client_send_mono: sent_at.mono,
693 client_receive_mono: received_at.mono,
694 client_send_wall_ns: system_time_ns(sent_at.wall),
695 client_receive_wall_ns: system_time_ns(received_at.wall),
696 server_receive_mono_ns: server_timing.and_then(|timing| timing.receive_mono_ns),
697 server_send_mono_ns: server_timing.and_then(|timing| timing.send_mono_ns),
698 server_receive_wall_ns: server_timing.and_then(|timing| timing.receive_wall_ns),
699 server_send_wall_ns: server_timing.and_then(|timing| timing.send_wall_ns),
700 }
701 }
702}
703
704#[derive(Debug, Clone, PartialEq)]
705struct MetricU64 {
706 running: RunningU64,
707 samples: Option<Vec<u64>>,
708}
709
710impl MetricU64 {
711 fn new(retain_samples: bool) -> Self {
712 Self {
713 running: RunningU64::default(),
714 samples: retain_samples.then(Vec::new),
715 }
716 }
717
718 fn push(&mut self, value: u64) {
719 self.running.push(value);
720 if let Some(samples) = self.samples.as_mut() {
721 samples.push(value);
722 }
723 }
724
725 fn stats(&self) -> DurationStats {
726 self.running.stats()
727 }
728
729 fn stats_with_median(&self) -> DurationStatsWithMedian {
730 DurationStatsWithMedian {
731 stats: self.stats(),
732 median_ns: self
733 .samples
734 .as_ref()
735 .and_then(|samples| median_u64(samples)),
736 }
737 }
738
739 #[cfg(test)]
740 fn retained_samples(&self) -> usize {
741 self.samples.as_ref().map_or(0, Vec::len)
742 }
743}
744
745#[derive(Debug, Clone, PartialEq)]
746struct MetricI128 {
747 running: RunningI128,
748 samples: Option<Vec<i128>>,
749}
750
751impl MetricI128 {
752 fn new(retain_samples: bool) -> Self {
753 Self {
754 running: RunningI128::default(),
755 samples: retain_samples.then(Vec::new),
756 }
757 }
758
759 fn push(&mut self, value: i128) {
760 self.running.push(value);
761 if let Some(samples) = self.samples.as_mut() {
762 samples.push(value);
763 }
764 }
765
766 fn stats(&self) -> SignedDurationStats {
767 self.running.stats()
768 }
769
770 fn stats_with_median(&self) -> SignedDurationStatsWithMedian {
771 SignedDurationStatsWithMedian {
772 stats: self.stats(),
773 median_ns: self
774 .samples
775 .as_ref()
776 .and_then(|samples| median_i128(samples)),
777 }
778 }
779
780 #[cfg(test)]
781 fn retained_samples(&self) -> usize {
782 self.samples.as_ref().map_or(0, Vec::len)
783 }
784}
785
786#[derive(Debug, Clone, PartialEq, Default)]
787struct RunningU64 {
788 count: u64,
789 total: u128,
790 min: Option<u64>,
791 max: Option<u64>,
792 mean: f64,
793 m2: f64,
794}
795
796impl RunningU64 {
797 fn push(&mut self, value: u64) {
798 self.count += 1;
799 self.total = self.total.saturating_add(u128::from(value));
800 self.min = Some(self.min.map_or(value, |min| min.min(value)));
801 self.max = Some(self.max.map_or(value, |max| max.max(value)));
802 let x = value as f64;
803 let delta = x - self.mean;
804 self.mean += delta / self.count as f64;
805 let delta2 = x - self.mean;
806 self.m2 += delta * delta2;
807 }
808
809 fn stats(&self) -> DurationStats {
810 DurationStats {
811 count: self.count,
812 total_ns: self.total,
813 min_ns: self.min,
814 max_ns: self.max,
815 mean_ns: if self.count == 0 { 0.0 } else { self.mean },
816 variance_ns2: sample_variance(self.count, self.m2),
817 }
818 }
819}
820
821#[derive(Debug, Clone, PartialEq, Default)]
822struct RunningI128 {
823 count: u64,
824 total: i128,
825 min: Option<i128>,
826 max: Option<i128>,
827 mean: f64,
828 m2: f64,
829}
830
831impl RunningI128 {
832 fn push(&mut self, value: i128) {
833 self.count += 1;
834 self.total = self.total.saturating_add(value);
835 self.min = Some(self.min.map_or(value, |min| min.min(value)));
836 self.max = Some(self.max.map_or(value, |max| max.max(value)));
837 let x = value as f64;
838 let delta = x - self.mean;
839 self.mean += delta / self.count as f64;
840 let delta2 = x - self.mean;
841 self.m2 += delta * delta2;
842 }
843
844 fn stats(&self) -> SignedDurationStats {
845 SignedDurationStats {
846 count: self.count,
847 total_ns: self.total,
848 min_ns: self.min,
849 max_ns: self.max,
850 mean_ns: if self.count == 0 { 0.0 } else { self.mean },
851 variance_ns2: sample_variance(self.count, self.m2),
852 }
853 }
854}
855
856fn snapshot_window(events: &VecDeque<WindowEvent>) -> CumulativeSnapshot {
857 let mut core = CoreStats::new(MedianMode::Disabled);
858 for event in events {
859 core.apply(event.clone());
860 }
861 core.snapshot()
862}
863
864fn sample_variance(count: u64, m2: f64) -> f64 {
865 if count < 2 {
866 0.0
867 } else {
868 m2 / (count - 1) as f64
869 }
870}
871
872fn median_u64(samples: &[u64]) -> Option<f64> {
873 if samples.is_empty() {
874 return None;
875 }
876 let mut sorted = samples.to_vec();
877 sorted.sort_unstable();
878 Some(median_sorted_u64(&sorted))
879}
880
881fn median_sorted_u64(sorted: &[u64]) -> f64 {
882 let mid = sorted.len() / 2;
883 if sorted.len() % 2 == 1 {
884 sorted[mid] as f64
885 } else {
886 (sorted[mid - 1] as f64 + sorted[mid] as f64) / 2.0
887 }
888}
889
890fn median_i128(samples: &[i128]) -> Option<f64> {
891 if samples.is_empty() {
892 return None;
893 }
894 let mut sorted = samples.to_vec();
895 sorted.sort_unstable();
896 let mid = sorted.len() / 2;
897 Some(if sorted.len() % 2 == 1 {
898 sorted[mid] as f64
899 } else {
900 (sorted[mid - 1] as f64 + sorted[mid] as f64) / 2.0
901 })
902}
903
904fn loss_stats(packets: PacketCounts) -> LossStats {
905 let lost = packets.packets_sent.saturating_sub(packets.unique_replies);
906 let packet_loss_percent = if packets.packets_sent == 0 {
907 0.0
908 } else if packets.unique_replies == 0 {
909 100.0
910 } else {
911 percent(lost as f64, packets.packets_sent as f64)
912 };
913
914 let (
915 upstream_loss_packets,
916 upstream_loss_percent,
917 downstream_loss_packets,
918 downstream_loss_percent,
919 ) = if let Some(server_received) = packets.server_packets_received {
920 let upstream = i128::from(packets.packets_sent) - i128::from(server_received);
921 let downstream = i128::from(server_received) - i128::from(packets.packets_received);
922 let upstream_percent = if packets.packets_sent == 0 {
923 0.0
924 } else {
925 percent(upstream as f64, packets.packets_sent as f64)
926 };
927 let downstream_percent = if server_received == 0 {
928 0.0
929 } else {
930 percent(downstream as f64, server_received as f64)
931 };
932 (
933 Some(upstream),
934 upstream_percent,
935 Some(downstream),
936 downstream_percent,
937 )
938 } else {
939 (None, 0.0, None, 0.0)
940 };
941
942 LossStats {
943 lost_packets: lost,
944 unknown_loss_packets: lost,
945 upstream_loss_packets,
946 downstream_loss_packets,
947 packet_loss_percent,
948 upstream_loss_percent,
949 downstream_loss_percent,
950 duplicate_percent: if packets.packets_received == 0 {
951 0.0
952 } else {
953 percent(packets.duplicates as f64, packets.packets_received as f64)
954 },
955 late_packets_percent: if packets.packets_received == 0 {
956 0.0
957 } else {
958 percent(packets.late_packets as f64, packets.packets_received as f64)
959 },
960 }
961}
962
963fn percent(numerator: f64, denominator: f64) -> f64 {
964 100.0 * numerator / denominator
965}
966
967fn send_ipdv_ns(previous: &UniqueSample, current: &UniqueSample) -> Option<i128> {
968 if let (Some(prev_server), Some(cur_server)) = (
969 previous.server_receive_mono_ns,
970 current.server_receive_mono_ns,
971 ) {
972 return Some(
973 i128::from(cur_server)
974 - i128::from(prev_server)
975 - instant_diff_ns(current.client_send_mono, previous.client_send_mono),
976 );
977 }
978 if let (Some(prev_server), Some(cur_server), Some(prev_client), Some(cur_client)) = (
979 previous.server_receive_wall_ns,
980 current.server_receive_wall_ns,
981 previous.client_send_wall_ns,
982 current.client_send_wall_ns,
983 ) {
984 return Some(i128::from(cur_server) - i128::from(prev_server) - (cur_client - prev_client));
985 }
986 None
987}
988
989fn receive_ipdv_ns(previous: &UniqueSample, current: &UniqueSample) -> Option<i128> {
990 if let (Some(prev_server), Some(cur_server)) =
991 (previous.server_send_mono_ns, current.server_send_mono_ns)
992 {
993 return Some(
994 instant_diff_ns(current.client_receive_mono, previous.client_receive_mono)
995 - (i128::from(cur_server) - i128::from(prev_server)),
996 );
997 }
998 if let (Some(prev_server), Some(cur_server), Some(prev_client), Some(cur_client)) = (
999 previous.server_send_wall_ns,
1000 current.server_send_wall_ns,
1001 previous.client_receive_wall_ns,
1002 current.client_receive_wall_ns,
1003 ) {
1004 return Some(
1005 (cur_client - prev_client) - (i128::from(cur_server) - i128::from(prev_server)),
1006 );
1007 }
1008 None
1009}
1010
1011fn instant_diff_ns(current: Instant, previous: Instant) -> i128 {
1012 if let Some(diff) = current.checked_duration_since(previous) {
1013 duration_ns_i128(diff)
1014 } else {
1015 -duration_ns_i128(previous.duration_since(current))
1016 }
1017}
1018
1019fn system_time_ns(time: SystemTime) -> Option<i128> {
1020 if let Ok(duration) = time.duration_since(UNIX_EPOCH) {
1021 return Some(duration_ns_i128(duration));
1022 }
1023 UNIX_EPOCH
1024 .duration_since(time)
1025 .ok()
1026 .map(|duration| -duration_ns_i128(duration))
1027}
1028
1029fn duration_ns_u64(duration: Duration) -> u64 {
1030 u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
1031}
1032
1033fn duration_ns_i128(duration: Duration) -> i128 {
1034 i128::try_from(duration.as_nanos()).unwrap_or(i128::MAX)
1035}
1036
1037fn signed_duration_ns(duration: SignedDuration) -> i128 {
1038 duration.ns
1039}
1040
1041fn abs_i128_to_u64(value: i128) -> u64 {
1042 u64::try_from(value.saturating_abs()).unwrap_or(u64::MAX)
1043}
1044
1045#[cfg(test)]
1046mod tests {
1047 use super::*;
1048 use irtt_client::{
1049 ClientTimestamp, PacketMeta, ReceivedStatsSample, RttSample, ServerTiming, SignedDuration,
1050 };
1051 use std::time::SystemTime;
1052
1053 fn ts(ms: u64) -> ClientTimestamp {
1054 ClientTimestamp {
1055 mono: Instant::now() + Duration::from_millis(ms),
1056 wall: UNIX_EPOCH + Duration::from_millis(ms),
1057 }
1058 }
1059
1060 fn rtt(raw_ms: u64, effective_ms: i128) -> RttSample {
1061 RttSample {
1062 raw: Duration::from_millis(raw_ms),
1063 adjusted: u64::try_from(effective_ms).ok().map(Duration::from_millis),
1064 effective: u64::try_from(effective_ms)
1065 .ok()
1066 .map(Duration::from_millis)
1067 .unwrap_or_else(|| Duration::from_millis(raw_ms)),
1068 adjusted_signed: Some(SignedDuration {
1069 ns: effective_ms * 1_000_000,
1070 }),
1071 effective_signed: SignedDuration {
1072 ns: effective_ms * 1_000_000,
1073 },
1074 }
1075 }
1076
1077 fn sent(seq: u32, logical_seq: u64, sent_at: ClientTimestamp) -> ClientEvent {
1078 ClientEvent::EchoSent {
1079 seq,
1080 logical_seq,
1081 remote: "127.0.0.1:2112".parse().unwrap(),
1082 scheduled_at: sent_at.mono,
1083 sent_at,
1084 bytes: 32,
1085 send_call: Duration::from_micros(10),
1086 timer_error: Duration::from_micros(2),
1087 }
1088 }
1089
1090 fn reply(logical_seq: u64, raw_ms: u64, effective_ms: i128) -> ClientEvent {
1091 let sent_at = ts(logical_seq * 10);
1092 let received_at = ClientTimestamp {
1093 mono: sent_at.mono + Duration::from_millis(raw_ms),
1094 wall: sent_at.wall + Duration::from_millis(raw_ms),
1095 };
1096 ClientEvent::EchoReply {
1097 seq: logical_seq as u32,
1098 logical_seq,
1099 remote: "127.0.0.1:2112".parse().unwrap(),
1100 sent_at,
1101 received_at,
1102 rtt: rtt(raw_ms, effective_ms),
1103 server_timing: Some(ServerTiming {
1104 receive_wall_ns: Some(system_time_ns(sent_at.wall).unwrap() as i64 + 1_000_000),
1105 receive_mono_ns: Some(logical_seq as i64 * 10_000_000 + 1_000_000),
1106 send_wall_ns: Some(system_time_ns(sent_at.wall).unwrap() as i64 + 2_000_000),
1107 send_mono_ns: Some(logical_seq as i64 * 10_000_000 + 2_000_000),
1108 midpoint_wall_ns: None,
1109 midpoint_mono_ns: None,
1110 processing: Some(Duration::from_millis(1)),
1111 }),
1112 one_way: Some(OneWayDelaySample {
1113 client_to_server: Some(Duration::from_millis(1)),
1114 server_to_client: Some(Duration::from_millis(2)),
1115 }),
1116 received_stats: Some(ReceivedStatsSample {
1117 count: Some((logical_seq + 1) as u32),
1118 window: Some(0xff),
1119 }),
1120 bytes: 64,
1121 packet_meta: PacketMeta::default(),
1122 }
1123 }
1124
1125 fn late_reply(logical_seq: u64, raw_ms: u64, effective_ms: i128) -> ClientEvent {
1126 let ClientEvent::EchoReply {
1127 seq,
1128 remote,
1129 sent_at,
1130 received_at,
1131 rtt,
1132 server_timing,
1133 one_way,
1134 received_stats,
1135 bytes,
1136 packet_meta,
1137 ..
1138 } = reply(logical_seq, raw_ms, effective_ms)
1139 else {
1140 unreachable!();
1141 };
1142 ClientEvent::LateReply {
1143 seq,
1144 logical_seq: Some(logical_seq),
1145 highest_seen: seq + 1,
1146 remote,
1147 sent_at: Some(sent_at),
1148 received_at,
1149 rtt: Some(rtt),
1150 server_timing,
1151 one_way,
1152 received_stats,
1153 bytes,
1154 packet_meta,
1155 }
1156 }
1157
1158 #[test]
1159 fn running_duration_stats_use_sample_variance() {
1160 let mut metric = MetricU64::new(false);
1161 metric.push(1);
1162 metric.push(2);
1163 metric.push(3);
1164 let stats = metric.stats();
1165 assert_eq!(stats.count, 3);
1166 assert_eq!(stats.total_ns, 6);
1167 assert_eq!(stats.min_ns, Some(1));
1168 assert_eq!(stats.max_ns, Some(3));
1169 assert_eq!(stats.mean_ns, 2.0);
1170 assert_eq!(stats.variance_ns2, 1.0);
1171 assert_eq!(stats.stddev_ns(), 1.0);
1172 }
1173
1174 #[test]
1175 fn exact_median_handles_odd_and_even_samples() {
1176 assert_eq!(median_u64(&[3, 1, 2]), Some(2.0));
1177 assert_eq!(median_u64(&[4, 1, 2, 3]), Some(2.5));
1178 assert_eq!(median_i128(&[-5, 1, 3]), Some(1.0));
1179 assert_eq!(median_i128(&[-5, 1, 3, 7]), Some(2.0));
1180 }
1181
1182 #[test]
1183 fn disabled_median_avoids_finite_retention() {
1184 let mut collector = StatsCollector::new(StatsConfig::continuous());
1185 collector.process(&reply(0, 10, 9));
1186 collector.process(&reply(1, 20, 19));
1187 assert_eq!(collector.retained_median_samples(), 0);
1188 assert_eq!(collector.cumulative().rtt.primary.median_ns, None);
1189 }
1190
1191 #[test]
1192 fn continuous_mode_bounds_sequence_tracking() {
1193 let mut collector = StatsCollector::new(StatsConfig::continuous());
1194 for seq in 0..(CONTINUOUS_SEQUENCE_LIMIT as u64 + 8) {
1195 collector.process(&reply(seq, 10, 10));
1196 }
1197
1198 assert_eq!(collector.retained_median_samples(), 0);
1199 assert!(collector.retained_sequence_samples() <= CONTINUOUS_SEQUENCE_LIMIT);
1200 assert_eq!(
1201 collector.cumulative().rtt.primary.stats.count,
1202 CONTINUOUS_SEQUENCE_LIMIT as u64 + 8
1203 );
1204 }
1205
1206 #[test]
1207 fn cumulative_rtt_uses_signed_effective_and_tracks_raw() {
1208 let mut collector = StatsCollector::new(StatsConfig::finite());
1209 collector.process(&reply(0, 1, -2));
1210 collector.process(&reply(1, 10, 8));
1211
1212 let snapshot = collector.cumulative();
1213 assert_eq!(snapshot.rtt.primary.stats.count, 2);
1214 assert_eq!(snapshot.rtt.primary.stats.min_ns, Some(-2_000_000));
1215 assert_eq!(snapshot.rtt.primary.median_ns, Some(3_000_000.0));
1216 assert_eq!(snapshot.rtt.raw.stats.total_ns, 11_000_000);
1217 }
1218
1219 #[test]
1220 fn late_unique_counts_and_duplicates_do_not_update_measurements() {
1221 let mut collector = StatsCollector::new(StatsConfig::finite());
1222 collector.process(&sent(0, 0, ts(0)));
1223 collector.process(&sent(1, 1, ts(10)));
1224 collector.process(&reply(1, 10, 9));
1225 collector.process(&late_reply(0, 20, 19));
1226 collector.process(&ClientEvent::DuplicateReply {
1227 seq: 0,
1228 remote: "127.0.0.1:2112".parse().unwrap(),
1229 received_at: ts(50),
1230 bytes: 64,
1231 });
1232
1233 let snapshot = collector.cumulative();
1234 assert_eq!(snapshot.packets.packets_sent, 2);
1235 assert_eq!(snapshot.packets.packets_received, 3);
1236 assert_eq!(snapshot.packets.unique_replies, 2);
1237 assert_eq!(snapshot.packets.duplicates, 1);
1238 assert_eq!(snapshot.packets.late_packets, 1);
1239 assert_eq!(snapshot.rtt.primary.stats.count, 2);
1240 assert_eq!(snapshot.loss.lost_packets, 0);
1241 assert_eq!(snapshot.loss.duplicate_percent, 100.0 / 3.0);
1242 }
1243
1244 #[test]
1245 fn final_loss_uses_sent_minus_unique_replies_not_echo_loss_events() {
1246 let mut collector = StatsCollector::new(StatsConfig::finite());
1247 collector.process(&sent(0, 0, ts(0)));
1248 collector.process(&sent(1, 1, ts(10)));
1249 collector.process(&ClientEvent::EchoLoss {
1250 seq: 0,
1251 logical_seq: 0,
1252 sent_at: ts(0),
1253 timeout_at: Instant::now(),
1254 });
1255 collector.process(&late_reply(0, 10, 9));
1256
1257 let snapshot = collector.summary();
1258 assert_eq!(snapshot.events.loss_events, 1);
1259 assert_eq!(snapshot.packets.unique_replies, 1);
1260 assert_eq!(snapshot.loss.lost_packets, 1);
1261 assert_eq!(snapshot.loss.packet_loss_percent, 50.0);
1262 }
1263
1264 fn assert_no_ipdv_pairs(update: &EventStatsUpdate) {
1265 assert!(update.ipdv_pairs.is_empty(), "{update:?}");
1266 }
1267
1268 fn assert_one_ipdv_pair(
1269 update: &EventStatsUpdate,
1270 previous_logical_seq: u64,
1271 current_logical_seq: u64,
1272 rtt_ipdv: Duration,
1273 ) -> &IpdvPairUpdate {
1274 assert_eq!(update.ipdv_pairs.len(), 1, "{update:?}");
1275 let pair = &update.ipdv_pairs[0];
1276 assert_eq!(pair.previous_logical_seq, previous_logical_seq);
1277 assert_eq!(pair.current_logical_seq, current_logical_seq);
1278 assert_eq!(pair.rtt_ipdv, rtt_ipdv);
1279 pair
1280 }
1281
1282 #[test]
1283 fn ipdv_is_sequence_adjacent_and_gap_preserving() {
1284 let mut collector = StatsCollector::new(StatsConfig::finite());
1285 let first = collector.process(&reply(0, 10, 10));
1286 let gap = collector.process(&reply(2, 15, 15));
1287 let adjacent = collector.process(&reply(3, 12, 12));
1288
1289 let snapshot = collector.cumulative();
1290 assert!(first.contributed_sample);
1291 assert_no_ipdv_pairs(&first);
1292
1293 assert!(gap.contributed_sample);
1294 assert_no_ipdv_pairs(&gap);
1295
1296 assert!(adjacent.contributed_sample);
1297 assert_one_ipdv_pair(&adjacent, 2, 3, Duration::from_millis(3));
1298 assert_eq!(snapshot.ipdv.round_trip.stats.count, 1);
1299 assert_eq!(snapshot.ipdv.round_trip.stats.total_ns, 3_000_000);
1300 }
1301
1302 #[test]
1303 fn late_reply_can_complete_ipdv_pair() {
1304 let mut collector = StatsCollector::new(StatsConfig::finite());
1305 collector.process(&reply(1, 20, 20));
1306 let update = collector.process(&late_reply(0, 10, 10));
1307
1308 let snapshot = collector.cumulative();
1309
1310 assert!(update.contributed_sample);
1311 assert_one_ipdv_pair(&update, 0, 1, Duration::from_millis(10));
1312
1313 assert_eq!(snapshot.ipdv.round_trip.stats.count, 1);
1314 assert_eq!(snapshot.ipdv.round_trip.stats.total_ns, 10_000_000);
1315 }
1316
1317 #[test]
1318 fn update_exposes_directional_ipdv_when_available() {
1319 let mut collector = StatsCollector::new(StatsConfig::finite());
1320 collector.process(&reply(0, 10, 10));
1321 let update = collector.process(&reply(1, 13, 13));
1322
1323 assert!(update.contributed_sample);
1324
1325 let pair = assert_one_ipdv_pair(&update, 0, 1, Duration::from_millis(3));
1326 assert!(pair.send_ipdv.is_some());
1327 assert!(pair.receive_ipdv.is_some());
1328 }
1329
1330 #[test]
1331 fn gap_fill_update_exposes_both_completed_ipdv_pairs() {
1332 let mut collector = StatsCollector::new(StatsConfig::finite());
1333
1334 let first = collector.process(&reply(0, 10, 10));
1335 let gap = collector.process(&reply(2, 20, 20));
1336 let fill = collector.process(&reply(1, 13, 13));
1337
1338 assert!(first.contributed_sample);
1339 assert!(first.ipdv_pairs.is_empty());
1340
1341 assert!(gap.contributed_sample);
1342 assert!(gap.ipdv_pairs.is_empty());
1343
1344 assert!(fill.contributed_sample);
1345 assert_eq!(fill.ipdv_pairs.len(), 2);
1346
1347 assert_eq!(fill.ipdv_pairs[0].previous_logical_seq, 0);
1348 assert_eq!(fill.ipdv_pairs[0].current_logical_seq, 1);
1349 assert_eq!(fill.ipdv_pairs[0].rtt_ipdv, Duration::from_millis(3));
1350
1351 assert_eq!(fill.ipdv_pairs[1].previous_logical_seq, 1);
1352 assert_eq!(fill.ipdv_pairs[1].current_logical_seq, 2);
1353 assert_eq!(fill.ipdv_pairs[1].rtt_ipdv, Duration::from_millis(7));
1354
1355 let snapshot = collector.cumulative();
1356 assert_eq!(snapshot.ipdv.round_trip.stats.count, 2);
1357 assert_eq!(snapshot.ipdv.round_trip.stats.total_ns, 10_000_000);
1358 }
1359
1360 #[test]
1361 fn server_processing_and_one_way_require_available_samples() {
1362 let mut collector = StatsCollector::new(StatsConfig::finite());
1363 collector.process(&reply(0, 10, 9));
1364 collector.process(&ClientEvent::LateReply {
1365 seq: 9,
1366 logical_seq: None,
1367 highest_seen: 10,
1368 remote: "127.0.0.1:2112".parse().unwrap(),
1369 sent_at: None,
1370 received_at: ts(100),
1371 rtt: None,
1372 server_timing: None,
1373 one_way: None,
1374 received_stats: None,
1375 bytes: 64,
1376 packet_meta: PacketMeta::default(),
1377 });
1378
1379 let snapshot = collector.cumulative();
1380 assert_eq!(snapshot.server_processing.processing.count, 1);
1381 assert_eq!(snapshot.one_way_delay.send_delay.stats.count, 1);
1382 assert_eq!(snapshot.events.untracked_late_replies, 1);
1383 }
1384
1385 #[test]
1386 fn rolling_count_eviction_recomputes_from_bounded_events() {
1387 let mut collector = StatsCollector::new(StatsConfig {
1388 median: MedianMode::Disabled,
1389 rolling_count: Some(2),
1390 rolling_time: None,
1391 });
1392 collector.process(&sent(0, 0, ts(0)));
1393 collector.process(&reply(0, 10, 10));
1394 collector.process(&reply(1, 20, 20));
1395
1396 let rolling = collector.rolling_count().unwrap();
1397 assert_eq!(rolling.packets.packets_sent, 0);
1398 assert_eq!(rolling.packets.unique_replies, 2);
1399 assert_eq!(rolling.rtt.primary.stats.count, 2);
1400 }
1401
1402 #[test]
1403 fn rolling_time_eviction_uses_event_timestamps() {
1404 let mut collector = StatsCollector::new(StatsConfig {
1405 median: MedianMode::Disabled,
1406 rolling_count: None,
1407 rolling_time: Some(Duration::from_millis(15)),
1408 });
1409 collector.process(&sent(0, 0, ts(0)));
1410 collector.process(&sent(1, 1, ts(10)));
1411 collector.process(&sent(2, 2, ts(30)));
1412
1413 let rolling = collector.rolling_time().unwrap();
1414 assert_eq!(rolling.packets.packets_sent, 1);
1415 }
1416
1417 #[test]
1418 fn empty_and_all_lost_edges_are_defined() {
1419 let empty = StatsCollector::new(StatsConfig::finite()).summary();
1420 assert_eq!(empty.loss.packet_loss_percent, 0.0);
1421
1422 let mut collector = StatsCollector::new(StatsConfig::finite());
1423 collector.process(&sent(0, 0, ts(0)));
1424 let all_lost = collector.summary();
1425 assert_eq!(all_lost.loss.lost_packets, 1);
1426 assert_eq!(all_lost.loss.packet_loss_percent, 100.0);
1427 }
1428
1429 #[test]
1430 fn directional_loss_uses_server_received_count_when_available() {
1431 let mut collector = StatsCollector::new(StatsConfig::finite());
1432 collector.process(&sent(0, 0, ts(0)));
1433 collector.process(&sent(1, 1, ts(10)));
1434 collector.process(&reply(0, 10, 10));
1435
1436 let loss = collector.summary().loss;
1437 assert_eq!(loss.upstream_loss_packets, Some(1));
1438 assert_eq!(loss.downstream_loss_packets, Some(0));
1439 assert_eq!(loss.upstream_loss_percent, 50.0);
1440 }
1441
1442 #[test]
1443 fn single_sample_stddev_is_zero() {
1444 let mut metric = MetricU64::new(false);
1445 metric.push(42);
1446 let stats = metric.stats();
1447 assert_eq!(stats.variance_ns2, 0.0);
1448 assert_eq!(stats.stddev_ns(), 0.0);
1449 }
1450
1451 #[test]
1452 fn system_time_before_epoch_is_supported() {
1453 let before = UNIX_EPOCH - Duration::from_nanos(7);
1454 assert_eq!(system_time_ns(before), Some(-7));
1455 let after = UNIX_EPOCH + Duration::from_nanos(7);
1456 assert_eq!(system_time_ns(after), Some(7));
1457 let now = SystemTime::now();
1458 assert!(system_time_ns(now).is_some());
1459 }
1460}