Skip to main content

irtt_stats/
lib.rs

1//! Statistics aggregation for `irtt-client` events.
2
3#![forbid(unsafe_code)]
4
5use std::{
6    collections::{HashMap, HashSet, VecDeque},
7    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
8};
9
10use irtt_client::{
11    ClientEvent, ClientTimestamp, OneWayDelaySample, ReceivedStatsSample, RttSample, ServerTiming,
12    SignedDuration,
13};
14
/// Maximum number of per-sequence samples kept for IPDV pairing when the
/// median mode is not `ExactFinite`; beyond this the oldest samples are evicted.
const CONTINUOUS_SEQUENCE_LIMIT: usize = 4096;
16
/// Configuration for a [`StatsCollector`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StatsConfig {
    /// Whether samples are retained so that medians can be reported.
    pub median: MedianMode,
    /// When set, a rolling window holding at most this many of the most
    /// recent events is maintained.
    pub rolling_count: Option<usize>,
    /// When set, a rolling window is maintained from which events older than
    /// this duration (measured back from the event being processed) are
    /// dropped.
    pub rolling_time: Option<Duration>,
}
23
24impl StatsConfig {
25    pub fn finite() -> Self {
26        Self {
27            median: MedianMode::ExactFinite,
28            rolling_count: None,
29            rolling_time: None,
30        }
31    }
32
33    /// Configuration for long-running use.
34    ///
35    /// This disables finite median retention and bounds sequence-adjacent IPDV
36    /// tracking. Exact arbitrary-late IPDV completion is finite-mode behavior.
37    pub fn continuous() -> Self {
38        Self {
39            median: MedianMode::Disabled,
40            rolling_count: None,
41            rolling_time: None,
42        }
43    }
44}
45
46impl Default for StatsConfig {
47    fn default() -> Self {
48        Self::finite()
49    }
50}
51
/// Selects whether medians are computed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MedianMode {
    /// No samples are retained for medians, so reported medians are `None`.
    /// Per-sequence sample tracking is also capped in this mode.
    Disabled,
    /// Median-capable metrics retain every sample and report the exact median.
    ExactFinite,
}
57
/// Aggregates `ClientEvent`s into cumulative statistics and, when configured,
/// count-bounded and time-bounded rolling windows.
#[derive(Debug, Clone, PartialEq)]
pub struct StatsCollector {
    // Configuration supplied at construction.
    config: StatsConfig,
    // Statistics accumulated over every processed event.
    cumulative: CoreStats,
    // Most recent events, capped at `config.rolling_count`; `None` when unset.
    rolling_count: Option<VecDeque<WindowEvent>>,
    // Events trimmed by `config.rolling_time`; `None` when unset.
    rolling_time: Option<VecDeque<WindowEvent>>,
}
65
66impl StatsCollector {
67    pub fn new(config: StatsConfig) -> Self {
68        Self {
69            config,
70            cumulative: CoreStats::new(config.median),
71            rolling_count: config.rolling_count.map(|_| VecDeque::new()),
72            rolling_time: config.rolling_time.map(|_| VecDeque::new()),
73        }
74    }
75
76    pub fn process(&mut self, event: &ClientEvent) -> EventStatsUpdate {
77        self.process_with_update(event)
78    }
79
80    pub fn process_with_update(&mut self, event: &ClientEvent) -> EventStatsUpdate {
81        let Some(window_event) = WindowEvent::from_client_event(event) else {
82            return EventStatsUpdate::default();
83        };
84
85        let update = self.cumulative.apply(window_event.clone());
86
87        if let (Some(limit), Some(window)) =
88            (self.config.rolling_count, self.rolling_count.as_mut())
89        {
90            window.push_back(window_event.clone());
91            while window.len() > limit {
92                window.pop_front();
93            }
94        }
95
96        if let (Some(duration), Some(window)) =
97            (self.config.rolling_time, self.rolling_time.as_mut())
98        {
99            let cutoff = window_event.at().checked_sub(duration);
100            window.push_back(window_event);
101            if let Some(cutoff) = cutoff {
102                while window.front().is_some_and(|event| event.at() < cutoff) {
103                    window.pop_front();
104                }
105            }
106        }
107
108        update
109    }
110
111    pub fn cumulative(&self) -> CumulativeSnapshot {
112        self.cumulative.snapshot()
113    }
114
115    pub fn rolling_count(&self) -> Option<RollingSnapshot> {
116        self.rolling_count.as_ref().map(snapshot_window)
117    }
118
119    pub fn rolling_time(&self) -> Option<RollingSnapshot> {
120        self.rolling_time.as_ref().map(snapshot_window)
121    }
122
123    pub fn summary(&self) -> FiniteSummary {
124        self.cumulative()
125    }
126
127    #[cfg(test)]
128    fn retained_median_samples(&self) -> usize {
129        self.cumulative.retained_median_samples()
130    }
131
132    #[cfg(test)]
133    fn retained_sequence_samples(&self) -> usize {
134        self.cumulative.retained_sequence_samples()
135    }
136}
137
/// Snapshot type returned for rolling windows; an alias of [`CumulativeSnapshot`].
pub type RollingSnapshot = CumulativeSnapshot;
/// Type returned by `StatsCollector::summary`; an alias of [`CumulativeSnapshot`].
pub type FiniteSummary = CumulativeSnapshot;
140
/// Describes what a single processed event contributed to the statistics.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct EventStatsUpdate {
    /// `true` when the event was a unique reply and therefore added a sample.
    pub contributed_sample: bool,
    /// IPDV pairs that this event completed (empty when none).
    pub ipdv_pairs: Vec<IpdvPairUpdate>,
}
146
/// IPDV values computed for one pair of sequence-adjacent samples.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IpdvPairUpdate {
    /// Logical sequence number of the earlier sample (`current_logical_seq - 1`).
    pub previous_logical_seq: u64,
    /// Logical sequence number of the later sample.
    pub current_logical_seq: u64,
    /// Primary-RTT difference between the two samples, as an unsigned duration.
    pub rtt_ipdv: Duration,
    /// Send-direction IPDV; `None` when the required timestamps are missing.
    pub send_ipdv: Option<Duration>,
    /// Receive-direction IPDV; `None` when the required timestamps are missing.
    pub receive_ipdv: Option<Duration>,
}
155
/// Point-in-time view of all statistics held by an accumulator.
#[derive(Debug, Clone, PartialEq)]
pub struct CumulativeSnapshot {
    /// Counts of processed events by kind.
    pub events: EventCounts,
    /// Packet and byte counters.
    pub packets: PacketCounts,
    /// Loss, duplicate and late figures derived from `packets`.
    pub loss: LossStats,
    /// Statistics over the send-call durations of sent packets.
    pub send_call: DurationStats,
    /// Statistics over the timer errors of sent packets.
    pub timer_error: DurationStats,
    /// Round-trip-time statistics.
    pub rtt: RttStats,
    /// Inter-packet delay variation statistics.
    pub ipdv: IpdvStats,
    /// One-way delay statistics.
    pub one_way_delay: OneWayDelayStats,
    /// Server processing-time statistics.
    pub server_processing: ServerProcessingStats,
}
168
/// Number of processed events, broken down by kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EventCounts {
    /// Echo-sent events.
    pub sent_events: u64,
    /// Unique replies that were not late.
    pub echo_replies: u64,
    /// Unique replies that were late.
    pub late_unique_replies: u64,
    /// Duplicate replies.
    pub duplicate_replies: u64,
    /// Echo-loss events.
    pub loss_events: u64,
    /// Warning events.
    pub warning_events: u64,
    /// Late replies lacking a logical sequence, send timestamp or RTT, which
    /// therefore could not be turned into a sample.
    pub untracked_late_replies: u64,
}
179
/// Packet and byte counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct PacketCounts {
    /// Packets sent (one per echo-sent event).
    pub packets_sent: u64,
    /// Replies received, counting both unique and duplicate replies.
    pub packets_received: u64,
    /// Unique replies received (on time or late).
    pub unique_replies: u64,
    /// Duplicate replies received.
    pub duplicates: u64,
    /// Unique replies that arrived late.
    pub late_packets: u64,
    /// Total bytes sent (saturating).
    pub bytes_sent: u64,
    /// Total bytes of unique replies (saturating); duplicates add nothing.
    pub bytes_received: u64,
    /// Received count from the most recently processed reply that carried
    /// one; `None` until such a reply is seen.
    pub server_packets_received: Option<u64>,
    /// Received window from the most recently processed reply that carried
    /// one; `None` until such a reply is seen.
    pub server_received_window: Option<u64>,
}
192
/// Loss, duplicate and late figures derived from [`PacketCounts`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct LossStats {
    /// `packets_sent - unique_replies`, saturating at zero.
    pub lost_packets: u64,
    /// Currently always equal to `lost_packets`.
    pub unknown_loss_packets: u64,
    /// `packets_sent - server_packets_received`; `None` when the server count
    /// is unknown. May be negative.
    pub upstream_loss_packets: Option<i128>,
    /// `server_packets_received - packets_received`; `None` when the server
    /// count is unknown. May be negative.
    pub downstream_loss_packets: Option<i128>,
    /// Lost packets as a percentage of packets sent (0.0 when nothing was sent).
    pub packet_loss_percent: f64,
    /// Upstream loss as a percentage of packets sent; 0.0 when unknown or
    /// nothing was sent.
    pub upstream_loss_percent: f64,
    /// Downstream loss as a percentage of the server's received count; 0.0
    /// when unknown or the server count is zero.
    pub downstream_loss_percent: f64,
    /// Duplicates as a percentage of packets received (0.0 when none received).
    pub duplicate_percent: f64,
    /// Late packets as a percentage of packets received (0.0 when none received).
    pub late_packets_percent: f64,
}
205
/// Round-trip-time statistics.
#[derive(Debug, Clone, PartialEq)]
pub struct RttStats {
    /// Statistics over each sample's effective (signed) RTT.
    pub primary: SignedDurationStatsWithMedian,
    /// Statistics over each sample's raw RTT.
    pub raw: DurationStatsWithMedian,
    /// Statistics over the adjusted (signed) RTT of samples that have one.
    pub adjusted: SignedDurationStatsWithMedian,
}
212
/// Inter-packet delay variation statistics over sequence-adjacent sample pairs.
#[derive(Debug, Clone, PartialEq)]
pub struct IpdvStats {
    /// IPDV of the primary round-trip time.
    pub round_trip: DurationStatsWithMedian,
    /// Send-direction IPDV, for pairs where it could be computed.
    pub send: DurationStatsWithMedian,
    /// Receive-direction IPDV, for pairs where it could be computed.
    pub receive: DurationStatsWithMedian,
}
219
/// One-way delay statistics, for samples that carried one-way delays.
#[derive(Debug, Clone, PartialEq)]
pub struct OneWayDelayStats {
    /// Client-to-server delay.
    pub send_delay: DurationStatsWithMedian,
    /// Server-to-client delay.
    pub receive_delay: DurationStatsWithMedian,
}
225
/// Server processing-time statistics.
#[derive(Debug, Clone, PartialEq)]
pub struct ServerProcessingStats {
    /// Statistics over the processing time of samples that reported one.
    pub processing: DurationStats,
}
230
/// Summary statistics over unsigned nanosecond values.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct DurationStats {
    /// Number of values observed.
    pub count: u64,
    /// Sum of all values in nanoseconds (saturating).
    pub total_ns: u128,
    /// Smallest value, or `None` when nothing was observed.
    pub min_ns: Option<u64>,
    /// Largest value, or `None` when nothing was observed.
    pub max_ns: Option<u64>,
    /// Arithmetic mean in nanoseconds; 0.0 when nothing was observed.
    pub mean_ns: f64,
    /// Sample variance (divisor `count - 1`) in ns²; 0.0 for fewer than two values.
    pub variance_ns2: f64,
}
240
241impl DurationStats {
242    pub fn stddev_ns(&self) -> f64 {
243        self.variance_ns2.sqrt()
244    }
245}
246
/// [`DurationStats`] paired with an optional median.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct DurationStatsWithMedian {
    /// Running summary statistics.
    pub stats: DurationStats,
    /// Median in nanoseconds; `None` when samples are not retained or none
    /// were observed.
    pub median_ns: Option<f64>,
}
252
253impl DurationStatsWithMedian {
254    pub fn stddev_ns(&self) -> f64 {
255        self.stats.stddev_ns()
256    }
257}
258
/// Summary statistics over signed nanosecond values.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SignedDurationStats {
    /// Number of values observed.
    pub count: u64,
    /// Sum of all values in nanoseconds (saturating).
    pub total_ns: i128,
    /// Smallest value, or `None` when nothing was observed.
    pub min_ns: Option<i128>,
    /// Largest value, or `None` when nothing was observed.
    pub max_ns: Option<i128>,
    /// Arithmetic mean in nanoseconds; 0.0 when nothing was observed.
    pub mean_ns: f64,
    /// Sample variance (divisor `count - 1`) in ns²; 0.0 for fewer than two values.
    pub variance_ns2: f64,
}
268
269impl SignedDurationStats {
270    pub fn stddev_ns(&self) -> f64 {
271        self.variance_ns2.sqrt()
272    }
273}
274
/// [`SignedDurationStats`] paired with an optional median.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SignedDurationStatsWithMedian {
    /// Running summary statistics.
    pub stats: SignedDurationStats,
    /// Median in nanoseconds; `None` when samples are not retained or none
    /// were observed.
    pub median_ns: Option<f64>,
}
280
281impl SignedDurationStatsWithMedian {
282    pub fn stddev_ns(&self) -> f64 {
283        self.stats.stddev_ns()
284    }
285}
286
/// Accumulator behind both the cumulative statistics and rolling-window
/// snapshots: counters, per-metric running statistics, and the per-sequence
/// samples needed to pair adjacent replies for IPDV.
#[derive(Debug, Clone, PartialEq)]
struct CoreStats {
    // Median mode this accumulator was created with.
    median: MedianMode,
    // Cap on the number of entries in `samples`; `None` means unbounded.
    sequence_limit: Option<usize>,
    // Event counters by kind.
    events: EventCounts,
    // Packet and byte counters.
    packets: PacketCounts,
    // Per-metric running statistics (some also retain samples for medians).
    send_call: MetricU64,
    timer_error: MetricU64,
    rtt_primary: MetricI128,
    rtt_raw: MetricU64,
    rtt_adjusted: MetricI128,
    ipdv_round_trip: MetricU64,
    ipdv_send: MetricU64,
    ipdv_receive: MetricU64,
    send_delay: MetricU64,
    receive_delay: MetricU64,
    server_processing: MetricU64,
    // Unique-reply samples keyed by logical sequence number.
    samples: HashMap<u64, UniqueSample>,
    // Keys of `samples` in insertion order; the front is evicted first.
    sample_order: VecDeque<u64>,
    // Sequence numbers `n` whose pair `(n - 1, n)` has already been counted.
    ipdv_pairs: HashSet<u64>,
}
308
309impl CoreStats {
310    fn new(median: MedianMode) -> Self {
311        Self {
312            median,
313            sequence_limit: if median == MedianMode::ExactFinite {
314                None
315            } else {
316                Some(CONTINUOUS_SEQUENCE_LIMIT)
317            },
318            events: EventCounts::default(),
319            packets: PacketCounts::default(),
320            send_call: MetricU64::new(false),
321            timer_error: MetricU64::new(false),
322            rtt_primary: MetricI128::new(median == MedianMode::ExactFinite),
323            rtt_raw: MetricU64::new(median == MedianMode::ExactFinite),
324            rtt_adjusted: MetricI128::new(median == MedianMode::ExactFinite),
325            ipdv_round_trip: MetricU64::new(median == MedianMode::ExactFinite),
326            ipdv_send: MetricU64::new(median == MedianMode::ExactFinite),
327            ipdv_receive: MetricU64::new(median == MedianMode::ExactFinite),
328            send_delay: MetricU64::new(median == MedianMode::ExactFinite),
329            receive_delay: MetricU64::new(median == MedianMode::ExactFinite),
330            server_processing: MetricU64::new(false),
331            samples: HashMap::new(),
332            sample_order: VecDeque::new(),
333            ipdv_pairs: HashSet::new(),
334        }
335    }
336
337    fn apply(&mut self, event: WindowEvent) -> EventStatsUpdate {
338        let mut update = EventStatsUpdate::default();
339        match event {
340            WindowEvent::Sent {
341                bytes,
342                send_call_ns,
343                timer_error_ns,
344                ..
345            } => {
346                self.events.sent_events += 1;
347                self.packets.packets_sent += 1;
348                self.packets.bytes_sent = self.packets.bytes_sent.saturating_add(bytes as u64);
349                self.send_call.push(send_call_ns);
350                self.timer_error.push(timer_error_ns);
351            }
352            WindowEvent::UniqueReply {
353                is_late, sample, ..
354            } => {
355                let sample = *sample;
356                self.events.echo_replies += u64::from(!is_late);
357                self.events.late_unique_replies += u64::from(is_late);
358                self.packets.packets_received += 1;
359                self.packets.unique_replies += 1;
360                self.packets.late_packets += u64::from(is_late);
361                self.packets.bytes_received = self
362                    .packets
363                    .bytes_received
364                    .saturating_add(sample.bytes as u64);
365                update.contributed_sample = true;
366                if let Some(count) = sample.received_count {
367                    self.packets.server_packets_received = Some(u64::from(count));
368                }
369                if let Some(window) = sample.received_window {
370                    self.packets.server_received_window = Some(window);
371                }
372
373                self.rtt_primary.push(sample.rtt_primary_ns);
374                self.rtt_raw.push(sample.rtt_raw_ns);
375                if let Some(adjusted) = sample.rtt_adjusted_ns {
376                    self.rtt_adjusted.push(adjusted);
377                }
378                if let Some(processing) = sample.server_processing_ns {
379                    self.server_processing.push(processing);
380                }
381                if let Some(delay) = sample.send_delay_ns {
382                    self.send_delay.push(delay);
383                }
384                if let Some(delay) = sample.receive_delay_ns {
385                    self.receive_delay.push(delay);
386                }
387
388                let seq = sample.logical_seq;
389                if self.samples.insert(seq, sample).is_none() {
390                    self.sample_order.push_back(seq);
391                    self.enforce_sequence_limit();
392                    if let Some(pair) = self.try_ipdv_pair(seq) {
393                        update.ipdv_pairs.push(pair);
394                    }
395                    if let Some(next) = seq.checked_add(1) {
396                        if let Some(pair) = self.try_ipdv_pair(next) {
397                            update.ipdv_pairs.push(pair);
398                        }
399                    }
400                }
401            }
402            WindowEvent::DuplicateReply { .. } => {
403                self.events.duplicate_replies += 1;
404                self.packets.packets_received += 1;
405                self.packets.duplicates += 1;
406            }
407            WindowEvent::Loss { .. } => {
408                self.events.loss_events += 1;
409            }
410            WindowEvent::Warning { .. } => {
411                self.events.warning_events += 1;
412            }
413            WindowEvent::UntrackedLate { .. } => {
414                self.events.untracked_late_replies += 1;
415            }
416        }
417        update
418    }
419
420    fn enforce_sequence_limit(&mut self) {
421        let Some(limit) = self.sequence_limit else {
422            return;
423        };
424        while self.samples.len() > limit {
425            let Some(seq) = self.sample_order.pop_front() else {
426                break;
427            };
428            if self.samples.remove(&seq).is_some() {
429                self.ipdv_pairs.remove(&seq);
430                if let Some(next) = seq.checked_add(1) {
431                    self.ipdv_pairs.remove(&next);
432                }
433            }
434        }
435    }
436
437    fn try_ipdv_pair(&mut self, current_seq: u64) -> Option<IpdvPairUpdate> {
438        let previous_seq = current_seq.checked_sub(1)?;
439
440        if !self.ipdv_pairs.insert(current_seq) {
441            return None;
442        }
443
444        let Some(previous) = self.samples.get(&previous_seq) else {
445            self.ipdv_pairs.remove(&current_seq);
446            return None;
447        };
448
449        let Some(current) = self.samples.get(&current_seq) else {
450            self.ipdv_pairs.remove(&current_seq);
451            return None;
452        };
453
454        // Compute everything before mutating metric fields, otherwise the borrow
455        // checker may quite reasonably start throwing furniture.
456        let rtt_ipdv = abs_i128_to_u64(current.rtt_primary_ns - previous.rtt_primary_ns);
457        let send_ipdv = send_ipdv_ns(previous, current).map(abs_i128_to_u64);
458        let receive_ipdv = receive_ipdv_ns(previous, current).map(abs_i128_to_u64);
459
460        self.ipdv_round_trip.push(rtt_ipdv);
461
462        if let Some(value) = send_ipdv {
463            self.ipdv_send.push(value);
464        }
465
466        if let Some(value) = receive_ipdv {
467            self.ipdv_receive.push(value);
468        }
469
470        Some(IpdvPairUpdate {
471            previous_logical_seq: previous_seq,
472            current_logical_seq: current_seq,
473            rtt_ipdv: Duration::from_nanos(rtt_ipdv),
474            send_ipdv: send_ipdv.map(Duration::from_nanos),
475            receive_ipdv: receive_ipdv.map(Duration::from_nanos),
476        })
477    }
478
479    fn snapshot(&self) -> CumulativeSnapshot {
480        let packets = self.packets;
481        CumulativeSnapshot {
482            events: self.events,
483            packets,
484            loss: loss_stats(packets),
485            send_call: self.send_call.stats(),
486            timer_error: self.timer_error.stats(),
487            rtt: RttStats {
488                primary: self.rtt_primary.stats_with_median(),
489                raw: self.rtt_raw.stats_with_median(),
490                adjusted: self.rtt_adjusted.stats_with_median(),
491            },
492            ipdv: IpdvStats {
493                round_trip: self.ipdv_round_trip.stats_with_median(),
494                send: self.ipdv_send.stats_with_median(),
495                receive: self.ipdv_receive.stats_with_median(),
496            },
497            one_way_delay: OneWayDelayStats {
498                send_delay: self.send_delay.stats_with_median(),
499                receive_delay: self.receive_delay.stats_with_median(),
500            },
501            server_processing: ServerProcessingStats {
502                processing: self.server_processing.stats(),
503            },
504        }
505    }
506
507    #[cfg(test)]
508    fn retained_median_samples(&self) -> usize {
509        self.rtt_primary.retained_samples()
510            + self.rtt_raw.retained_samples()
511            + self.rtt_adjusted.retained_samples()
512            + self.ipdv_round_trip.retained_samples()
513            + self.ipdv_send.retained_samples()
514            + self.ipdv_receive.retained_samples()
515            + self.send_delay.retained_samples()
516            + self.receive_delay.retained_samples()
517    }
518
519    #[cfg(test)]
520    fn retained_sequence_samples(&self) -> usize {
521        self.samples.len()
522    }
523}
524
/// Timestamped reduction of a `ClientEvent` to the data the statistics need.
///
/// These values are stored in the rolling windows and replayed into a
/// `CoreStats` when a window snapshot is taken.
#[derive(Debug, Clone, PartialEq)]
enum WindowEvent {
    /// A packet was sent; `at` is the monotonic send time.
    Sent {
        at: Instant,
        bytes: usize,
        send_call_ns: u64,
        timer_error_ns: u64,
    },
    /// A unique reply (on time or late) with its full sample; `at` is the
    /// monotonic receive time.
    UniqueReply {
        at: Instant,
        is_late: bool,
        sample: Box<UniqueSample>,
    },
    /// A duplicate reply; `at` is the monotonic receive time.
    DuplicateReply {
        at: Instant,
    },
    /// A packet was declared lost; `at` is the timeout instant.
    Loss {
        at: Instant,
    },
    /// A warning; `at` is the instant the event was converted.
    Warning {
        at: Instant,
    },
    /// A late reply without enough information to form a sample; `at` is the
    /// monotonic receive time.
    UntrackedLate {
        at: Instant,
    },
}
551
552impl WindowEvent {
553    fn from_client_event(event: &ClientEvent) -> Option<Self> {
554        match event {
555            ClientEvent::EchoSent {
556                sent_at,
557                bytes,
558                send_call,
559                timer_error,
560                ..
561            } => Some(Self::Sent {
562                at: sent_at.mono,
563                bytes: *bytes,
564                send_call_ns: duration_ns_u64(*send_call),
565                timer_error_ns: duration_ns_u64(*timer_error),
566            }),
567            ClientEvent::EchoReply {
568                logical_seq,
569                sent_at,
570                received_at,
571                rtt,
572                server_timing,
573                one_way,
574                received_stats,
575                bytes,
576                ..
577            } => Some(Self::UniqueReply {
578                at: received_at.mono,
579                is_late: false,
580                sample: Box::new(UniqueSample::new(
581                    *logical_seq,
582                    *sent_at,
583                    *received_at,
584                    *rtt,
585                    *server_timing,
586                    *one_way,
587                    *received_stats,
588                    *bytes,
589                )),
590            }),
591            ClientEvent::LateReply {
592                logical_seq: Some(logical_seq),
593                sent_at: Some(sent_at),
594                received_at,
595                rtt: Some(rtt),
596                server_timing,
597                one_way,
598                received_stats,
599                bytes,
600                ..
601            } => Some(Self::UniqueReply {
602                at: received_at.mono,
603                is_late: true,
604                sample: Box::new(UniqueSample::new(
605                    *logical_seq,
606                    *sent_at,
607                    *received_at,
608                    *rtt,
609                    *server_timing,
610                    *one_way,
611                    *received_stats,
612                    *bytes,
613                )),
614            }),
615            ClientEvent::LateReply { received_at, .. } => Some(Self::UntrackedLate {
616                at: received_at.mono,
617            }),
618            ClientEvent::DuplicateReply { received_at, .. } => Some(Self::DuplicateReply {
619                at: received_at.mono,
620            }),
621            ClientEvent::EchoLoss { timeout_at, .. } => Some(Self::Loss { at: *timeout_at }),
622            ClientEvent::Warning { .. } => Some(Self::Warning { at: Instant::now() }),
623            ClientEvent::SessionStarted { .. }
624            | ClientEvent::NoTestCompleted { .. }
625            | ClientEvent::SessionClosed { .. } => None,
626        }
627    }
628
629    fn at(&self) -> Instant {
630        match self {
631            Self::Sent { at, .. }
632            | Self::UniqueReply { at, .. }
633            | Self::DuplicateReply { at }
634            | Self::Loss { at }
635            | Self::Warning { at }
636            | Self::UntrackedLate { at } => *at,
637        }
638    }
639}
640
/// Everything recorded about one unique reply: the measurements that feed the
/// metrics plus the client and server timestamps used to pair adjacent
/// replies for IPDV.
#[derive(Debug, Clone, PartialEq)]
struct UniqueSample {
    // Logical sequence number of the reply.
    logical_seq: u64,
    // Reply size in bytes.
    bytes: usize,
    // Effective signed RTT in nanoseconds.
    rtt_primary_ns: i128,
    // Raw RTT in nanoseconds.
    rtt_raw_ns: u64,
    // Adjusted signed RTT in nanoseconds, when available.
    rtt_adjusted_ns: Option<i128>,
    // Client-to-server one-way delay in nanoseconds, when available.
    send_delay_ns: Option<u64>,
    // Server-to-client one-way delay in nanoseconds, when available.
    receive_delay_ns: Option<u64>,
    // Server processing time in nanoseconds, when available.
    server_processing_ns: Option<u64>,
    // Received count and window taken from the reply's received stats.
    received_count: Option<u32>,
    received_window: Option<u64>,
    // Client monotonic send and receive times.
    client_send_mono: Instant,
    client_receive_mono: Instant,
    // Client wall-clock send and receive times in nanoseconds, when convertible.
    client_send_wall_ns: Option<i128>,
    client_receive_wall_ns: Option<i128>,
    // Server monotonic receive and send times in nanoseconds, when reported.
    server_receive_mono_ns: Option<i64>,
    server_send_mono_ns: Option<i64>,
    // Server wall-clock receive and send times in nanoseconds, when reported.
    server_receive_wall_ns: Option<i64>,
    server_send_wall_ns: Option<i64>,
}
662
663impl UniqueSample {
664    #[allow(clippy::too_many_arguments)]
665    fn new(
666        logical_seq: u64,
667        sent_at: ClientTimestamp,
668        received_at: ClientTimestamp,
669        rtt: RttSample,
670        server_timing: Option<ServerTiming>,
671        one_way: Option<OneWayDelaySample>,
672        received_stats: Option<ReceivedStatsSample>,
673        bytes: usize,
674    ) -> Self {
675        Self {
676            logical_seq,
677            bytes,
678            rtt_primary_ns: signed_duration_ns(rtt.effective_signed),
679            rtt_raw_ns: duration_ns_u64(rtt.raw),
680            rtt_adjusted_ns: rtt.adjusted_signed.map(signed_duration_ns),
681            send_delay_ns: one_way
682                .and_then(|sample| sample.client_to_server)
683                .map(duration_ns_u64),
684            receive_delay_ns: one_way
685                .and_then(|sample| sample.server_to_client)
686                .map(duration_ns_u64),
687            server_processing_ns: server_timing
688                .and_then(|timing| timing.processing)
689                .map(duration_ns_u64),
690            received_count: received_stats.and_then(|stats| stats.count),
691            received_window: received_stats.and_then(|stats| stats.window),
692            client_send_mono: sent_at.mono,
693            client_receive_mono: received_at.mono,
694            client_send_wall_ns: system_time_ns(sent_at.wall),
695            client_receive_wall_ns: system_time_ns(received_at.wall),
696            server_receive_mono_ns: server_timing.and_then(|timing| timing.receive_mono_ns),
697            server_send_mono_ns: server_timing.and_then(|timing| timing.send_mono_ns),
698            server_receive_wall_ns: server_timing.and_then(|timing| timing.receive_wall_ns),
699            server_send_wall_ns: server_timing.and_then(|timing| timing.send_wall_ns),
700        }
701    }
702}
703
/// Running statistics over unsigned nanosecond values, optionally retaining
/// every value so a median can be computed.
#[derive(Debug, Clone, PartialEq)]
struct MetricU64 {
    // Incrementally maintained count/total/min/max/mean/variance state.
    running: RunningU64,
    // Every pushed value when retention is enabled; `None` otherwise.
    samples: Option<Vec<u64>>,
}
709
710impl MetricU64 {
711    fn new(retain_samples: bool) -> Self {
712        Self {
713            running: RunningU64::default(),
714            samples: retain_samples.then(Vec::new),
715        }
716    }
717
718    fn push(&mut self, value: u64) {
719        self.running.push(value);
720        if let Some(samples) = self.samples.as_mut() {
721            samples.push(value);
722        }
723    }
724
725    fn stats(&self) -> DurationStats {
726        self.running.stats()
727    }
728
729    fn stats_with_median(&self) -> DurationStatsWithMedian {
730        DurationStatsWithMedian {
731            stats: self.stats(),
732            median_ns: self
733                .samples
734                .as_ref()
735                .and_then(|samples| median_u64(samples)),
736        }
737    }
738
739    #[cfg(test)]
740    fn retained_samples(&self) -> usize {
741        self.samples.as_ref().map_or(0, Vec::len)
742    }
743}
744
/// Running statistics over signed nanosecond values, optionally retaining
/// every value so a median can be computed.
#[derive(Debug, Clone, PartialEq)]
struct MetricI128 {
    // Incrementally maintained count/total/min/max/mean/variance state.
    running: RunningI128,
    // Every pushed value when retention is enabled; `None` otherwise.
    samples: Option<Vec<i128>>,
}
750
751impl MetricI128 {
752    fn new(retain_samples: bool) -> Self {
753        Self {
754            running: RunningI128::default(),
755            samples: retain_samples.then(Vec::new),
756        }
757    }
758
759    fn push(&mut self, value: i128) {
760        self.running.push(value);
761        if let Some(samples) = self.samples.as_mut() {
762            samples.push(value);
763        }
764    }
765
766    fn stats(&self) -> SignedDurationStats {
767        self.running.stats()
768    }
769
770    fn stats_with_median(&self) -> SignedDurationStatsWithMedian {
771        SignedDurationStatsWithMedian {
772            stats: self.stats(),
773            median_ns: self
774                .samples
775                .as_ref()
776                .and_then(|samples| median_i128(samples)),
777        }
778    }
779
780    #[cfg(test)]
781    fn retained_samples(&self) -> usize {
782        self.samples.as_ref().map_or(0, Vec::len)
783    }
784}
785
/// Incrementally updated summary state for unsigned values.
#[derive(Debug, Clone, PartialEq, Default)]
struct RunningU64 {
    // Number of values pushed.
    count: u64,
    // Saturating sum of all values.
    total: u128,
    // Smallest and largest values seen; `None` before the first push.
    min: Option<u64>,
    max: Option<u64>,
    // Running mean of the values.
    mean: f64,
    // Running sum of squared deviations from the mean; divided by
    // `count - 1` to obtain the sample variance.
    m2: f64,
}
795
796impl RunningU64 {
797    fn push(&mut self, value: u64) {
798        self.count += 1;
799        self.total = self.total.saturating_add(u128::from(value));
800        self.min = Some(self.min.map_or(value, |min| min.min(value)));
801        self.max = Some(self.max.map_or(value, |max| max.max(value)));
802        let x = value as f64;
803        let delta = x - self.mean;
804        self.mean += delta / self.count as f64;
805        let delta2 = x - self.mean;
806        self.m2 += delta * delta2;
807    }
808
809    fn stats(&self) -> DurationStats {
810        DurationStats {
811            count: self.count,
812            total_ns: self.total,
813            min_ns: self.min,
814            max_ns: self.max,
815            mean_ns: if self.count == 0 { 0.0 } else { self.mean },
816            variance_ns2: sample_variance(self.count, self.m2),
817        }
818    }
819}
820
/// Incrementally updated summary state for signed values.
#[derive(Debug, Clone, PartialEq, Default)]
struct RunningI128 {
    // Number of values pushed.
    count: u64,
    // Saturating sum of all values.
    total: i128,
    // Smallest and largest values seen; `None` before the first push.
    min: Option<i128>,
    max: Option<i128>,
    // Running mean of the values.
    mean: f64,
    // Running sum of squared deviations from the mean; divided by
    // `count - 1` to obtain the sample variance.
    m2: f64,
}
830
831impl RunningI128 {
832    fn push(&mut self, value: i128) {
833        self.count += 1;
834        self.total = self.total.saturating_add(value);
835        self.min = Some(self.min.map_or(value, |min| min.min(value)));
836        self.max = Some(self.max.map_or(value, |max| max.max(value)));
837        let x = value as f64;
838        let delta = x - self.mean;
839        self.mean += delta / self.count as f64;
840        let delta2 = x - self.mean;
841        self.m2 += delta * delta2;
842    }
843
844    fn stats(&self) -> SignedDurationStats {
845        SignedDurationStats {
846            count: self.count,
847            total_ns: self.total,
848            min_ns: self.min,
849            max_ns: self.max,
850            mean_ns: if self.count == 0 { 0.0 } else { self.mean },
851            variance_ns2: sample_variance(self.count, self.m2),
852        }
853    }
854}
855
856fn snapshot_window(events: &VecDeque<WindowEvent>) -> CumulativeSnapshot {
857    let mut core = CoreStats::new(MedianMode::Disabled);
858    for event in events {
859        core.apply(event.clone());
860    }
861    core.snapshot()
862}
863
/// Sample variance from a squared-deviation sum `m2` over `count` values.
///
/// Uses the `count - 1` divisor and returns 0.0 when fewer than two values
/// were observed.
fn sample_variance(count: u64, m2: f64) -> f64 {
    match count {
        0 | 1 => 0.0,
        n => m2 / (n - 1) as f64,
    }
}
871
872fn median_u64(samples: &[u64]) -> Option<f64> {
873    if samples.is_empty() {
874        return None;
875    }
876    let mut sorted = samples.to_vec();
877    sorted.sort_unstable();
878    Some(median_sorted_u64(&sorted))
879}
880
/// Median of an already sorted, non-empty slice.
///
/// For an even number of elements the two middle values are averaged.
///
/// # Panics
///
/// Panics when `sorted` is empty.
fn median_sorted_u64(sorted: &[u64]) -> f64 {
    let upper = sorted.len() / 2;
    match sorted.len() % 2 {
        1 => sorted[upper] as f64,
        _ => (sorted[upper - 1] as f64 + sorted[upper] as f64) / 2.0,
    }
}
889
/// Median of `samples`, or `None` when the slice is empty.
///
/// The input is left untouched; a sorted copy is made for the computation.
/// For an even number of elements the two middle values are averaged.
fn median_i128(samples: &[i128]) -> Option<f64> {
    let mut ordered = samples.to_vec();
    ordered.sort_unstable();
    let upper = ordered.len() / 2;
    match ordered.len() {
        0 => None,
        len if len % 2 == 1 => Some(ordered[upper] as f64),
        _ => Some((ordered[upper - 1] as f64 + ordered[upper] as f64) / 2.0),
    }
}
903
904fn loss_stats(packets: PacketCounts) -> LossStats {
905    let lost = packets.packets_sent.saturating_sub(packets.unique_replies);
906    let packet_loss_percent = if packets.packets_sent == 0 {
907        0.0
908    } else if packets.unique_replies == 0 {
909        100.0
910    } else {
911        percent(lost as f64, packets.packets_sent as f64)
912    };
913
914    let (
915        upstream_loss_packets,
916        upstream_loss_percent,
917        downstream_loss_packets,
918        downstream_loss_percent,
919    ) = if let Some(server_received) = packets.server_packets_received {
920        let upstream = i128::from(packets.packets_sent) - i128::from(server_received);
921        let downstream = i128::from(server_received) - i128::from(packets.packets_received);
922        let upstream_percent = if packets.packets_sent == 0 {
923            0.0
924        } else {
925            percent(upstream as f64, packets.packets_sent as f64)
926        };
927        let downstream_percent = if server_received == 0 {
928            0.0
929        } else {
930            percent(downstream as f64, server_received as f64)
931        };
932        (
933            Some(upstream),
934            upstream_percent,
935            Some(downstream),
936            downstream_percent,
937        )
938    } else {
939        (None, 0.0, None, 0.0)
940    };
941
942    LossStats {
943        lost_packets: lost,
944        unknown_loss_packets: lost,
945        upstream_loss_packets,
946        downstream_loss_packets,
947        packet_loss_percent,
948        upstream_loss_percent,
949        downstream_loss_percent,
950        duplicate_percent: if packets.packets_received == 0 {
951            0.0
952        } else {
953            percent(packets.duplicates as f64, packets.packets_received as f64)
954        },
955        late_packets_percent: if packets.packets_received == 0 {
956            0.0
957        } else {
958            percent(packets.late_packets as f64, packets.packets_received as f64)
959        },
960    }
961}
962
/// Returns `numerator` as a percentage of `denominator`.
///
/// No zero check is performed here; callers are expected to guard against a
/// zero denominator before calling.
fn percent(numerator: f64, denominator: f64) -> f64 {
    100.0 * numerator / denominator
}
966
967fn send_ipdv_ns(previous: &UniqueSample, current: &UniqueSample) -> Option<i128> {
968    if let (Some(prev_server), Some(cur_server)) = (
969        previous.server_receive_mono_ns,
970        current.server_receive_mono_ns,
971    ) {
972        return Some(
973            i128::from(cur_server)
974                - i128::from(prev_server)
975                - instant_diff_ns(current.client_send_mono, previous.client_send_mono),
976        );
977    }
978    if let (Some(prev_server), Some(cur_server), Some(prev_client), Some(cur_client)) = (
979        previous.server_receive_wall_ns,
980        current.server_receive_wall_ns,
981        previous.client_send_wall_ns,
982        current.client_send_wall_ns,
983    ) {
984        return Some(i128::from(cur_server) - i128::from(prev_server) - (cur_client - prev_client));
985    }
986    None
987}
988
989fn receive_ipdv_ns(previous: &UniqueSample, current: &UniqueSample) -> Option<i128> {
990    if let (Some(prev_server), Some(cur_server)) =
991        (previous.server_send_mono_ns, current.server_send_mono_ns)
992    {
993        return Some(
994            instant_diff_ns(current.client_receive_mono, previous.client_receive_mono)
995                - (i128::from(cur_server) - i128::from(prev_server)),
996        );
997    }
998    if let (Some(prev_server), Some(cur_server), Some(prev_client), Some(cur_client)) = (
999        previous.server_send_wall_ns,
1000        current.server_send_wall_ns,
1001        previous.client_receive_wall_ns,
1002        current.client_receive_wall_ns,
1003    ) {
1004        return Some(
1005            (cur_client - prev_client) - (i128::from(cur_server) - i128::from(prev_server)),
1006        );
1007    }
1008    None
1009}
1010
1011fn instant_diff_ns(current: Instant, previous: Instant) -> i128 {
1012    if let Some(diff) = current.checked_duration_since(previous) {
1013        duration_ns_i128(diff)
1014    } else {
1015        -duration_ns_i128(previous.duration_since(current))
1016    }
1017}
1018
1019fn system_time_ns(time: SystemTime) -> Option<i128> {
1020    if let Ok(duration) = time.duration_since(UNIX_EPOCH) {
1021        return Some(duration_ns_i128(duration));
1022    }
1023    UNIX_EPOCH
1024        .duration_since(time)
1025        .ok()
1026        .map(|duration| -duration_ns_i128(duration))
1027}
1028
/// Converts a `Duration` to whole nanoseconds, saturating at `u64::MAX`.
fn duration_ns_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
}
1032
/// Converts a `Duration` to whole nanoseconds as an `i128`.
///
/// Falls back to `i128::MAX` should the nanosecond count not fit.
fn duration_ns_i128(duration: Duration) -> i128 {
    i128::try_from(duration.as_nanos()).unwrap_or(i128::MAX)
}
1036
1037fn signed_duration_ns(duration: SignedDuration) -> i128 {
1038    duration.ns
1039}
1040
/// Returns the magnitude of `value` as a `u64`, saturating at `u64::MAX`.
///
/// `unsigned_abs` cannot overflow, so `i128::MIN` is handled without a
/// separate saturation step before the narrowing conversion.
fn abs_i128_to_u64(value: i128) -> u64 {
    u64::try_from(value.unsigned_abs()).unwrap_or(u64::MAX)
}
1044
#[cfg(test)]
mod tests {
    use super::*;
    use irtt_client::{
        ClientTimestamp, PacketMeta, ReceivedStatsSample, RttSample, ServerTiming, SignedDuration,
    };
    use std::time::SystemTime;

    /// Builds a client timestamp `ms` milliseconds after "now" (monotonic) and
    /// after the Unix epoch (wall clock).
    fn ts(ms: u64) -> ClientTimestamp {
        ClientTimestamp {
            mono: Instant::now() + Duration::from_millis(ms),
            wall: UNIX_EPOCH + Duration::from_millis(ms),
        }
    }

    /// Builds an RTT sample with the given raw value and signed effective
    /// value, both in milliseconds. A negative effective value leaves
    /// `adjusted` as `None` and makes `effective` fall back to the raw value.
    fn rtt(raw_ms: u64, effective_ms: i128) -> RttSample {
        RttSample {
            raw: Duration::from_millis(raw_ms),
            adjusted: u64::try_from(effective_ms).ok().map(Duration::from_millis),
            effective: u64::try_from(effective_ms)
                .ok()
                .map(Duration::from_millis)
                .unwrap_or_else(|| Duration::from_millis(raw_ms)),
            adjusted_signed: Some(SignedDuration {
                ns: effective_ms * 1_000_000,
            }),
            effective_signed: SignedDuration {
                ns: effective_ms * 1_000_000,
            },
        }
    }

    /// Builds an `EchoSent` event for the given sequence numbers and send time.
    fn sent(seq: u32, logical_seq: u64, sent_at: ClientTimestamp) -> ClientEvent {
        ClientEvent::EchoSent {
            seq,
            logical_seq,
            remote: "127.0.0.1:2112".parse().unwrap(),
            scheduled_at: sent_at.mono,
            sent_at,
            bytes: 32,
            send_call: Duration::from_micros(10),
            timer_error: Duration::from_micros(2),
        }
    }

    /// Builds an `EchoReply` event sent at `logical_seq * 10` ms and received
    /// `raw_ms` later, with server timing, one-way delay, and received-stats
    /// samples all populated.
    fn reply(logical_seq: u64, raw_ms: u64, effective_ms: i128) -> ClientEvent {
        let sent_at = ts(logical_seq * 10);
        let received_at = ClientTimestamp {
            mono: sent_at.mono + Duration::from_millis(raw_ms),
            wall: sent_at.wall + Duration::from_millis(raw_ms),
        };
        ClientEvent::EchoReply {
            seq: logical_seq as u32,
            logical_seq,
            remote: "127.0.0.1:2112".parse().unwrap(),
            sent_at,
            received_at,
            rtt: rtt(raw_ms, effective_ms),
            server_timing: Some(ServerTiming {
                receive_wall_ns: Some(system_time_ns(sent_at.wall).unwrap() as i64 + 1_000_000),
                receive_mono_ns: Some(logical_seq as i64 * 10_000_000 + 1_000_000),
                send_wall_ns: Some(system_time_ns(sent_at.wall).unwrap() as i64 + 2_000_000),
                send_mono_ns: Some(logical_seq as i64 * 10_000_000 + 2_000_000),
                midpoint_wall_ns: None,
                midpoint_mono_ns: None,
                processing: Some(Duration::from_millis(1)),
            }),
            one_way: Some(OneWayDelaySample {
                client_to_server: Some(Duration::from_millis(1)),
                server_to_client: Some(Duration::from_millis(2)),
            }),
            received_stats: Some(ReceivedStatsSample {
                count: Some((logical_seq + 1) as u32),
                window: Some(0xff),
            }),
            bytes: 64,
            packet_meta: PacketMeta::default(),
        }
    }

    /// Builds a tracked `LateReply` event carrying the same payload that
    /// `reply` would produce for the same arguments.
    fn late_reply(logical_seq: u64, raw_ms: u64, effective_ms: i128) -> ClientEvent {
        let ClientEvent::EchoReply {
            seq,
            remote,
            sent_at,
            received_at,
            rtt,
            server_timing,
            one_way,
            received_stats,
            bytes,
            packet_meta,
            ..
        } = reply(logical_seq, raw_ms, effective_ms)
        else {
            unreachable!();
        };
        ClientEvent::LateReply {
            seq,
            logical_seq: Some(logical_seq),
            highest_seen: seq + 1,
            remote,
            sent_at: Some(sent_at),
            received_at,
            rtt: Some(rtt),
            server_timing,
            one_way,
            received_stats,
            bytes,
            packet_meta,
        }
    }

    #[test]
    fn running_duration_stats_use_sample_variance() {
        let mut metric = MetricU64::new(false);
        metric.push(1);
        metric.push(2);
        metric.push(3);
        let stats = metric.stats();
        assert_eq!(stats.count, 3);
        assert_eq!(stats.total_ns, 6);
        assert_eq!(stats.min_ns, Some(1));
        assert_eq!(stats.max_ns, Some(3));
        assert_eq!(stats.mean_ns, 2.0);
        assert_eq!(stats.variance_ns2, 1.0);
        assert_eq!(stats.stddev_ns(), 1.0);
    }

    #[test]
    fn exact_median_handles_odd_and_even_samples() {
        assert_eq!(median_u64(&[3, 1, 2]), Some(2.0));
        assert_eq!(median_u64(&[4, 1, 2, 3]), Some(2.5));
        assert_eq!(median_i128(&[-5, 1, 3]), Some(1.0));
        assert_eq!(median_i128(&[-5, 1, 3, 7]), Some(2.0));
    }

    #[test]
    fn disabled_median_avoids_finite_retention() {
        let mut collector = StatsCollector::new(StatsConfig::continuous());
        collector.process(&reply(0, 10, 9));
        collector.process(&reply(1, 20, 19));
        assert_eq!(collector.retained_median_samples(), 0);
        assert_eq!(collector.cumulative().rtt.primary.median_ns, None);
    }

    #[test]
    fn continuous_mode_bounds_sequence_tracking() {
        let mut collector = StatsCollector::new(StatsConfig::continuous());
        for seq in 0..(CONTINUOUS_SEQUENCE_LIMIT as u64 + 8) {
            collector.process(&reply(seq, 10, 10));
        }

        assert_eq!(collector.retained_median_samples(), 0);
        assert!(collector.retained_sequence_samples() <= CONTINUOUS_SEQUENCE_LIMIT);
        assert_eq!(
            collector.cumulative().rtt.primary.stats.count,
            CONTINUOUS_SEQUENCE_LIMIT as u64 + 8
        );
    }

    #[test]
    fn cumulative_rtt_uses_signed_effective_and_tracks_raw() {
        let mut collector = StatsCollector::new(StatsConfig::finite());
        collector.process(&reply(0, 1, -2));
        collector.process(&reply(1, 10, 8));

        let snapshot = collector.cumulative();
        assert_eq!(snapshot.rtt.primary.stats.count, 2);
        assert_eq!(snapshot.rtt.primary.stats.min_ns, Some(-2_000_000));
        assert_eq!(snapshot.rtt.primary.median_ns, Some(3_000_000.0));
        assert_eq!(snapshot.rtt.raw.stats.total_ns, 11_000_000);
    }

    #[test]
    fn late_unique_counts_and_duplicates_do_not_update_measurements() {
        let mut collector = StatsCollector::new(StatsConfig::finite());
        collector.process(&sent(0, 0, ts(0)));
        collector.process(&sent(1, 1, ts(10)));
        collector.process(&reply(1, 10, 9));
        collector.process(&late_reply(0, 20, 19));
        collector.process(&ClientEvent::DuplicateReply {
            seq: 0,
            remote: "127.0.0.1:2112".parse().unwrap(),
            received_at: ts(50),
            bytes: 64,
        });

        let snapshot = collector.cumulative();
        assert_eq!(snapshot.packets.packets_sent, 2);
        assert_eq!(snapshot.packets.packets_received, 3);
        assert_eq!(snapshot.packets.unique_replies, 2);
        assert_eq!(snapshot.packets.duplicates, 1);
        assert_eq!(snapshot.packets.late_packets, 1);
        assert_eq!(snapshot.rtt.primary.stats.count, 2);
        assert_eq!(snapshot.loss.lost_packets, 0);
        assert_eq!(snapshot.loss.duplicate_percent, 100.0 / 3.0);
    }

    #[test]
    fn final_loss_uses_sent_minus_unique_replies_not_echo_loss_events() {
        let mut collector = StatsCollector::new(StatsConfig::finite());
        collector.process(&sent(0, 0, ts(0)));
        collector.process(&sent(1, 1, ts(10)));
        collector.process(&ClientEvent::EchoLoss {
            seq: 0,
            logical_seq: 0,
            sent_at: ts(0),
            timeout_at: Instant::now(),
        });
        collector.process(&late_reply(0, 10, 9));

        let snapshot = collector.summary();
        assert_eq!(snapshot.events.loss_events, 1);
        assert_eq!(snapshot.packets.unique_replies, 1);
        assert_eq!(snapshot.loss.lost_packets, 1);
        assert_eq!(snapshot.loss.packet_loss_percent, 50.0);
    }

    /// Asserts that an update completed no IPDV pairs.
    fn assert_no_ipdv_pairs(update: &EventStatsUpdate) {
        assert!(update.ipdv_pairs.is_empty(), "{update:?}");
    }

    /// Asserts that an update completed exactly one IPDV pair with the given
    /// sequence numbers and round-trip IPDV, and returns that pair.
    fn assert_one_ipdv_pair(
        update: &EventStatsUpdate,
        previous_logical_seq: u64,
        current_logical_seq: u64,
        rtt_ipdv: Duration,
    ) -> &IpdvPairUpdate {
        assert_eq!(update.ipdv_pairs.len(), 1, "{update:?}");
        let pair = &update.ipdv_pairs[0];
        assert_eq!(pair.previous_logical_seq, previous_logical_seq);
        assert_eq!(pair.current_logical_seq, current_logical_seq);
        assert_eq!(pair.rtt_ipdv, rtt_ipdv);
        pair
    }

    #[test]
    fn ipdv_is_sequence_adjacent_and_gap_preserving() {
        let mut collector = StatsCollector::new(StatsConfig::finite());
        let first = collector.process(&reply(0, 10, 10));
        let gap = collector.process(&reply(2, 15, 15));
        let adjacent = collector.process(&reply(3, 12, 12));

        let snapshot = collector.cumulative();
        assert!(first.contributed_sample);
        assert_no_ipdv_pairs(&first);

        assert!(gap.contributed_sample);
        assert_no_ipdv_pairs(&gap);

        assert!(adjacent.contributed_sample);
        assert_one_ipdv_pair(&adjacent, 2, 3, Duration::from_millis(3));
        assert_eq!(snapshot.ipdv.round_trip.stats.count, 1);
        assert_eq!(snapshot.ipdv.round_trip.stats.total_ns, 3_000_000);
    }

    #[test]
    fn late_reply_can_complete_ipdv_pair() {
        let mut collector = StatsCollector::new(StatsConfig::finite());
        collector.process(&reply(1, 20, 20));
        let update = collector.process(&late_reply(0, 10, 10));

        let snapshot = collector.cumulative();

        assert!(update.contributed_sample);
        assert_one_ipdv_pair(&update, 0, 1, Duration::from_millis(10));

        assert_eq!(snapshot.ipdv.round_trip.stats.count, 1);
        assert_eq!(snapshot.ipdv.round_trip.stats.total_ns, 10_000_000);
    }

    #[test]
    fn update_exposes_directional_ipdv_when_available() {
        let mut collector = StatsCollector::new(StatsConfig::finite());
        collector.process(&reply(0, 10, 10));
        let update = collector.process(&reply(1, 13, 13));

        assert!(update.contributed_sample);

        let pair = assert_one_ipdv_pair(&update, 0, 1, Duration::from_millis(3));
        assert!(pair.send_ipdv.is_some());
        assert!(pair.receive_ipdv.is_some());
    }

    #[test]
    fn gap_fill_update_exposes_both_completed_ipdv_pairs() {
        let mut collector = StatsCollector::new(StatsConfig::finite());

        let first = collector.process(&reply(0, 10, 10));
        let gap = collector.process(&reply(2, 20, 20));
        let fill = collector.process(&reply(1, 13, 13));

        assert!(first.contributed_sample);
        assert_no_ipdv_pairs(&first);

        assert!(gap.contributed_sample);
        assert_no_ipdv_pairs(&gap);

        assert!(fill.contributed_sample);
        assert_eq!(fill.ipdv_pairs.len(), 2);

        assert_eq!(fill.ipdv_pairs[0].previous_logical_seq, 0);
        assert_eq!(fill.ipdv_pairs[0].current_logical_seq, 1);
        assert_eq!(fill.ipdv_pairs[0].rtt_ipdv, Duration::from_millis(3));

        assert_eq!(fill.ipdv_pairs[1].previous_logical_seq, 1);
        assert_eq!(fill.ipdv_pairs[1].current_logical_seq, 2);
        assert_eq!(fill.ipdv_pairs[1].rtt_ipdv, Duration::from_millis(7));

        let snapshot = collector.cumulative();
        assert_eq!(snapshot.ipdv.round_trip.stats.count, 2);
        assert_eq!(snapshot.ipdv.round_trip.stats.total_ns, 10_000_000);
    }

    #[test]
    fn server_processing_and_one_way_require_available_samples() {
        let mut collector = StatsCollector::new(StatsConfig::finite());
        collector.process(&reply(0, 10, 9));
        collector.process(&ClientEvent::LateReply {
            seq: 9,
            logical_seq: None,
            highest_seen: 10,
            remote: "127.0.0.1:2112".parse().unwrap(),
            sent_at: None,
            received_at: ts(100),
            rtt: None,
            server_timing: None,
            one_way: None,
            received_stats: None,
            bytes: 64,
            packet_meta: PacketMeta::default(),
        });

        let snapshot = collector.cumulative();
        assert_eq!(snapshot.server_processing.processing.count, 1);
        assert_eq!(snapshot.one_way_delay.send_delay.stats.count, 1);
        assert_eq!(snapshot.events.untracked_late_replies, 1);
    }

    #[test]
    fn rolling_count_eviction_recomputes_from_bounded_events() {
        let mut collector = StatsCollector::new(StatsConfig {
            median: MedianMode::Disabled,
            rolling_count: Some(2),
            rolling_time: None,
        });
        collector.process(&sent(0, 0, ts(0)));
        collector.process(&reply(0, 10, 10));
        collector.process(&reply(1, 20, 20));

        let rolling = collector.rolling_count().unwrap();
        assert_eq!(rolling.packets.packets_sent, 0);
        assert_eq!(rolling.packets.unique_replies, 2);
        assert_eq!(rolling.rtt.primary.stats.count, 2);
    }

    #[test]
    fn rolling_time_eviction_uses_event_timestamps() {
        let mut collector = StatsCollector::new(StatsConfig {
            median: MedianMode::Disabled,
            rolling_count: None,
            rolling_time: Some(Duration::from_millis(15)),
        });
        collector.process(&sent(0, 0, ts(0)));
        collector.process(&sent(1, 1, ts(10)));
        collector.process(&sent(2, 2, ts(30)));

        let rolling = collector.rolling_time().unwrap();
        assert_eq!(rolling.packets.packets_sent, 1);
    }

    #[test]
    fn empty_and_all_lost_edges_are_defined() {
        let empty = StatsCollector::new(StatsConfig::finite()).summary();
        assert_eq!(empty.loss.packet_loss_percent, 0.0);

        let mut collector = StatsCollector::new(StatsConfig::finite());
        collector.process(&sent(0, 0, ts(0)));
        let all_lost = collector.summary();
        assert_eq!(all_lost.loss.lost_packets, 1);
        assert_eq!(all_lost.loss.packet_loss_percent, 100.0);
    }

    #[test]
    fn directional_loss_uses_server_received_count_when_available() {
        let mut collector = StatsCollector::new(StatsConfig::finite());
        collector.process(&sent(0, 0, ts(0)));
        collector.process(&sent(1, 1, ts(10)));
        collector.process(&reply(0, 10, 10));

        let loss = collector.summary().loss;
        assert_eq!(loss.upstream_loss_packets, Some(1));
        assert_eq!(loss.downstream_loss_packets, Some(0));
        assert_eq!(loss.upstream_loss_percent, 50.0);
    }

    #[test]
    fn single_sample_stddev_is_zero() {
        let mut metric = MetricU64::new(false);
        metric.push(42);
        let stats = metric.stats();
        assert_eq!(stats.variance_ns2, 0.0);
        assert_eq!(stats.stddev_ns(), 0.0);
    }

    #[test]
    fn system_time_before_epoch_is_supported() {
        let before = UNIX_EPOCH - Duration::from_nanos(7);
        assert_eq!(system_time_ns(before), Some(-7));
        let after = UNIX_EPOCH + Duration::from_nanos(7);
        assert_eq!(system_time_ns(after), Some(7));
        let now = SystemTime::now();
        assert!(system_time_ns(now).is_some());
    }
}