use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use crate::replay_storage::{ReplayStoreMetricsSnapshot, replay_store_metrics_snapshot};
use crate::transport::TransportKind;
#[derive(Debug)]
pub struct Counter {
bytes: AtomicU64,
packets: AtomicU64,
}
impl Counter {
pub fn new() -> Self {
Self {
bytes: AtomicU64::new(0),
packets: AtomicU64::new(0),
}
}
pub fn record(&self, size: u64) {
self.bytes.fetch_add(size, Ordering::Relaxed);
self.packets.fetch_add(1, Ordering::Relaxed);
}
pub fn bytes(&self) -> u64 {
self.bytes.load(Ordering::Relaxed)
}
pub fn packets(&self) -> u64 {
self.packets.load(Ordering::Relaxed)
}
pub fn reset(&self) -> (u64, u64) {
let b = self.bytes.swap(0, Ordering::Relaxed);
let p = self.packets.swap(0, Ordering::Relaxed);
(b, p)
}
}
impl Default for Counter {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct LatencyTracker {
srtt_us: AtomicU64,
min_us: AtomicU64,
max_us: AtomicU64,
samples: AtomicU64,
}
impl LatencyTracker {
pub fn new() -> Self {
Self {
srtt_us: AtomicU64::new(0),
min_us: AtomicU64::new(u64::MAX),
max_us: AtomicU64::new(0),
samples: AtomicU64::new(0),
}
}
pub fn record(&self, duration: Duration) {
let us = duration.as_micros() as u64;
self.samples.fetch_add(1, Ordering::Relaxed);
let mut current_min = self.min_us.load(Ordering::Relaxed);
while us < current_min {
match self.min_us.compare_exchange_weak(
current_min,
us,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(v) => current_min = v,
}
}
let mut current_max = self.max_us.load(Ordering::Relaxed);
while us > current_max {
match self.max_us.compare_exchange_weak(
current_max,
us,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(v) => current_max = v,
}
}
let old = self.srtt_us.load(Ordering::Relaxed);
if old == 0 {
self.srtt_us.store(us, Ordering::Relaxed);
} else {
let new_srtt = (old * 7 + us) / 8;
self.srtt_us.store(new_srtt, Ordering::Relaxed);
}
}
pub fn srtt(&self) -> Duration {
Duration::from_micros(self.srtt_us.load(Ordering::Relaxed))
}
pub fn min(&self) -> Duration {
let v = self.min_us.load(Ordering::Relaxed);
if v == u64::MAX {
Duration::ZERO
} else {
Duration::from_micros(v)
}
}
pub fn max(&self) -> Duration {
Duration::from_micros(self.max_us.load(Ordering::Relaxed))
}
pub fn samples(&self) -> u64 {
self.samples.load(Ordering::Relaxed)
}
}
impl Default for LatencyTracker {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct TransportMetrics {
pub kind: TransportKind,
pub sent: Counter,
pub received: Counter,
pub errors: AtomicU64,
pub reconnections: AtomicU64,
pub latency: LatencyTracker,
}
impl TransportMetrics {
pub fn new(kind: TransportKind) -> Self {
Self {
kind,
sent: Counter::new(),
received: Counter::new(),
errors: AtomicU64::new(0),
reconnections: AtomicU64::new(0),
latency: LatencyTracker::new(),
}
}
pub fn record_send(&self, bytes: u64, latency: Duration) {
self.sent.record(bytes);
self.latency.record(latency);
}
pub fn record_recv(&self, bytes: u64) {
self.received.record(bytes);
}
pub fn record_error(&self) {
self.errors.fetch_add(1, Ordering::Relaxed);
}
pub fn record_reconnect(&self) {
self.reconnections.fetch_add(1, Ordering::Relaxed);
}
pub fn errors(&self) -> u64 {
self.errors.load(Ordering::Relaxed)
}
pub fn reconnections(&self) -> u64 {
self.reconnections.load(Ordering::Relaxed)
}
}
#[derive(Debug)]
pub struct PipelineMetrics {
pub total_sent: Counter,
pub total_received: Counter,
pub rekeys: AtomicU64,
pub cover_packets: AtomicU64,
pub signals: AtomicU64,
started_at: Instant,
}
impl PipelineMetrics {
pub fn new() -> Self {
Self {
total_sent: Counter::new(),
total_received: Counter::new(),
rekeys: AtomicU64::new(0),
cover_packets: AtomicU64::new(0),
signals: AtomicU64::new(0),
started_at: Instant::now(),
}
}
pub fn record_rekey(&self) {
self.rekeys.fetch_add(1, Ordering::Relaxed);
}
pub fn record_cover_packet(&self) {
self.cover_packets.fetch_add(1, Ordering::Relaxed);
}
pub fn record_signal(&self) {
self.signals.fetch_add(1, Ordering::Relaxed);
}
pub fn rekeys(&self) -> u64 {
self.rekeys.load(Ordering::Relaxed)
}
pub fn cover_packets(&self) -> u64 {
self.cover_packets.load(Ordering::Relaxed)
}
pub fn signals(&self) -> u64 {
self.signals.load(Ordering::Relaxed)
}
pub fn uptime(&self) -> Duration {
self.started_at.elapsed()
}
pub fn snapshot(&self) -> MetricsSnapshot {
MetricsSnapshot {
uptime: self.uptime(),
sent_bytes: self.total_sent.bytes(),
sent_packets: self.total_sent.packets(),
recv_bytes: self.total_received.bytes(),
recv_packets: self.total_received.packets(),
rekeys: self.rekeys(),
cover_packets: self.cover_packets(),
signals: self.signals(),
replay_store: replay_store_metrics_snapshot(),
}
}
}
impl Default for PipelineMetrics {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
pub uptime: Duration,
pub sent_bytes: u64,
pub sent_packets: u64,
pub recv_bytes: u64,
pub recv_packets: u64,
pub rekeys: u64,
pub cover_packets: u64,
pub signals: u64,
pub replay_store: ReplayStoreMetricsSnapshot,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn counter_records_and_resets() {
let c = Counter::new();
c.record(100);
c.record(200);
assert_eq!(c.bytes(), 300);
assert_eq!(c.packets(), 2);
let (b, p) = c.reset();
assert_eq!(b, 300);
assert_eq!(p, 2);
assert_eq!(c.bytes(), 0);
assert_eq!(c.packets(), 0);
}
#[test]
fn latency_tracker_ewma() {
let t = LatencyTracker::new();
t.record(Duration::from_millis(100));
assert_eq!(t.srtt(), Duration::from_millis(100));
assert_eq!(t.min(), Duration::from_millis(100));
assert_eq!(t.max(), Duration::from_millis(100));
t.record(Duration::from_millis(200));
let srtt = t.srtt().as_millis();
assert!((112..=113).contains(&srtt), "srtt={srtt}");
assert_eq!(t.min(), Duration::from_millis(100));
assert_eq!(t.max(), Duration::from_millis(200));
assert_eq!(t.samples(), 2);
}
#[test]
fn latency_tracker_min_no_samples() {
let t = LatencyTracker::new();
assert_eq!(t.min(), Duration::ZERO);
assert_eq!(t.samples(), 0);
}
#[test]
fn transport_metrics_record() {
let m = TransportMetrics::new(TransportKind::Tcp);
m.record_send(1024, Duration::from_millis(5));
m.record_recv(512);
m.record_error();
m.record_reconnect();
assert_eq!(m.sent.bytes(), 1024);
assert_eq!(m.sent.packets(), 1);
assert_eq!(m.received.bytes(), 512);
assert_eq!(m.received.packets(), 1);
assert_eq!(m.errors(), 1);
assert_eq!(m.reconnections(), 1);
assert!(m.latency.srtt().as_millis() >= 4);
}
#[test]
fn pipeline_metrics_snapshot() {
let m = PipelineMetrics::new();
m.total_sent.record(100);
m.total_sent.record(200);
m.total_received.record(150);
m.record_rekey();
m.record_cover_packet();
m.record_signal();
let snap = m.snapshot();
assert_eq!(snap.sent_bytes, 300);
assert_eq!(snap.sent_packets, 2);
assert_eq!(snap.recv_bytes, 150);
assert_eq!(snap.recv_packets, 1);
assert_eq!(snap.rekeys, 1);
assert_eq!(snap.cover_packets, 1);
assert_eq!(snap.signals, 1);
assert!(snap.replay_store.cas_attempts >= snap.replay_store.cas_successes);
assert!(snap.replay_store.file.attempts >= snap.replay_store.file.successes);
assert!(snap.uptime.as_nanos() > 0);
}
#[test]
fn counter_default() {
let c = Counter::default();
assert_eq!(c.bytes(), 0);
assert_eq!(c.packets(), 0);
}
#[test]
fn pipeline_metrics_uptime_advances() {
let m = PipelineMetrics::new();
std::thread::sleep(Duration::from_millis(5));
assert!(m.uptime().as_millis() >= 4);
}
}