netpulse-cli 0.1.1

A zero-config, single-binary network quality monitor with percentile stats, jitter, and MTR-style traceroute
Documentation
// src/stats/engine.rs — Statistics Engine
//
// Computes network quality metrics from a snapshot of ProbeResults:
// - RTT: min, max, mean, p50, p90, p95, p99
// - Jitter: mean absolute difference between successive RTTs (in the spirit of RFC 3393 delay variation)
// - Packet loss percentage
// - Burst loss: detection of consecutive packet loss runs
// - Reorder detection: packets arriving out of sequence

use crate::probers::ProbeResult;
use serde::Serialize;

/// A complete statistical snapshot for one target at one point in time.
///
/// Produced by `StatsEngine::compute`. All RTT-derived fields are `None`
/// when the sample window contains no successful probe.
#[derive(Debug, Clone, Serialize)]
pub struct StatsSnapshot {
    /// Target label this snapshot was computed for (copied from the caller).
    pub target: String,
    /// Total number of samples in the window, lost or not.
    pub sample_count: usize,
    /// Number of samples in the window reported as lost.
    pub loss_count: usize,
    /// `loss_count / sample_count` as a percentage (0.0 for an empty window).
    pub loss_pct: f64,

    // RTT stats in microseconds (None if no successful probes)
    /// Smallest successful RTT in the window.
    pub rtt_min_us: Option<u64>,
    /// Largest successful RTT in the window.
    pub rtt_max_us: Option<u64>,
    /// Arithmetic mean of the successful RTTs.
    pub rtt_mean_us: Option<f64>,
    /// Median RTT (linearly interpolated between neighbouring samples).
    pub rtt_p50_us: Option<u64>,
    /// 90th-percentile RTT (linearly interpolated).
    pub rtt_p90_us: Option<u64>,
    /// 95th-percentile RTT (linearly interpolated).
    pub rtt_p95_us: Option<u64>,
    /// 99th-percentile RTT (linearly interpolated).
    pub rtt_p99_us: Option<u64>,

    /// Jitter in microseconds (RFC 3393 — mean of |RTT[n] - RTT[n-1]|)
    ///
    /// `None` when fewer than two successful RTTs are available.
    pub jitter_us: Option<f64>,

    /// Maximum consecutive-loss streak in the sample window
    pub max_burst_loss: usize,

    /// Number of out-of-order arrivals detected
    ///
    /// A sample counts as out of order when its `seq` is lower than the
    /// highest `seq` seen earlier in the window.
    pub reorder_count: usize,
}

/// The stats engine: a stateless unit type used as a namespace.
///
/// It holds no data; `compute` is an associated function that takes
/// ownership of the sample window it is given and returns a fresh snapshot.
pub struct StatsEngine;

impl StatsEngine {
    /// Compute a full [`StatsSnapshot`] for `target` from a window of probe results.
    ///
    /// `samples` is walked in the order given: jitter, burst-loss and reorder
    /// detection all depend on that order, while min/max/mean/percentiles are
    /// computed on a sorted copy of the successful RTTs.
    ///
    /// An empty window yields zero counts, `loss_pct == 0.0` and `None` for
    /// every optional statistic. A window with samples but no successful RTT
    /// yields `None` for the RTT and jitter fields only.
    pub fn compute(target: &str, samples: Vec<ProbeResult>) -> StatsSnapshot {
        let sample_count = samples.len();
        let loss_count = samples.iter().filter(|s| s.is_loss()).count();
        // Guard the division so an empty window reports 0% rather than NaN.
        let loss_pct = if sample_count == 0 {
            0.0
        } else {
            (loss_count as f64 / sample_count as f64) * 100.0
        };

        // Successful RTTs in sample order, collected once: jitter reads them in
        // this order, the distribution stats read the sorted copy below.
        let ordered_rtts: Vec<u64> = samples.iter().filter_map(|s| s.rtt_us).collect();
        let mut sorted_rtts = ordered_rtts.clone();
        sorted_rtts.sort_unstable();

        let (rtt_min_us, rtt_max_us, rtt_mean_us, rtt_p50_us, rtt_p90_us, rtt_p95_us, rtt_p99_us) =
            match (sorted_rtts.first(), sorted_rtts.last()) {
                (Some(&min), Some(&max)) => {
                    // Sum in u128: a u64 accumulator would panic (debug) or wrap
                    // (release) if the RTT total ever exceeded u64::MAX.
                    let total: u128 = sorted_rtts.iter().map(|&rtt| u128::from(rtt)).sum();
                    let mean = total as f64 / sorted_rtts.len() as f64;
                    (
                        Some(min),
                        Some(max),
                        Some(mean),
                        Some(percentile(&sorted_rtts, 50.0)),
                        Some(percentile(&sorted_rtts, 90.0)),
                        Some(percentile(&sorted_rtts, 95.0)),
                        Some(percentile(&sorted_rtts, 99.0)),
                    )
                }
                // No successful probe in the window.
                _ => (None, None, None, None, None, None, None),
            };

        StatsSnapshot {
            target: target.to_string(),
            sample_count,
            loss_count,
            loss_pct,
            rtt_min_us,
            rtt_max_us,
            rtt_mean_us,
            rtt_p50_us,
            rtt_p90_us,
            rtt_p95_us,
            rtt_p99_us,
            jitter_us: Self::mean_successive_diff(&ordered_rtts),
            max_burst_loss: Self::longest_loss_run(&samples),
            reorder_count: Self::count_reorders(&samples),
        }
    }

    /// Jitter: mean of `|RTT[n] - RTT[n-1]|` over consecutive successful RTTs.
    ///
    /// Returns `None` when fewer than two RTTs are available, since no
    /// difference can be formed.
    fn mean_successive_diff(ordered_rtts: &[u64]) -> Option<f64> {
        if ordered_rtts.len() < 2 {
            return None;
        }
        let total: f64 = ordered_rtts
            .windows(2)
            .map(|pair| (pair[1] as f64 - pair[0] as f64).abs())
            .sum();
        // `windows(2)` yields exactly `len - 1` pairs.
        Some(total / (ordered_rtts.len() - 1) as f64)
    }

    /// Length of the longest run of consecutive lost samples.
    fn longest_loss_run(samples: &[ProbeResult]) -> usize {
        let mut longest = 0usize;
        let mut current = 0usize;
        for sample in samples {
            if sample.is_loss() {
                current += 1;
                longest = longest.max(current);
            } else {
                current = 0;
            }
        }
        longest
    }

    /// Number of samples whose `seq` is lower than the highest `seq` seen
    /// earlier in the window.
    ///
    /// NOTE(review): lost samples are included in this scan, exactly as before.
    /// If timed-out probes can be recorded after later replies, they would be
    /// counted as reorders — confirm against how the sample window is filled.
    fn count_reorders(samples: &[ProbeResult]) -> usize {
        let mut highest_seq: u64 = 0;
        let mut reorders = 0usize;
        for sample in samples {
            if sample.seq < highest_seq {
                reorders += 1;
            } else {
                highest_seq = sample.seq;
            }
        }
        reorders
    }
}

/// Compute a percentile value from a sorted slice using linear interpolation.
///
/// `sorted` must be in ascending order. `pct` is a percentage and is clamped
/// to `[0, 100]`, so out-of-range requests return the first or last element
/// instead of indexing past the end of the slice. A NaN `pct` returns the
/// first element.
///
/// The fractional rank is `pct / 100 * (len - 1)`; when it falls between two
/// elements the result is interpolated between them and truncated toward zero.
///
/// Returns 0 for an empty slice.
fn percentile(sorted: &[u64], pct: f64) -> u64 {
    let last = match sorted.len() {
        0 => return 0,
        1 => return sorted[0],
        len => len - 1,
    };

    // Clamp first: pct > 100 would otherwise push `upper` out of bounds.
    let rank = pct.clamp(0.0, 100.0) / 100.0 * last as f64;
    // Float-to-int casts saturate (NaN becomes 0); `min` keeps both indices
    // valid even if the float product rounds up on very large slices.
    let lower = (rank.floor() as usize).min(last);
    let upper = (rank.ceil() as usize).min(last);

    if lower == upper {
        return sorted[lower];
    }

    let below = sorted[lower] as f64;
    let above = sorted[upper] as f64;
    let frac = rank - lower as f64;
    (below + frac * (above - below)) as u64
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::probers::ProbeResult;
    use chrono::Utc;

    /// Build a probe result for a fixed target; `rtt_us == None` models a lost probe.
    fn make_sample(rtt_us: Option<u64>, seq: u64) -> ProbeResult {
        ProbeResult {
            target: "8.8.8.8".to_string(),
            rtt_us,
            timestamp: Utc::now(),
            seq,
            responder_ip: None,
        }
    }

    #[test]
    fn test_percentiles_known_values() {
        // RTTs: 10, 20, 30, 40, 50, 60, 70, 80, 90, 100 ms (in µs)
        let samples: Vec<ProbeResult> =
            (1..=10).map(|i| make_sample(Some(i * 10_000), i)).collect();

        let snap = StatsEngine::compute("test", samples);
        assert_eq!(snap.rtt_min_us, Some(10_000));
        assert_eq!(snap.rtt_max_us, Some(100_000));
        // With 10 samples [10k..100k], rank = 0.5 * 9 = 4.5 → interpolates to 55_000
        assert_eq!(snap.rtt_p50_us, Some(55_000));
        assert_eq!(snap.loss_pct, 0.0);
    }

    #[test]
    fn test_loss_counting() {
        let mut samples: Vec<ProbeResult> = (0..8).map(|i| make_sample(Some(10_000), i)).collect();
        samples.push(make_sample(None, 8));
        samples.push(make_sample(None, 9));

        let snap = StatsEngine::compute("test", samples);
        assert_eq!(snap.loss_count, 2);
        assert!((snap.loss_pct - 20.0).abs() < 0.001);
    }

    #[test]
    fn test_burst_loss() {
        let samples = vec![
            make_sample(Some(10_000), 0),
            make_sample(None, 1),
            make_sample(None, 2),
            make_sample(None, 3),
            make_sample(Some(10_000), 4),
        ];
        let snap = StatsEngine::compute("test", samples);
        assert_eq!(snap.max_burst_loss, 3);
    }

    #[test]
    fn test_jitter() {
        // RTTs: 10ms, 20ms, 15ms → diffs: 10, 5 → jitter = 7.5ms
        let samples = vec![
            make_sample(Some(10_000), 0),
            make_sample(Some(20_000), 1),
            make_sample(Some(15_000), 2),
        ];
        let snap = StatsEngine::compute("test", samples);
        let jitter = snap.jitter_us.unwrap();
        assert!((jitter - 7_500.0).abs() < 1.0, "jitter was {}", jitter);
    }

    #[test]
    fn test_empty_samples() {
        let snap = StatsEngine::compute("test", vec![]);
        assert_eq!(snap.sample_count, 0);
        assert_eq!(snap.loss_count, 0);
        assert_eq!(snap.loss_pct, 0.0);
        assert!(snap.rtt_min_us.is_none());
        assert!(snap.rtt_mean_us.is_none());
        assert!(snap.jitter_us.is_none());
        assert_eq!(snap.max_burst_loss, 0);
        assert_eq!(snap.reorder_count, 0);
    }

    #[test]
    fn test_reorder_detection() {
        // seq 1 shows up after seq 2 has already been seen → one reorder.
        let samples = vec![
            make_sample(Some(10_000), 0),
            make_sample(Some(10_000), 2),
            make_sample(Some(10_000), 1),
            make_sample(Some(10_000), 3),
        ];
        let snap = StatsEngine::compute("test", samples);
        assert_eq!(snap.reorder_count, 1);
    }

    #[test]
    fn test_all_probes_lost() {
        let samples: Vec<ProbeResult> = (0..3).map(|i| make_sample(None, i)).collect();

        let snap = StatsEngine::compute("test", samples);
        assert_eq!(snap.sample_count, 3);
        assert_eq!(snap.loss_count, 3);
        assert!((snap.loss_pct - 100.0).abs() < 0.001);
        assert!(snap.rtt_min_us.is_none());
        assert!(snap.rtt_p99_us.is_none());
        assert!(snap.jitter_us.is_none());
        assert_eq!(snap.max_burst_loss, 3);
    }

    #[test]
    fn test_single_sample_has_no_jitter() {
        let snap = StatsEngine::compute("test", vec![make_sample(Some(12_345), 0)]);
        assert_eq!(snap.rtt_min_us, Some(12_345));
        assert_eq!(snap.rtt_max_us, Some(12_345));
        assert_eq!(snap.rtt_p50_us, Some(12_345));
        assert_eq!(snap.rtt_p99_us, Some(12_345));
        // A single RTT has no predecessor to diff against.
        assert!(snap.jitter_us.is_none());
    }

    #[test]
    fn test_percentile_edge_cases() {
        assert_eq!(percentile(&[], 50.0), 0);
        assert_eq!(percentile(&[42], 99.0), 42);
        assert_eq!(percentile(&[10, 20, 30], 0.0), 10);
        assert_eq!(percentile(&[10, 20, 30], 100.0), 30);
        // Interpolated values are truncated toward zero: 1.5 → 1.
        assert_eq!(percentile(&[1, 2], 50.0), 1);
    }
}