use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::time::{Duration, Instant};
/// Maximum number of samples kept in each of the latency and round-trip
/// sample windows. When a window already holds this many samples, the oldest
/// one is dropped before a new one is appended.
const MAX_LATENCY_SAMPLES: usize = 1000;
/// Collector for protocol-level counters, timing samples and timestamps.
///
/// Every field is either an atomic counter or sits behind an `RwLock`, which
/// is why all recording methods take `&self` rather than `&mut self`.
/// Use `snapshot()` to obtain a plain-data view of the current values.
#[derive(Debug, Default)]
pub struct ProtocolMetrics {
/// Window of recent latency samples, stored in whole milliseconds.
latency_samples: RwLock<VecDeque<u64>>,
/// Window of recent round-trip samples, stored in whole milliseconds.
roundtrip_samples: RwLock<VecDeque<u64>>,
/// Number of calls to `record_message_sent`.
messages_sent: AtomicU64,
/// Number of calls to `record_message_failed`.
messages_failed: AtomicU64,
/// Sum of the byte counts passed to `record_message_sent`.
bytes_sent: AtomicU64,
/// Sum of the byte counts passed to `record_bytes_received`.
bytes_received: AtomicU64,
/// Sum of the `uncompressed` sizes passed to `record_compression`.
bytes_uncompressed: AtomicU64,
/// Sum of the `compressed` sizes passed to `record_compression`.
bytes_compressed: AtomicU64,
/// Instant set by `record_connection_start`; `None` until then or after `reset`.
connection_start: RwLock<Option<Instant>>,
/// Instant of the most recent connection start, send or receive; `None`
/// until one of those is recorded or after `reset`.
last_activity: RwLock<Option<Instant>>,
}
impl ProtocolMetrics {
pub fn new() -> Self {
Self::default()
}
pub fn record_connection_start(&self) {
let mut start = self.connection_start.write().expect("metrics lock poisoned");
*start = Some(Instant::now());
let mut activity = self.last_activity.write().expect("metrics lock poisoned");
*activity = Some(Instant::now());
}
pub fn record_message_sent(&self, bytes: u64) {
self.messages_sent.fetch_add(1, Ordering::Relaxed);
self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
let mut activity = self.last_activity.write().expect("metrics lock poisoned");
*activity = Some(Instant::now());
}
pub fn record_message_failed(&self) {
self.messages_failed.fetch_add(1, Ordering::Relaxed);
}
pub fn record_bytes_received(&self, bytes: u64) {
self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
let mut activity = self.last_activity.write().expect("metrics lock poisoned");
*activity = Some(Instant::now());
}
pub fn record_compression(&self, uncompressed: u64, compressed: u64) {
self.bytes_uncompressed.fetch_add(uncompressed, Ordering::Relaxed);
self.bytes_compressed.fetch_add(compressed, Ordering::Relaxed);
}
pub fn record_latency(&self, latency: Duration) {
let ms = latency.as_millis() as u64;
let mut samples = self.latency_samples.write().expect("metrics lock poisoned");
if samples.len() >= MAX_LATENCY_SAMPLES {
samples.pop_front();
}
samples.push_back(ms);
}
pub fn record_roundtrip(&self, roundtrip: Duration) {
let ms = roundtrip.as_millis() as u64;
let mut samples = self.roundtrip_samples.write().expect("metrics lock poisoned");
if samples.len() >= MAX_LATENCY_SAMPLES {
samples.pop_front();
}
samples.push_back(ms);
}
pub fn snapshot(&self) -> MetricsSnapshot {
let latency_samples = self.latency_samples.read().expect("metrics lock poisoned");
let roundtrip_samples = self.roundtrip_samples.read().expect("metrics lock poisoned");
let connection_start = self.connection_start.read().expect("metrics lock poisoned");
let last_activity = self.last_activity.read().expect("metrics lock poisoned");
let uptime_secs = connection_start
.map(|s| s.elapsed().as_secs())
.unwrap_or(0);
let idle_secs = last_activity
.map(|s| s.elapsed().as_secs())
.unwrap_or(0);
let bytes_uncompressed = self.bytes_uncompressed.load(Ordering::Relaxed);
let bytes_compressed = self.bytes_compressed.load(Ordering::Relaxed);
let compression_ratio = if bytes_uncompressed > 0 {
bytes_compressed as f64 / bytes_uncompressed as f64
} else {
1.0
};
MetricsSnapshot {
messages_sent: self.messages_sent.load(Ordering::Relaxed),
messages_failed: self.messages_failed.load(Ordering::Relaxed),
bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
bytes_received: self.bytes_received.load(Ordering::Relaxed),
compression_ratio,
latency_p50: percentile(&latency_samples, 50),
latency_p95: percentile(&latency_samples, 95),
latency_p99: percentile(&latency_samples, 99),
roundtrip_p50: percentile(&roundtrip_samples, 50),
roundtrip_p95: percentile(&roundtrip_samples, 95),
roundtrip_p99: percentile(&roundtrip_samples, 99),
uptime_secs,
idle_secs,
}
}
pub fn reset(&self) {
self.latency_samples.write().expect("metrics lock poisoned").clear();
self.roundtrip_samples.write().expect("metrics lock poisoned").clear();
self.messages_sent.store(0, Ordering::Relaxed);
self.messages_failed.store(0, Ordering::Relaxed);
self.bytes_sent.store(0, Ordering::Relaxed);
self.bytes_received.store(0, Ordering::Relaxed);
self.bytes_uncompressed.store(0, Ordering::Relaxed);
self.bytes_compressed.store(0, Ordering::Relaxed);
*self.connection_start.write().expect("metrics lock poisoned") = None;
*self.last_activity.write().expect("metrics lock poisoned") = None;
}
}
/// Returns the `p`-th percentile of `samples`, or `None` when `samples` is
/// empty.
///
/// The result is the element at position `round(p / 100 * (len - 1))` of the
/// samples in ascending order; no interpolation between neighbours is done.
/// Values of `p` above 100 are clamped to 100 (the maximum sample) instead of
/// indexing past the end of the data.
fn percentile(samples: &VecDeque<u64>, p: u32) -> Option<u64> {
    if samples.is_empty() {
        return None;
    }
    let mut values: Vec<u64> = samples.iter().copied().collect();
    let last = values.len() - 1;
    let fraction = f64::from(p.min(100)) / 100.0;
    // The extra `min` keeps the index in range even if float rounding ever
    // nudged the product above `last`.
    let index = ((fraction * last as f64).round() as usize).min(last);
    // Only one order statistic is needed, so a selection is enough; it leaves
    // the element that a full sort would place at `index` in that slot.
    let (_, value, _) = values.select_nth_unstable(index);
    Some(*value)
}
/// Plain-data view of the metrics, as produced by `ProtocolMetrics::snapshot`.
///
/// Percentile fields are `None` when no samples were available and are then
/// omitted from serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
/// Number of messages recorded as sent.
pub messages_sent: u64,
/// Number of messages recorded as failed.
pub messages_failed: u64,
/// Total bytes recorded as sent.
pub bytes_sent: u64,
/// Total bytes recorded as received.
pub bytes_received: u64,
/// Compressed bytes divided by uncompressed bytes; `1.0` when no
/// uncompressed bytes have been recorded.
pub compression_ratio: f64,
/// 50th-percentile latency in milliseconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub latency_p50: Option<u64>,
/// 95th-percentile latency in milliseconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub latency_p95: Option<u64>,
/// 99th-percentile latency in milliseconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub latency_p99: Option<u64>,
/// 50th-percentile round-trip time in milliseconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub roundtrip_p50: Option<u64>,
/// 95th-percentile round-trip time in milliseconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub roundtrip_p95: Option<u64>,
/// 99th-percentile round-trip time in milliseconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub roundtrip_p99: Option<u64>,
/// Whole seconds since the connection start was recorded; `0` when it was
/// never recorded.
pub uptime_secs: u64,
/// Whole seconds since the last recorded activity; `0` when no activity
/// was recorded.
pub idle_secs: u64,
}
impl Default for MetricsSnapshot {
fn default() -> Self {
Self {
messages_sent: 0,
messages_failed: 0,
bytes_sent: 0,
bytes_received: 0,
compression_ratio: 1.0,
latency_p50: None,
latency_p95: None,
latency_p99: None,
roundtrip_p50: None,
roundtrip_p95: None,
roundtrip_p99: None,
uptime_secs: 0,
idle_secs: 0,
}
}
}
/// Coarse grade of a connection, as computed by
/// `MetricsSnapshot::connection_quality` from the error rate
/// (`messages_failed / messages_sent`) and the 95th-percentile latency.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConnectionQuality {
/// Error rate at most 1% and p95 latency at most 50 ms.
Excellent,
/// Error rate above 1% or p95 latency above 50 ms.
Good,
/// Error rate above 5% or p95 latency above 100 ms.
Fair,
/// Error rate above 10% or p95 latency above 250 ms.
Poor,
/// Fewer than 10 messages sent, so no grade is given.
Unknown,
}
impl MetricsSnapshot {
    /// Grades the connection from the error rate and the 95th-percentile
    /// latency.
    ///
    /// Returns [`ConnectionQuality::Unknown`] while fewer than 10 messages
    /// have been sent. Otherwise the grade is the worst one triggered by
    /// either signal:
    ///
    /// | grade     | error rate | p95 latency (ms) |
    /// |-----------|------------|------------------|
    /// | Poor      | > 10%      | > 250            |
    /// | Fair      | > 5%       | > 100            |
    /// | Good      | > 1%       | > 50             |
    /// | Excellent | otherwise  | otherwise        |
    ///
    /// The error rate is `messages_failed / messages_sent`. A missing
    /// `latency_p95` is treated as 0 ms, so it never lowers the grade.
    pub fn connection_quality(&self) -> ConnectionQuality {
        /// Minimum number of sent messages before a grade is given.
        const MIN_MESSAGES: u64 = 10;

        if self.messages_sent < MIN_MESSAGES {
            return ConnectionQuality::Unknown;
        }
        // `messages_sent` is at least MIN_MESSAGES here, so the divisor is
        // never zero and no separate zero check is needed.
        let error_rate = self.messages_failed as f64 / self.messages_sent as f64;
        let latency = self.latency_p95.unwrap_or(0);

        if error_rate > 0.10 || latency > 250 {
            ConnectionQuality::Poor
        } else if error_rate > 0.05 || latency > 100 {
            ConnectionQuality::Fair
        } else if error_rate > 0.01 || latency > 50 {
            ConnectionQuality::Good
        } else {
            ConnectionQuality::Excellent
        }
    }

    /// Average bytes per second (sent plus received) over the uptime, or
    /// `0.0` when `uptime_secs` is zero.
    pub fn throughput_bps(&self) -> f64 {
        // Add as floats: the u64 sum could overflow for very large totals
        // (this type can also be built from deserialized input).
        self.per_uptime_second(self.bytes_sent as f64 + self.bytes_received as f64)
    }

    /// Average sent messages per second over the uptime, or `0.0` when
    /// `uptime_secs` is zero.
    pub fn messages_per_second(&self) -> f64 {
        self.per_uptime_second(self.messages_sent as f64)
    }

    /// Divides `total` by the uptime in seconds, yielding `0.0` when the
    /// uptime is zero so that no rate is reported before a full second has
    /// elapsed.
    fn per_uptime_second(&self, total: f64) -> f64 {
        if self.uptime_secs > 0 {
            total / self.uptime_secs as f64
        } else {
            0.0
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Message and byte counters show up unchanged in the snapshot.
    #[test]
    fn test_metrics_recording() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection_start();
        metrics.record_message_sent(100);
        metrics.record_message_sent(200);
        metrics.record_message_failed();
        metrics.record_bytes_received(150);

        let MetricsSnapshot {
            messages_sent,
            messages_failed,
            bytes_sent,
            bytes_received,
            ..
        } = metrics.snapshot();

        assert_eq!(messages_sent, 2);
        assert_eq!(messages_failed, 1);
        assert_eq!(bytes_sent, 300);
        assert_eq!(bytes_received, 150);
    }

    /// With samples 1..=100 ms, each percentile lands within one of its rank.
    #[test]
    fn test_latency_percentiles() {
        let metrics = ProtocolMetrics::new();
        (1..=100)
            .map(Duration::from_millis)
            .for_each(|latency| metrics.record_latency(latency));

        let snapshot = metrics.snapshot();
        let p50 = snapshot.latency_p50.unwrap();
        let p95 = snapshot.latency_p95.unwrap();
        let p99 = snapshot.latency_p99.unwrap();

        assert!((49..=51).contains(&p50), "p50 should be around 50, got {}", p50);
        assert!((94..=96).contains(&p95), "p95 should be around 95, got {}", p95);
        assert!((98..=100).contains(&p99), "p99 should be around 99, got {}", p99);
    }

    /// The ratio is compressed bytes over uncompressed bytes.
    #[test]
    fn test_compression_ratio() {
        let metrics = ProtocolMetrics::new();
        metrics.record_compression(1000, 400);

        let ratio = metrics.snapshot().compression_ratio;
        assert!((ratio - 0.4).abs() < 0.01);
    }

    /// Walks one snapshot through Unknown, Excellent, Fair and Poor.
    #[test]
    fn test_connection_quality() {
        let untouched = MetricsSnapshot::default();
        assert_eq!(untouched.connection_quality(), ConnectionQuality::Unknown);

        let healthy = MetricsSnapshot {
            messages_sent: 100,
            messages_failed: 0,
            latency_p95: Some(30),
            ..untouched
        };
        assert_eq!(healthy.connection_quality(), ConnectionQuality::Excellent);

        let slow = MetricsSnapshot {
            latency_p95: Some(120),
            ..healthy
        };
        assert_eq!(slow.connection_quality(), ConnectionQuality::Fair);

        let lossy = MetricsSnapshot {
            messages_failed: 15,
            ..slow
        };
        assert_eq!(lossy.connection_quality(), ConnectionQuality::Poor);
    }
}