use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Thread-safe, per-channel instrumentation: message/byte totals, error
/// counts, queue-depth gauges, and latency statistics (running
/// sum/count/min/max plus a sampled histogram for percentile queries).
///
/// NOTE(review): construct via `ChannelMetrics::new()`, not the derived
/// `Default`. The derive zero-initializes `min_latency_us` (so
/// `min_latency_us()` would report `Some(0)` before any sample) and builds
/// `latency_histogram` from `LatencyHistogram`'s derived `Default`, whose
/// `max_samples` is 0.
#[derive(Debug, Default)]
pub struct ChannelMetrics {
/// Messages recorded via `record_send`.
messages_sent: AtomicU64,
/// Messages recorded via `record_recv`.
messages_received: AtomicU64,
/// Payload bytes recorded via `record_send`.
bytes_sent: AtomicU64,
/// Payload bytes recorded via `record_recv`.
bytes_received: AtomicU64,
/// Failed send attempts.
send_errors: AtomicU64,
/// Failed receive attempts.
receive_errors: AtomicU64,
/// Most recently reported queue depth.
queue_depth: AtomicU64,
/// High-water mark of `queue_depth`.
peak_queue_depth: AtomicU64,
/// Sum of all latency samples, in microseconds (for the average).
latency_sum_us: AtomicU64,
/// Number of latency samples recorded.
latency_count: AtomicU64,
/// Smallest latency seen; `u64::MAX` sentinel means "no samples yet".
min_latency_us: AtomicU64,
/// Largest latency seen.
max_latency_us: AtomicU64,
/// Bucketed + sampled latency data backing `latency_percentile`.
latency_histogram: RwLock<LatencyHistogram>,
/// Set on first recorded event; basis for `elapsed` and throughput.
start_time: RwLock<Option<Instant>>,
}
impl ChannelMetrics {
pub fn new() -> Self {
Self {
min_latency_us: AtomicU64::new(u64::MAX),
..Default::default()
}
}
/// Records a successful send of `bytes` payload bytes.
/// Starts the elapsed-time clock on first use.
pub fn record_send(&self, bytes: usize) {
self.ensure_started();
self.messages_sent.fetch_add(1, Ordering::Relaxed);
self.bytes_sent.fetch_add(bytes as u64, Ordering::Relaxed);
}
/// Records a successful receive of `bytes` payload bytes.
/// Starts the elapsed-time clock on first use.
pub fn record_recv(&self, bytes: usize) {
self.ensure_started();
self.messages_received.fetch_add(1, Ordering::Relaxed);
self.bytes_received
.fetch_add(bytes as u64, Ordering::Relaxed);
}
/// Records a failed send attempt.
/// NOTE(review): unlike the success paths, this does not call
/// `ensure_started()`, so `elapsed()`/throughput stay at zero if only
/// errors occur — confirm that is intentional.
pub fn record_send_error(&self) {
self.send_errors.fetch_add(1, Ordering::Relaxed);
}
/// Records a failed receive attempt.
/// NOTE(review): also skips `ensure_started()` — see `record_send_error`.
pub fn record_recv_error(&self) {
self.receive_errors.fetch_add(1, Ordering::Relaxed);
}
/// Records one latency observation: updates the running sum/count (for
/// `avg_latency_us`), the min/max watermarks, and the sampled histogram
/// used for percentile queries.
///
/// The min/max watermarks use `AtomicU64::fetch_min` / `fetch_max`
/// instead of the original hand-rolled compare-exchange loops — same
/// semantics (relaxed, monotone update) with less code.
///
/// NOTE(review): does not call `ensure_started()`, so latency-only usage
/// leaves the elapsed clock unstarted — confirm intentional.
pub fn record_latency(&self, latency: Duration) {
// u128 -> u64 truncation is harmless for any realistic duration.
let us = latency.as_micros() as u64;
self.latency_sum_us.fetch_add(us, Ordering::Relaxed);
self.latency_count.fetch_add(1, Ordering::Relaxed);
self.min_latency_us.fetch_min(us, Ordering::Relaxed);
self.max_latency_us.fetch_max(us, Ordering::Relaxed);
self.latency_histogram.write().record(us);
}
/// Stores the current queue depth and raises the peak high-water mark if
/// this value exceeds it. `fetch_max` replaces the original
/// compare-exchange loop with identical semantics.
pub fn set_queue_depth(&self, depth: u64) {
self.queue_depth.store(depth, Ordering::Relaxed);
self.peak_queue_depth.fetch_max(depth, Ordering::Relaxed);
}
/// Total messages recorded as sent.
pub fn messages_sent(&self) -> u64 {
self.messages_sent.load(Ordering::Relaxed)
}
/// Total messages recorded as received.
pub fn messages_received(&self) -> u64 {
self.messages_received.load(Ordering::Relaxed)
}
/// Total payload bytes recorded as sent.
pub fn bytes_sent(&self) -> u64 {
self.bytes_sent.load(Ordering::Relaxed)
}
/// Total payload bytes recorded as received.
pub fn bytes_received(&self) -> u64 {
self.bytes_received.load(Ordering::Relaxed)
}
/// Total failed send attempts.
pub fn send_errors(&self) -> u64 {
self.send_errors.load(Ordering::Relaxed)
}
/// Total failed receive attempts.
pub fn receive_errors(&self) -> u64 {
self.receive_errors.load(Ordering::Relaxed)
}
/// Most recently reported queue depth.
pub fn queue_depth(&self) -> u64 {
self.queue_depth.load(Ordering::Relaxed)
}
/// Highest queue depth ever reported (high-water mark).
pub fn peak_queue_depth(&self) -> u64 {
self.peak_queue_depth.load(Ordering::Relaxed)
}
/// Mean latency in microseconds (integer division), or 0 with no samples.
pub fn avg_latency_us(&self) -> u64 {
let count = self.latency_count.load(Ordering::Relaxed);
if count == 0 {
return 0;
}
self.latency_sum_us.load(Ordering::Relaxed) / count
}
/// Smallest recorded latency, or `None` before any sample (the field
/// holds the `u64::MAX` sentinel until the first observation).
pub fn min_latency_us(&self) -> Option<u64> {
let min = self.min_latency_us.load(Ordering::Relaxed);
if min == u64::MAX {
None
} else {
Some(min)
}
}
/// Largest recorded latency (0 before any sample).
pub fn max_latency_us(&self) -> u64 {
self.max_latency_us.load(Ordering::Relaxed)
}
/// Latency percentile from the sampled histogram (0 with no samples).
pub fn latency_percentile(&self, percentile: u8) -> u64 {
self.latency_histogram.read().percentile(percentile)
}
/// Time since the first recorded event, or zero if nothing has been
/// recorded yet (`start_time` still `None`).
pub fn elapsed(&self) -> Duration {
self.start_time
.read()
.map(|t| t.elapsed())
.unwrap_or_default()
}
/// Computes `amount / elapsed_seconds`, returning 0.0 while the clock
/// has not started (elapsed == 0) to avoid division by zero.
fn per_second(&self, amount: u64) -> f64 {
let secs = self.elapsed().as_secs_f64();
if secs == 0.0 {
0.0
} else {
amount as f64 / secs
}
}
/// Messages sent per second since the metrics clock started.
pub fn send_throughput(&self) -> f64 {
self.per_second(self.messages_sent())
}
/// Messages received per second since the metrics clock started.
pub fn recv_throughput(&self) -> f64 {
self.per_second(self.messages_received())
}
/// Bytes sent per second since the metrics clock started.
pub fn send_bandwidth(&self) -> f64 {
self.per_second(self.bytes_sent())
}
/// Bytes received per second since the metrics clock started.
pub fn recv_bandwidth(&self) -> f64 {
self.per_second(self.bytes_received())
}
/// Zeroes every counter, restores the min-latency sentinel, clears the
/// histogram, and restarts the elapsed-time clock at "now".
pub fn reset(&self) {
// All plain counters simply return to zero.
for counter in [
&self.messages_sent,
&self.messages_received,
&self.bytes_sent,
&self.bytes_received,
&self.send_errors,
&self.receive_errors,
&self.queue_depth,
&self.peak_queue_depth,
&self.latency_sum_us,
&self.latency_count,
&self.max_latency_us,
] {
counter.store(0, Ordering::Relaxed);
}
// The minimum restarts at MAX so the next sample always wins.
self.min_latency_us.store(u64::MAX, Ordering::Relaxed);
self.latency_histogram.write().reset();
*self.start_time.write() = Some(Instant::now());
}
/// Captures a point-in-time copy of every metric.
///
/// Each field is read with an independent relaxed load, so the snapshot
/// is not atomic as a whole: under concurrent updates the fields may be
/// mutually inconsistent (e.g. a count may include a message whose bytes
/// are not yet in `bytes_sent`).
pub fn snapshot(&self) -> MetricsSnapshot {
MetricsSnapshot {
messages_sent: self.messages_sent(),
messages_received: self.messages_received(),
bytes_sent: self.bytes_sent(),
bytes_received: self.bytes_received(),
send_errors: self.send_errors(),
receive_errors: self.receive_errors(),
queue_depth: self.queue_depth(),
peak_queue_depth: self.peak_queue_depth(),
avg_latency_us: self.avg_latency_us(),
min_latency_us: self.min_latency_us(),
max_latency_us: self.max_latency_us(),
p50_latency_us: self.latency_percentile(50),
p95_latency_us: self.latency_percentile(95),
p99_latency_us: self.latency_percentile(99),
elapsed_secs: self.elapsed().as_secs_f64(),
send_throughput: self.send_throughput(),
recv_throughput: self.recv_throughput(),
send_bandwidth: self.send_bandwidth(),
recv_bandwidth: self.recv_bandwidth(),
}
}
/// Compact JSON of a fresh snapshot; empty string if serialization fails.
pub fn to_json(&self) -> String {
serde_json::to_string(&self.snapshot()).unwrap_or_default()
}
/// Pretty JSON of a fresh snapshot; empty string if serialization fails.
pub fn to_json_pretty(&self) -> String {
serde_json::to_string_pretty(&self.snapshot()).unwrap_or_default()
}
/// Renders a fresh snapshot in Prometheus text exposition format, with
/// every metric name prefixed by `prefix`.
pub fn to_prometheus(&self, prefix: &str) -> String {
use std::fmt::Write;
// Appends one HELP/TYPE/value triple for a single-sample metric.
// Writing into a String cannot fail, so the fmt::Result is discarded.
fn metric(out: &mut String, prefix: &str, name: &str, kind: &str, help: &str, value: u64) {
let _ = write!(
out,
"# HELP {prefix}_{name} {help}\n# TYPE {prefix}_{name} {kind}\n{prefix}_{name} {value}\n"
);
}
let snapshot = self.snapshot();
let mut output = String::new();
metric(&mut output, prefix, "messages_sent_total", "counter", "Total messages sent", snapshot.messages_sent);
metric(&mut output, prefix, "messages_received_total", "counter", "Total messages received", snapshot.messages_received);
metric(&mut output, prefix, "bytes_sent_total", "counter", "Total bytes sent", snapshot.bytes_sent);
metric(&mut output, prefix, "bytes_received_total", "counter", "Total bytes received", snapshot.bytes_received);
metric(&mut output, prefix, "send_errors_total", "counter", "Total send errors", snapshot.send_errors);
metric(&mut output, prefix, "receive_errors_total", "counter", "Total receive errors", snapshot.receive_errors);
metric(&mut output, prefix, "queue_depth", "gauge", "Current queue depth", snapshot.queue_depth);
// Latency percentiles exported as a Prometheus summary.
let _ = write!(
output,
"# HELP {prefix}_latency_microseconds Latency in microseconds\n# TYPE {prefix}_latency_microseconds summary\n"
);
for (quantile, value) in [
("0.5", snapshot.p50_latency_us),
("0.95", snapshot.p95_latency_us),
("0.99", snapshot.p99_latency_us),
] {
let _ = write!(
output,
"{prefix}_latency_microseconds{{quantile=\"{quantile}\"}} {value}\n"
);
}
// Throughput gauge, labelled by direction, two decimal places.
let _ = write!(
output,
"# HELP {prefix}_throughput_messages_per_second Message throughput\n# TYPE {prefix}_throughput_messages_per_second gauge\n"
);
for (direction, value) in [
("send", snapshot.send_throughput),
("recv", snapshot.recv_throughput),
] {
let _ = write!(
output,
"{prefix}_throughput_messages_per_second{{direction=\"{direction}\"}} {value:.2}\n"
);
}
output
}
/// Lazily starts the elapsed-time clock; calls after the first are no-ops.
fn ensure_started(&self) {
self.start_time.write().get_or_insert_with(Instant::now);
}
}
/// Serializable point-in-time copy of a channel's metrics, produced by
/// `ChannelMetrics::snapshot`. Field meanings mirror the corresponding
/// `ChannelMetrics` accessors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
pub messages_sent: u64,
pub messages_received: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
pub send_errors: u64,
pub receive_errors: u64,
pub queue_depth: u64,
pub peak_queue_depth: u64,
/// Mean latency in microseconds (0 when no samples were recorded).
pub avg_latency_us: u64,
/// `None` when no latency sample had been recorded at snapshot time.
pub min_latency_us: Option<u64>,
pub max_latency_us: u64,
/// Percentiles estimated from the sampled histogram.
pub p50_latency_us: u64,
pub p95_latency_us: u64,
pub p99_latency_us: u64,
/// Seconds since the first recorded event (0.0 if none).
pub elapsed_secs: f64,
/// Messages per second, send/receive directions.
pub send_throughput: f64,
pub recv_throughput: f64,
/// Bytes per second, send/receive directions.
pub send_bandwidth: f64,
pub recv_bandwidth: f64,
}
#[derive(Debug, Default)]
struct LatencyHistogram {
buckets: [u64; 7],
samples: Vec<u64>,
max_samples: usize,
}
impl LatencyHistogram {
/// Creates an empty histogram retaining at most 10_000 raw samples.
// The attribute is kept for configurations where `new` is reached only
// through `Default`; it is harmless when `new` is called directly.
#[allow(dead_code)]
fn new() -> Self {
Self {
buckets: [0; 7],
samples: Vec::new(),
max_samples: 10000,
}
}
/// Records one latency (microseconds): bumps the matching range bucket
/// and retains the raw value via reservoir sampling once the sample
/// buffer is full.
fn record(&mut self, latency_us: u64) {
let bucket = match latency_us {
0..=10 => 0,
11..=100 => 1,
101..=1000 => 2,
1001..=10000 => 3,
10001..=100000 => 4,
100001..=1000000 => 5,
_ => 6,
};
self.buckets[bucket] += 1;
if self.samples.len() < self.max_samples {
self.samples.push(latency_us);
} else if !self.samples.is_empty() {
// Algorithm R reservoir sampling: replace a random slot with
// probability len/total, where total observations so far is the
// bucket sum (the current sample is already counted). The old
// code used `len + 1` as the modulus, which kept a constant,
// near-certain replacement probability and biased the reservoir
// toward recent samples.
let total: u64 = self.buckets.iter().sum();
let idx = (rand_usize() as u64 % total) as usize;
if idx < self.samples.len() {
self.samples[idx] = latency_us;
}
}
}
/// Returns the nearest-rank `p`-th percentile of the retained samples
/// (`p` clamped to 100), or 0 when there are no samples.
fn percentile(&self, p: u8) -> u64 {
if self.samples.is_empty() {
return 0;
}
let mut sorted = self.samples.clone();
sorted.sort_unstable();
// Clamp so out-of-range requests (p > 100) cannot index past the end.
let p = p.min(100);
let idx = ((p as f64 / 100.0) * (sorted.len() - 1) as f64) as usize;
sorted[idx]
}
/// Clears buckets and samples; `max_samples` is preserved.
fn reset(&mut self) {
self.buckets = [0; 7];
self.samples.clear();
}
}
/// Produces a pseudo-random `usize` by finishing a hasher from a freshly
/// seeded `RandomState` without hashing any input. Cheap and
/// dependency-free, but NOT cryptographically secure and of unspecified
/// distribution quality — suitable only for reservoir-sampling jitter.
fn rand_usize() -> usize {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
let fresh_state = RandomState::new();
fresh_state.build_hasher().finish() as usize
}
/// Implemented by channel types that expose a `ChannelMetrics` instance.
pub trait MeteredChannel {
/// Borrow of this channel's metrics.
fn metrics(&self) -> &ChannelMetrics;
}
/// Pairs an arbitrary channel with its own `ChannelMetrics`.
/// Note the metrics are owned (not an `Arc`), so they are tied to this
/// wrapper's lifetime; for shared send/receive ends see `metered_pair`.
pub struct MeteredWrapper<C> {
inner: C,
metrics: ChannelMetrics,
}
impl<C> MeteredWrapper<C> {
/// Wraps `channel` with a fresh `ChannelMetrics`.
pub fn new(channel: C) -> Self {
Self {
inner: channel,
metrics: ChannelMetrics::new(),
}
}
/// Borrow of the wrapped channel.
pub fn inner(&self) -> &C {
&self.inner
}
/// Mutable borrow of the wrapped channel.
pub fn inner_mut(&mut self) -> &mut C {
&mut self.inner
}
/// Unwraps, discarding the metrics.
pub fn into_inner(self) -> C {
self.inner
}
}
impl<C> MeteredChannel for MeteredWrapper<C> {
fn metrics(&self) -> &ChannelMetrics {
&self.metrics
}
}
/// Extension trait adding `.with_metrics()` to wrap a value in a
/// `MeteredWrapper`. The blanket impl below makes this available on
/// every `Sized` type.
pub trait WithMetrics: Sized {
/// Consumes `self` and returns it wrapped with fresh metrics.
fn with_metrics(self) -> MeteredWrapper<Self> {
MeteredWrapper::new(self)
}
}
impl<T> WithMetrics for T {}
/// Sender half wrapper holding a shared (`Arc`) `ChannelMetrics`, so the
/// same metrics can be observed by the matching `MeteredReceiver`.
pub struct MeteredSender<S> {
inner: S,
metrics: std::sync::Arc<ChannelMetrics>,
}
impl<S> MeteredSender<S> {
/// Wraps `sender`, attaching the shared metrics handle.
pub fn new(sender: S, metrics: std::sync::Arc<ChannelMetrics>) -> Self {
Self {
inner: sender,
metrics,
}
}
/// Borrow of the wrapped sender.
pub fn inner(&self) -> &S {
&self.inner
}
/// Mutable borrow of the wrapped sender.
pub fn inner_mut(&mut self) -> &mut S {
&mut self.inner
}
/// Borrow of the shared metrics.
pub fn metrics(&self) -> &ChannelMetrics {
&self.metrics
}
/// Unwraps, dropping this handle to the shared metrics.
pub fn into_inner(self) -> S {
self.inner
}
}
// Cloning clones the sender and shares the same metrics (Arc clone).
impl<S: Clone> Clone for MeteredSender<S> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
metrics: self.metrics.clone(),
}
}
}
/// Receiver half wrapper holding a shared (`Arc`) `ChannelMetrics`,
/// matching a `MeteredSender` created from the same handle.
pub struct MeteredReceiver<R> {
inner: R,
metrics: std::sync::Arc<ChannelMetrics>,
}
impl<R> MeteredReceiver<R> {
/// Wraps `receiver`, attaching the shared metrics handle.
pub fn new(receiver: R, metrics: std::sync::Arc<ChannelMetrics>) -> Self {
Self {
inner: receiver,
metrics,
}
}
/// Borrow of the wrapped receiver.
pub fn inner(&self) -> &R {
&self.inner
}
/// Mutable borrow of the wrapped receiver.
pub fn inner_mut(&mut self) -> &mut R {
&mut self.inner
}
/// Borrow of the shared metrics.
pub fn metrics(&self) -> &ChannelMetrics {
&self.metrics
}
/// Unwraps, dropping this handle to the shared metrics.
pub fn into_inner(self) -> R {
self.inner
}
}
/// Extension trait adding `.metered(metrics)` to wrap any value as a
/// `MeteredSender` sharing an existing metrics handle. The blanket impl
/// applies to every `Sized` type.
/// NOTE(review): only a sender-side helper exists here; receivers must
/// use `MeteredReceiver::new` directly — confirm that asymmetry is
/// intentional.
pub trait IntoMetered: Sized {
/// Consumes `self`, attaching the given shared metrics.
fn metered(self, metrics: std::sync::Arc<ChannelMetrics>) -> MeteredSender<Self> {
MeteredSender::new(self, metrics)
}
}
impl<T> IntoMetered for T {}
/// Wraps a `(sender, receiver)` pair so both ends share one
/// `ChannelMetrics`; the shared handle is returned as well so callers can
/// read the metrics or register them with an aggregator.
pub fn metered_pair<S, R>(
sender: S,
receiver: R,
) -> (
MeteredSender<S>,
MeteredReceiver<R>,
std::sync::Arc<ChannelMetrics>,
) {
let shared = std::sync::Arc::new(ChannelMetrics::new());
let tx = MeteredSender::new(sender, std::sync::Arc::clone(&shared));
let rx = MeteredReceiver::new(receiver, std::sync::Arc::clone(&shared));
(tx, rx, shared)
}
#[derive(Debug, Default)]
pub struct AggregatedMetrics {
channels: parking_lot::RwLock<Vec<std::sync::Arc<ChannelMetrics>>>,
}
impl AggregatedMetrics {
pub fn new() -> Self {
Self::default()
}
pub fn register(&self, metrics: std::sync::Arc<ChannelMetrics>) {
self.channels.write().push(metrics);
}
pub fn total_messages_sent(&self) -> u64 {
self.channels.read().iter().map(|m| m.messages_sent()).sum()
}
pub fn total_messages_received(&self) -> u64 {
self.channels
.read()
.iter()
.map(|m| m.messages_received())
.sum()
}
pub fn total_bytes_sent(&self) -> u64 {
self.channels.read().iter().map(|m| m.bytes_sent()).sum()
}
pub fn total_bytes_received(&self) -> u64 {
self.channels
.read()
.iter()
.map(|m| m.bytes_received())
.sum()
}
pub fn total_send_errors(&self) -> u64 {
self.channels.read().iter().map(|m| m.send_errors()).sum()
}
pub fn total_receive_errors(&self) -> u64 {
self.channels
.read()
.iter()
.map(|m| m.receive_errors())
.sum()
}
pub fn channel_count(&self) -> usize {
self.channels.read().len()
}
pub fn snapshots(&self) -> Vec<MetricsSnapshot> {
self.channels.read().iter().map(|m| m.snapshot()).collect()
}
pub fn to_json(&self) -> String {
let aggregate = serde_json::json!({
"channel_count": self.channel_count(),
"total_messages_sent": self.total_messages_sent(),
"total_messages_received": self.total_messages_received(),
"total_bytes_sent": self.total_bytes_sent(),
"total_bytes_received": self.total_bytes_received(),
"total_send_errors": self.total_send_errors(),
"total_receive_errors": self.total_receive_errors(),
"channels": self.snapshots(),
});
serde_json::to_string_pretty(&aggregate).unwrap_or_default()
}
pub fn to_prometheus(&self, prefix: &str) -> String {
let mut output = String::new();
output.push_str(&format!(
"# HELP {prefix}_channels_total Number of registered channels\n"
));
output.push_str(&format!("# TYPE {prefix}_channels_total gauge\n"));
output.push_str(&format!(
"{prefix}_channels_total {}\n",
self.channel_count()
));
output.push_str(&format!(
"# HELP {prefix}_messages_sent_total Total messages sent across all channels\n"
));
output.push_str(&format!("# TYPE {prefix}_messages_sent_total counter\n"));
output.push_str(&format!(
"{prefix}_messages_sent_total {}\n",
self.total_messages_sent()
));
output.push_str(&format!(
"# HELP {prefix}_messages_received_total Total messages received across all channels\n"
));
output.push_str(&format!(
"# TYPE {prefix}_messages_received_total counter\n"
));
output.push_str(&format!(
"{prefix}_messages_received_total {}\n",
self.total_messages_received()
));
output.push_str(&format!(
"# HELP {prefix}_bytes_sent_total Total bytes sent across all channels\n"
));
output.push_str(&format!("# TYPE {prefix}_bytes_sent_total counter\n"));
output.push_str(&format!(
"{prefix}_bytes_sent_total {}\n",
self.total_bytes_sent()
));
output.push_str(&format!(
"# HELP {prefix}_bytes_received_total Total bytes received across all channels\n"
));
output.push_str(&format!("# TYPE {prefix}_bytes_received_total counter\n"));
output.push_str(&format!(
"{prefix}_bytes_received_total {}\n",
self.total_bytes_received()
));
output
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_metrics() {
// Counters and byte totals accumulate independently per direction.
let m = ChannelMetrics::new();
m.record_send(100);
m.record_send(200);
m.record_recv(150);
assert_eq!(m.messages_sent(), 2);
assert_eq!(m.messages_received(), 1);
assert_eq!(m.bytes_sent(), 300);
assert_eq!(m.bytes_received(), 150);
}
#[test]
fn test_error_tracking() {
let m = ChannelMetrics::new();
m.record_send_error();
m.record_send_error();
m.record_recv_error();
assert_eq!(m.send_errors(), 2);
assert_eq!(m.receive_errors(), 1);
}
#[test]
fn test_latency_tracking() {
let m = ChannelMetrics::new();
for us in [100u64, 200, 300] {
m.record_latency(Duration::from_micros(us));
}
assert_eq!(m.avg_latency_us(), 200);
assert_eq!(m.min_latency_us(), Some(100));
assert_eq!(m.max_latency_us(), 300);
}
#[test]
fn test_queue_depth() {
let m = ChannelMetrics::new();
m.set_queue_depth(5);
assert_eq!(m.queue_depth(), 5);
assert_eq!(m.peak_queue_depth(), 5);
m.set_queue_depth(10);
assert_eq!(m.queue_depth(), 10);
assert_eq!(m.peak_queue_depth(), 10);
// Peak is a high-water mark: lowering the depth must not lower it.
m.set_queue_depth(3);
assert_eq!(m.queue_depth(), 3);
assert_eq!(m.peak_queue_depth(), 10);
}
#[test]
fn test_snapshot() {
let m = ChannelMetrics::new();
m.record_send(100);
m.record_recv(50);
let snap = m.snapshot();
assert_eq!(snap.messages_sent, 1);
assert_eq!(snap.messages_received, 1);
assert_eq!(snap.bytes_sent, 100);
assert_eq!(snap.bytes_received, 50);
}
#[test]
fn test_json_export() {
let m = ChannelMetrics::new();
m.record_send(100);
let json = m.to_json();
assert!(json.contains("messages_sent"));
assert!(json.contains("1"));
}
#[test]
fn test_prometheus_export() {
let m = ChannelMetrics::new();
m.record_send(100);
let prom = m.to_prometheus("ipckit");
assert!(prom.contains("ipckit_messages_sent_total 1"));
}
#[test]
fn test_reset() {
let m = ChannelMetrics::new();
m.record_send(100);
m.record_recv(50);
m.reset();
assert_eq!(m.messages_sent(), 0);
assert_eq!(m.messages_received(), 0);
assert_eq!(m.bytes_sent(), 0);
assert_eq!(m.bytes_received(), 0);
}
#[test]
fn test_with_metrics() {
struct DummyChannel;
let wrapped = DummyChannel.with_metrics();
wrapped.metrics().record_send(100);
assert_eq!(wrapped.metrics().messages_sent(), 1);
}
#[test]
fn test_metered_sender_receiver() {
struct DummySender;
struct DummyReceiver;
// All three returned handles must observe the same metrics.
let (tx, rx, shared) = metered_pair(DummySender, DummyReceiver);
tx.metrics().record_send(100);
assert_eq!(rx.metrics().messages_sent(), 1);
assert_eq!(shared.messages_sent(), 1);
}
#[test]
fn test_aggregated_metrics() {
let agg = AggregatedMetrics::new();
let first = std::sync::Arc::new(ChannelMetrics::new());
let second = std::sync::Arc::new(ChannelMetrics::new());
first.record_send(100);
first.record_send(200);
second.record_send(50);
agg.register(first);
agg.register(second);
assert_eq!(agg.channel_count(), 2);
assert_eq!(agg.total_messages_sent(), 3);
assert_eq!(agg.total_bytes_sent(), 350);
}
}