stochastic-routing-extended 1.0.2

SRX (Stochastic Routing eXtended) — a next-generation VPN protocol with stochastic routing, DPI evasion, post-quantum cryptography, and multi-transport channel splitting
Documentation
//! Lightweight metrics for SRX pipeline and transport observability.
//!
//! All counters are lock-free atomics, safe to read from any thread.
//! Designed for embedding into `SrxPipeline` and `TransportManager`.

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use crate::replay_storage::{ReplayStoreMetricsSnapshot, replay_store_metrics_snapshot};
use crate::transport::TransportKind;

/// Paired byte/packet tally backed by two independent atomics.
///
/// Every access uses `Ordering::Relaxed` and the two fields are updated one
/// after the other, so a concurrent reader may see one tally updated before
/// the other.
#[derive(Debug, Default)]
pub struct Counter {
    /// Sum of all sizes passed to [`Counter::record`].
    bytes: AtomicU64,
    /// Number of [`Counter::record`] calls.
    packets: AtomicU64,
}

impl Counter {
    /// Creates a counter with both tallies at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Accounts for one operation that moved `size` bytes.
    pub fn record(&self, size: u64) {
        let Self { bytes, packets } = self;
        bytes.fetch_add(size, Ordering::Relaxed);
        packets.fetch_add(1, Ordering::Relaxed);
    }

    /// Current byte total.
    pub fn bytes(&self) -> u64 {
        let Self { bytes, .. } = self;
        bytes.load(Ordering::Relaxed)
    }

    /// Current packet (operation) count.
    pub fn packets(&self) -> u64 {
        let Self { packets, .. } = self;
        packets.load(Ordering::Relaxed)
    }

    /// Zeroes both tallies, returning the `(bytes, packets)` values they held.
    pub fn reset(&self) -> (u64, u64) {
        // Bytes are swapped out first, then packets.
        let drain = |cell: &AtomicU64| cell.swap(0, Ordering::Relaxed);
        (drain(&self.bytes), drain(&self.packets))
    }
}

/// Exponentially weighted moving average for latency tracking.
///
/// Tracks the smoothed latency (EWMA with a 7/8 weight on the previous value),
/// the minimum and maximum observed samples, and the sample count. Every field
/// is updated with a single atomic read-modify-write operation, so concurrent
/// calls to [`LatencyTracker::record`] never lose an update to an individual
/// field. The fields are not updated as one unit, so a concurrent reader may
/// briefly observe them mid-update relative to each other.
#[derive(Debug)]
pub struct LatencyTracker {
    /// Smoothed RTT in microseconds (EWMA); `UNSET_US` until the first sample.
    srtt_us: AtomicU64,
    /// Minimum observed latency in microseconds; `UNSET_US` until the first sample.
    min_us: AtomicU64,
    /// Maximum observed latency in microseconds.
    max_us: AtomicU64,
    /// Number of samples.
    samples: AtomicU64,
}

impl LatencyTracker {
    /// Sentinel stored in `srtt_us` / `min_us` while no sample has been recorded.
    ///
    /// Samples are clamped to `UNSET_US - 1`, so a real measurement can never be
    /// mistaken for "unset" (and a genuine 0 µs sample is a valid value).
    const UNSET_US: u64 = u64::MAX;

    /// Creates a tracker with no samples.
    pub fn new() -> Self {
        Self {
            srtt_us: AtomicU64::new(Self::UNSET_US),
            min_us: AtomicU64::new(Self::UNSET_US),
            max_us: AtomicU64::new(0),
            samples: AtomicU64::new(0),
        }
    }

    /// Record a latency sample.
    ///
    /// The sample is converted to whole microseconds and saturated at
    /// `u64::MAX - 1` µs instead of being truncated. The first sample seeds the
    /// smoothed value; every later sample is blended in as
    /// `srtt = (7 * srtt + sample) / 8` using integer arithmetic.
    pub fn record(&self, duration: Duration) {
        // Clamp in u128 first so the narrowing cast below is lossless.
        let ceiling = u128::from(Self::UNSET_US - 1);
        let us = duration.as_micros().min(ceiling) as u64;

        self.samples.fetch_add(1, Ordering::Relaxed);
        self.min_us.fetch_min(us, Ordering::Relaxed);
        self.max_us.fetch_max(us, Ordering::Relaxed);

        // Single atomic read-modify-write: a plain load followed by a store
        // would drop samples when two threads record at the same time.
        // The closure always returns `Some`, so the result is always `Ok`.
        let _ = self
            .srtt_us
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
                if old == Self::UNSET_US {
                    return Some(us);
                }
                // Widen to u128 so `old * 7` cannot overflow. Both operands are
                // at most `u64::MAX - 1`, hence so is the result and the cast
                // back to u64 is lossless.
                let blended = (u128::from(old) * 7 + u128::from(us)) / 8;
                Some(blended as u64)
            });
    }

    /// Smoothed RTT, or `Duration::ZERO` if nothing has been recorded yet.
    pub fn srtt(&self) -> Duration {
        let v = self.srtt_us.load(Ordering::Relaxed);
        if v == Self::UNSET_US {
            Duration::ZERO
        } else {
            Duration::from_micros(v)
        }
    }

    /// Minimum observed latency, or `Duration::ZERO` if nothing has been recorded yet.
    pub fn min(&self) -> Duration {
        let v = self.min_us.load(Ordering::Relaxed);
        if v == Self::UNSET_US {
            Duration::ZERO
        } else {
            Duration::from_micros(v)
        }
    }

    /// Maximum observed latency (`Duration::ZERO` before the first sample).
    pub fn max(&self) -> Duration {
        Duration::from_micros(self.max_us.load(Ordering::Relaxed))
    }

    /// Number of samples recorded.
    pub fn samples(&self) -> u64 {
        self.samples.load(Ordering::Relaxed)
    }
}

impl Default for LatencyTracker {
    fn default() -> Self {
        Self::new()
    }
}

/// Traffic, failure, and latency statistics for a single transport.
#[derive(Debug)]
pub struct TransportMetrics {
    /// Transport these statistics belong to.
    pub kind: TransportKind,
    /// Bytes/packets recorded through [`TransportMetrics::record_send`].
    pub sent: Counter,
    /// Bytes/packets recorded through [`TransportMetrics::record_recv`].
    pub received: Counter,
    /// Number of [`TransportMetrics::record_error`] calls.
    pub errors: AtomicU64,
    /// Number of [`TransportMetrics::record_reconnect`] calls.
    pub reconnections: AtomicU64,
    /// Latency samples supplied alongside each recorded send.
    pub latency: LatencyTracker,
}

impl TransportMetrics {
    /// Builds an all-zero metrics set tagged with `kind`.
    pub fn new(kind: TransportKind) -> Self {
        Self {
            kind,
            sent: Counter::default(),
            received: Counter::default(),
            errors: AtomicU64::default(),
            reconnections: AtomicU64::default(),
            latency: LatencyTracker::default(),
        }
    }

    /// Relaxed increment shared by the event recorders below.
    fn bump(cell: &AtomicU64) {
        cell.fetch_add(1, Ordering::Relaxed);
    }

    /// Accounts for one send of `bytes` bytes that took `latency`.
    pub fn record_send(&self, bytes: u64, latency: Duration) {
        let Self {
            sent,
            latency: tracker,
            ..
        } = self;
        sent.record(bytes);
        tracker.record(latency);
    }

    /// Accounts for one receive of `bytes` bytes.
    pub fn record_recv(&self, bytes: u64) {
        let Self { received, .. } = self;
        received.record(bytes);
    }

    /// Counts one error.
    pub fn record_error(&self) {
        Self::bump(&self.errors);
    }

    /// Counts one reconnection.
    pub fn record_reconnect(&self) {
        Self::bump(&self.reconnections);
    }

    /// Errors counted so far.
    pub fn errors(&self) -> u64 {
        let Self { errors, .. } = self;
        errors.load(Ordering::Relaxed)
    }

    /// Reconnections counted so far.
    pub fn reconnections(&self) -> u64 {
        let Self { reconnections, .. } = self;
        reconnections.load(Ordering::Relaxed)
    }
}

/// Pipeline-wide totals, aggregated over every transport.
#[derive(Debug)]
pub struct PipelineMetrics {
    /// Total bytes encrypted and sent.
    pub total_sent: Counter,
    /// Total bytes received and decrypted.
    pub total_received: Counter,
    /// Number of re-key operations performed.
    pub rekeys: AtomicU64,
    /// Number of cover traffic packets sent.
    pub cover_packets: AtomicU64,
    /// Number of signal packets sent/received.
    pub signals: AtomicU64,
    /// Instant captured when this value was constructed.
    started_at: Instant,
}

impl PipelineMetrics {
    /// Creates zeroed metrics and starts the uptime clock.
    pub fn new() -> Self {
        Self {
            total_sent: Counter::default(),
            total_received: Counter::default(),
            rekeys: AtomicU64::default(),
            cover_packets: AtomicU64::default(),
            signals: AtomicU64::default(),
            started_at: Instant::now(),
        }
    }

    /// Relaxed increment shared by the event recorders below.
    fn bump(cell: &AtomicU64) {
        cell.fetch_add(1, Ordering::Relaxed);
    }

    /// Relaxed read shared by the accessors below.
    fn read(cell: &AtomicU64) -> u64 {
        cell.load(Ordering::Relaxed)
    }

    /// Counts one re-key operation.
    pub fn record_rekey(&self) {
        Self::bump(&self.rekeys);
    }

    /// Counts one cover-traffic packet.
    pub fn record_cover_packet(&self) {
        Self::bump(&self.cover_packets);
    }

    /// Counts one signal packet.
    pub fn record_signal(&self) {
        Self::bump(&self.signals);
    }

    /// Re-key operations counted so far.
    pub fn rekeys(&self) -> u64 {
        Self::read(&self.rekeys)
    }

    /// Cover-traffic packets counted so far.
    pub fn cover_packets(&self) -> u64 {
        Self::read(&self.cover_packets)
    }

    /// Signal packets counted so far.
    pub fn signals(&self) -> u64 {
        Self::read(&self.signals)
    }

    /// Time elapsed since this value was constructed.
    pub fn uptime(&self) -> Duration {
        Instant::now().duration_since(self.started_at)
    }

    /// Copies every current value into a plain [`MetricsSnapshot`] for logging/display.
    ///
    /// The values are read one after another, so the snapshot is not a single
    /// atomic cut across all counters.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let uptime = self.uptime();
        let sent_bytes = self.total_sent.bytes();
        let sent_packets = self.total_sent.packets();
        let recv_bytes = self.total_received.bytes();
        let recv_packets = self.total_received.packets();
        let rekeys = self.rekeys();
        let cover_packets = self.cover_packets();
        let signals = self.signals();
        let replay_store = replay_store_metrics_snapshot();

        MetricsSnapshot {
            uptime,
            sent_bytes,
            sent_packets,
            recv_bytes,
            recv_packets,
            rekeys,
            cover_packets,
            signals,
            replay_store,
        }
    }
}

impl Default for PipelineMetrics {
    /// Equivalent to [`PipelineMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Point-in-time snapshot of pipeline metrics.
///
/// Plain-data copy produced by `PipelineMetrics::snapshot`; it holds no atomics
/// and does not change after it is created.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Pipeline uptime at the moment the snapshot was taken.
    pub uptime: Duration,
    /// Byte total of the pipeline's `total_sent` counter.
    pub sent_bytes: u64,
    /// Packet count of the pipeline's `total_sent` counter.
    pub sent_packets: u64,
    /// Byte total of the pipeline's `total_received` counter.
    pub recv_bytes: u64,
    /// Packet count of the pipeline's `total_received` counter.
    pub recv_packets: u64,
    /// Re-key operations counted so far.
    pub rekeys: u64,
    /// Cover-traffic packets counted so far.
    pub cover_packets: u64,
    /// Signal packets counted so far.
    pub signals: u64,
    /// Replay-store metrics as returned by `replay_store_metrics_snapshot()`.
    pub replay_store: ReplayStoreMetricsSnapshot,
}

// Unit tests for the counters, latency tracker, and the transport/pipeline aggregates.
#[cfg(test)]
mod tests {
    use super::*;

    // `record` accumulates bytes and packets; `reset` returns the totals and zeroes both.
    #[test]
    fn counter_records_and_resets() {
        let c = Counter::new();
        c.record(100);
        c.record(200);
        assert_eq!(c.bytes(), 300);
        assert_eq!(c.packets(), 2);
        let (b, p) = c.reset();
        assert_eq!(b, 300);
        assert_eq!(p, 2);
        assert_eq!(c.bytes(), 0);
        assert_eq!(c.packets(), 0);
    }

    // The first sample seeds srtt/min/max; the second is blended in with 7/8 weighting.
    #[test]
    fn latency_tracker_ewma() {
        let t = LatencyTracker::new();
        t.record(Duration::from_millis(100));
        assert_eq!(t.srtt(), Duration::from_millis(100));
        assert_eq!(t.min(), Duration::from_millis(100));
        assert_eq!(t.max(), Duration::from_millis(100));

        // Second sample, computed in microseconds: (100_000*7 + 200_000) / 8 = 112_500 us,
        // which `as_millis()` reports as 112 ms.
        t.record(Duration::from_millis(200));
        let srtt = t.srtt().as_millis();
        assert!((112..=113).contains(&srtt), "srtt={srtt}");
        assert_eq!(t.min(), Duration::from_millis(100));
        assert_eq!(t.max(), Duration::from_millis(200));
        assert_eq!(t.samples(), 2);
    }

    // With no samples, `min()` reports zero rather than the internal u64::MAX sentinel.
    #[test]
    fn latency_tracker_min_no_samples() {
        let t = LatencyTracker::new();
        assert_eq!(t.min(), Duration::ZERO);
        assert_eq!(t.samples(), 0);
    }

    // Each recorder updates exactly the counter it is named after.
    #[test]
    fn transport_metrics_record() {
        let m = TransportMetrics::new(TransportKind::Tcp);
        m.record_send(1024, Duration::from_millis(5));
        m.record_recv(512);
        m.record_error();
        m.record_reconnect();

        assert_eq!(m.sent.bytes(), 1024);
        assert_eq!(m.sent.packets(), 1);
        assert_eq!(m.received.bytes(), 512);
        assert_eq!(m.received.packets(), 1);
        assert_eq!(m.errors(), 1);
        assert_eq!(m.reconnections(), 1);
        // A single 5 ms sample seeds srtt directly, so it is comfortably >= 4 ms.
        assert!(m.latency.srtt().as_millis() >= 4);
    }

    // `snapshot` mirrors the live counters and includes the replay-store metrics.
    #[test]
    fn pipeline_metrics_snapshot() {
        let m = PipelineMetrics::new();
        m.total_sent.record(100);
        m.total_sent.record(200);
        m.total_received.record(150);
        m.record_rekey();
        m.record_cover_packet();
        m.record_signal();

        let snap = m.snapshot();
        assert_eq!(snap.sent_bytes, 300);
        assert_eq!(snap.sent_packets, 2);
        assert_eq!(snap.recv_bytes, 150);
        assert_eq!(snap.recv_packets, 1);
        assert_eq!(snap.rekeys, 1);
        assert_eq!(snap.cover_packets, 1);
        assert_eq!(snap.signals, 1);
        // Only relative invariants are checked for the replay store; its absolute values
        // come from `replay_store_metrics_snapshot()`, not from `m`.
        assert!(snap.replay_store.cas_attempts >= snap.replay_store.cas_successes);
        assert!(snap.replay_store.file.attempts >= snap.replay_store.file.successes);
        assert!(snap.uptime.as_nanos() > 0);
    }

    // `Default` yields a zeroed counter, same as `new`.
    #[test]
    fn counter_default() {
        let c = Counter::default();
        assert_eq!(c.bytes(), 0);
        assert_eq!(c.packets(), 0);
    }

    // Uptime is measured from construction; the 4 ms bound leaves slack under the 5 ms sleep.
    #[test]
    fn pipeline_metrics_uptime_advances() {
        let m = PipelineMetrics::new();
        std::thread::sleep(Duration::from_millis(5));
        assert!(m.uptime().as_millis() >= 4);
    }
}