irtt-stats 0.3.0

Statistics aggregation for irtt-client events
Documentation
use std::time::Duration;

use crate::ipdv::{IpdvSample, IpdvTracker};
use crate::loss::loss_stats;
use crate::normalization::{ReplySample, StatsEvent};
use crate::time_stats::TimeMetric;
use crate::{
    EventCounts, EventStatsUpdate, IpdvPairUpdate, IpdvStats, OneWayDelayStats, PacketCounts,
    RttStats, SampleMode, ServerProcessingStats, Snapshot,
};

const CONTINUOUS_SEQUENCE_LIMIT: usize = 4096;

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct CoreStats {
    events: EventCounts,
    packets: PacketCounts,
    send_call: TimeMetric,
    timer_error: TimeMetric,
    rtt_primary: TimeMetric,
    rtt_raw: TimeMetric,
    rtt_adjusted: TimeMetric,
    ipdv_round_trip: TimeMetric,
    ipdv_send: TimeMetric,
    ipdv_receive: TimeMetric,
    send_delay: TimeMetric,
    receive_delay: TimeMetric,
    server_processing: TimeMetric,
    ipdv_tracker: IpdvTracker,
}

impl CoreStats {
    pub(crate) fn new(sample_mode: SampleMode) -> Self {
        let sequence_limit = if sample_mode == SampleMode::Exact {
            None
        } else {
            Some(CONTINUOUS_SEQUENCE_LIMIT)
        };

        Self {
            events: EventCounts::default(),
            packets: PacketCounts::default(),
            send_call: TimeMetric::new(false),
            timer_error: TimeMetric::new(false),
            rtt_primary: TimeMetric::new(sample_mode == SampleMode::Exact),
            rtt_raw: TimeMetric::new(sample_mode == SampleMode::Exact),
            rtt_adjusted: TimeMetric::new(sample_mode == SampleMode::Exact),
            ipdv_round_trip: TimeMetric::new(sample_mode == SampleMode::Exact),
            ipdv_send: TimeMetric::new(sample_mode == SampleMode::Exact),
            ipdv_receive: TimeMetric::new(sample_mode == SampleMode::Exact),
            send_delay: TimeMetric::new(sample_mode == SampleMode::Exact),
            receive_delay: TimeMetric::new(sample_mode == SampleMode::Exact),
            server_processing: TimeMetric::new(false),
            ipdv_tracker: IpdvTracker::new(sequence_limit),
        }
    }

    pub(crate) fn apply(&mut self, event: StatsEvent) -> EventStatsUpdate {
        match event {
            StatsEvent::Sent {
                bytes,
                send_call_ns,
                timer_error_ns,
                ..
            } => {
                self.apply_sent(bytes, send_call_ns, timer_error_ns);
                EventStatsUpdate::default()
            }
            StatsEvent::UniqueReply {
                is_late, sample, ..
            } => self.apply_unique_reply(is_late, *sample),
            StatsEvent::DuplicateReply { bytes, .. } => {
                self.apply_duplicate_reply(bytes);
                EventStatsUpdate::default()
            }
            StatsEvent::Loss { .. } => {
                self.apply_loss();
                EventStatsUpdate::default()
            }
            StatsEvent::Warning { .. } => {
                self.apply_warning();
                EventStatsUpdate::default()
            }
            StatsEvent::UntrackedLate { bytes, .. } => {
                self.apply_untracked_late(bytes);
                EventStatsUpdate::default()
            }
        }
    }

    fn apply_sent(&mut self, bytes: usize, send_call_ns: i128, timer_error_ns: i128) {
        self.events.sent_events += 1;
        self.packets.packets_sent += 1;
        self.packets.bytes_sent = self.packets.bytes_sent.saturating_add(bytes as u64);
        self.send_call.push_ns(send_call_ns);
        self.timer_error.push_ns(timer_error_ns);
    }

    fn apply_unique_reply(&mut self, is_late: bool, sample: ReplySample) -> EventStatsUpdate {
        let mut update = EventStatsUpdate {
            contributed_sample: true,
            ..EventStatsUpdate::default()
        };

        self.account_unique_reply(is_late, &sample);
        self.record_reply_metrics(&sample);
        update.ipdv_pairs = self.apply_ipdv_sample(sample.ipdv);

        update
    }

    fn account_unique_reply(&mut self, is_late: bool, sample: &ReplySample) {
        self.events.echo_replies += u64::from(!is_late);
        self.events.late_unique_replies += u64::from(is_late);
        self.packets.packets_received += 1;
        self.packets.unique_replies += 1;
        self.packets.late_packets += u64::from(is_late);
        self.packets.bytes_received = self
            .packets
            .bytes_received
            .saturating_add(sample.bytes as u64);

        if let Some(count) = sample.received_count {
            let count = u64::from(count);
            self.packets.server_packets_received = Some(
                self.packets
                    .server_packets_received
                    .map_or(count, |current| current.max(count)),
            );
        }
        if let Some(window) = sample.received_window {
            self.packets.server_received_window = Some(window);
        }
    }

    fn record_reply_metrics(&mut self, sample: &ReplySample) {
        self.rtt_primary.push_ns(sample.ipdv.rtt_primary_ns);
        self.rtt_raw.push_ns(sample.rtt_raw_ns);
        if let Some(adjusted) = sample.rtt_adjusted_ns {
            self.rtt_adjusted.push_ns(adjusted);
        }
        if let Some(processing) = sample.server_processing_ns {
            self.server_processing.push_ns(processing);
        }
        if let Some(delay) = sample.send_delay_ns {
            self.send_delay.push_ns(delay);
        }
        if let Some(delay) = sample.receive_delay_ns {
            self.receive_delay.push_ns(delay);
        }
    }

    fn apply_ipdv_sample(&mut self, sample: IpdvSample) -> Vec<IpdvPairUpdate> {
        let mut updates = Vec::new();

        for pair in self.ipdv_tracker.insert(sample) {
            let Some(rtt_ipdv) = duration_from_non_negative_i128_ns(pair.rtt_ipdv_ns) else {
                continue;
            };
            let send_ipdv = pair
                .send_ipdv_ns
                .and_then(duration_from_non_negative_i128_ns);
            let receive_ipdv = pair
                .receive_ipdv_ns
                .and_then(duration_from_non_negative_i128_ns);

            self.ipdv_round_trip.push_ns(pair.rtt_ipdv_ns);

            if let Some(value) = pair.send_ipdv_ns {
                self.ipdv_send.push_ns(value);
            }

            if let Some(value) = pair.receive_ipdv_ns {
                self.ipdv_receive.push_ns(value);
            }

            updates.push(IpdvPairUpdate {
                previous_seq: pair.previous_seq,
                current_seq: pair.current_seq,
                rtt_ipdv,
                send_ipdv,
                receive_ipdv,
            });
        }

        updates
    }

    fn apply_duplicate_reply(&mut self, bytes: usize) {
        self.events.duplicate_replies += 1;
        self.packets.packets_received += 1;
        self.packets.duplicates += 1;
        self.packets.bytes_received = self.packets.bytes_received.saturating_add(bytes as u64);
    }

    fn apply_loss(&mut self) {
        self.events.loss_events += 1;
    }

    fn apply_warning(&mut self) {
        self.events.warning_events += 1;
    }

    fn apply_untracked_late(&mut self, bytes: usize) {
        self.events.untracked_late_replies += 1;
        self.packets.packets_received += 1;
        self.packets.late_packets += 1;
        self.packets.bytes_received = self.packets.bytes_received.saturating_add(bytes as u64);
    }

    pub(crate) fn snapshot(&self) -> Snapshot {
        let packets = self.packets;
        Snapshot {
            events: self.events,
            packets,
            loss: loss_stats(packets),
            send_call: self.send_call.stats(),
            timer_error: self.timer_error.stats(),
            rtt: self.rtt_stats(),
            ipdv: self.ipdv_stats(),
            one_way_delay: self.one_way_delay_stats(),
            server_processing: self.server_processing_stats(),
        }
    }

    fn rtt_stats(&self) -> RttStats {
        RttStats {
            primary: self.rtt_primary.stats(),
            raw: self.rtt_raw.stats(),
            adjusted: self.rtt_adjusted.stats(),
        }
    }

    fn ipdv_stats(&self) -> IpdvStats {
        IpdvStats {
            round_trip: self.ipdv_round_trip.stats(),
            send: self.ipdv_send.stats(),
            receive: self.ipdv_receive.stats(),
        }
    }

    fn one_way_delay_stats(&self) -> OneWayDelayStats {
        OneWayDelayStats {
            send_delay: self.send_delay.stats(),
            receive_delay: self.receive_delay.stats(),
        }
    }

    fn server_processing_stats(&self) -> ServerProcessingStats {
        ServerProcessingStats {
            processing: self.server_processing.stats(),
        }
    }
}

fn duration_from_non_negative_i128_ns(value: i128) -> Option<Duration> {
    u64::try_from(value).ok().map(Duration::from_nanos)
}