irtt-stats 0.3.0

Statistics aggregation for irtt-client events
Documentation
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use irtt_client::{
    ClientEvent, ClientTimestamp, OneWayDelaySample, ReceivedStatsSample, RttSample, ServerTiming,
    SignedDuration,
};

use crate::ipdv::IpdvSample;

#[derive(Debug, Clone, PartialEq)]
pub(crate) enum StatsEvent {
    Sent {
        at: Instant,
        bytes: usize,
        send_call_ns: i128,
        timer_error_ns: i128,
    },
    UniqueReply {
        at: Instant,
        is_late: bool,
        sample: Box<ReplySample>,
    },
    DuplicateReply {
        at: Instant,
        bytes: usize,
    },
    Loss {
        at: Instant,
    },
    Warning {
        at: Instant,
    },
    UntrackedLate {
        at: Instant,
        bytes: usize,
    },
}

pub(crate) fn normalize_event(event: &ClientEvent) -> Option<StatsEvent> {
    match event {
        ClientEvent::EchoSent {
            sent_at,
            bytes,
            send_call,
            timer_error,
            ..
        } => Some(StatsEvent::Sent {
            at: sent_at.mono,
            bytes: *bytes,
            send_call_ns: duration_ns_i128(*send_call),
            timer_error_ns: duration_ns_i128(*timer_error),
        }),
        ClientEvent::EchoReply {
            seq,
            sent_at,
            received_at,
            rtt,
            server_timing,
            one_way,
            received_stats,
            bytes,
            ..
        } => Some(StatsEvent::UniqueReply {
            at: received_at.mono,
            is_late: false,
            sample: Box::new(ReplySample::from_reply_parts(
                *seq,
                *sent_at,
                *received_at,
                *rtt,
                *server_timing,
                *one_way,
                *received_stats,
                *bytes,
            )),
        }),
        ClientEvent::LateReply {
            seq,
            sent_at: Some(sent_at),
            received_at,
            rtt: Some(rtt),
            server_timing,
            one_way,
            received_stats,
            bytes,
            ..
        } => Some(StatsEvent::UniqueReply {
            at: received_at.mono,
            is_late: true,
            sample: Box::new(ReplySample::from_reply_parts(
                *seq,
                *sent_at,
                *received_at,
                *rtt,
                *server_timing,
                *one_way,
                *received_stats,
                *bytes,
            )),
        }),
        ClientEvent::LateReply {
            received_at, bytes, ..
        } => Some(StatsEvent::UntrackedLate {
            at: received_at.mono,
            bytes: *bytes,
        }),
        ClientEvent::DuplicateReply {
            received_at, bytes, ..
        } => Some(StatsEvent::DuplicateReply {
            at: received_at.mono,
            bytes: *bytes,
        }),
        ClientEvent::EchoLoss { timeout_at, .. } => Some(StatsEvent::Loss { at: *timeout_at }),
        ClientEvent::Warning { at, .. } => Some(StatsEvent::Warning { at: at.mono }),
        ClientEvent::SessionStarted { .. }
        | ClientEvent::NoTestCompleted { .. }
        | ClientEvent::SessionClosed { .. } => None,
    }
}

impl StatsEvent {
    pub(crate) fn at(&self) -> Instant {
        match self {
            Self::Sent { at, .. }
            | Self::UniqueReply { at, .. }
            | Self::DuplicateReply { at, .. }
            | Self::Loss { at }
            | Self::Warning { at }
            | Self::UntrackedLate { at, .. } => *at,
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ReplySample {
    pub(crate) bytes: usize,
    pub(crate) rtt_raw_ns: i128,
    pub(crate) rtt_adjusted_ns: Option<i128>,
    pub(crate) send_delay_ns: Option<i128>,
    pub(crate) receive_delay_ns: Option<i128>,
    pub(crate) server_processing_ns: Option<i128>,
    pub(crate) received_count: Option<u32>,
    pub(crate) received_window: Option<u64>,
    pub(crate) ipdv: IpdvSample,
}

impl ReplySample {
    #[allow(clippy::too_many_arguments)]
    fn from_reply_parts(
        seq: u32,
        sent_at: ClientTimestamp,
        received_at: ClientTimestamp,
        rtt: RttSample,
        server_timing: Option<ServerTiming>,
        one_way: Option<OneWayDelaySample>,
        received_stats: Option<ReceivedStatsSample>,
        bytes: usize,
    ) -> Self {
        let rtt_primary_ns = signed_duration_ns(rtt.effective);
        Self {
            bytes,
            rtt_raw_ns: duration_ns_i128(rtt.raw),
            rtt_adjusted_ns: rtt.adjusted.map(signed_duration_ns),
            send_delay_ns: one_way
                .and_then(|sample| sample.client_to_server)
                .map(signed_duration_ns),
            receive_delay_ns: one_way
                .and_then(|sample| sample.server_to_client)
                .map(signed_duration_ns),
            server_processing_ns: server_timing
                .and_then(|timing| timing.processing)
                .map(duration_ns_i128),
            received_count: received_stats.and_then(|stats| stats.count),
            received_window: received_stats.and_then(|stats| stats.window),
            ipdv: IpdvSample {
                seq,
                rtt_primary_ns,
                client_send_mono: sent_at.mono,
                client_receive_mono: received_at.mono,
                client_send_wall_ns: system_time_ns(sent_at.wall),
                client_receive_wall_ns: system_time_ns(received_at.wall),
                server_receive_mono_ns: server_timing.and_then(|timing| timing.receive_mono_ns),
                server_send_mono_ns: server_timing.and_then(|timing| timing.send_mono_ns),
                server_receive_wall_ns: server_timing.and_then(|timing| timing.receive_wall_ns),
                server_send_wall_ns: server_timing.and_then(|timing| timing.send_wall_ns),
            },
        }
    }
}

fn system_time_ns(time: SystemTime) -> Option<i128> {
    if let Ok(duration) = time.duration_since(UNIX_EPOCH) {
        return Some(duration_ns_i128(duration));
    }
    UNIX_EPOCH
        .duration_since(time)
        .ok()
        .map(|duration| -duration_ns_i128(duration))
}

fn duration_ns_i128(duration: Duration) -> i128 {
    i128::try_from(duration.as_nanos()).unwrap_or(i128::MAX)
}

fn signed_duration_ns(duration: SignedDuration) -> i128 {
    duration.as_nanos()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn system_time_before_epoch_is_supported() {
        let before = UNIX_EPOCH - Duration::from_nanos(7);
        assert_eq!(system_time_ns(before), Some(-7));
        let after = UNIX_EPOCH + Duration::from_nanos(7);
        assert_eq!(system_time_ns(after), Some(7));
        let now = SystemTime::now();
        assert!(system_time_ns(now).is_some());
    }
}