use std::sync::Arc;
use std::time::Instant;
use iroh_metrics::{Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, MetricsGroup};
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use crate::peers::PeerTable;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, EncodeLabelValue)]
pub enum DropReason {
Firewall,
SendFailure,
NoPeer,
Malformed,
Spoof,
}
impl DropReason {
const ALL: [DropReason; 5] = [
DropReason::Firewall,
DropReason::SendFailure,
DropReason::NoPeer,
DropReason::Malformed,
DropReason::Spoof,
];
}
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, EncodeLabelSet)]
pub struct DropLabels {
pub reason: DropReason,
}
#[derive(Debug, Clone, Serialize)]
pub struct MetricsSnapshot {
pub packets_rx: u64,
pub packets_tx: u64,
pub bytes_rx: u64,
pub bytes_tx: u64,
pub drops: Vec<(String, u64)>,
pub uptime_secs: u64,
}
#[derive(Debug, MetricsGroup)]
#[metrics(name = "rayfish", default)]
pub struct ForwardMetrics {
pub packets_rx: Counter,
pub packets_tx: Counter,
pub bytes_rx: Counter,
pub bytes_tx: Counter,
pub drops: Family<DropLabels, Counter>,
pub rejects_sent: Counter,
}
impl ForwardMetrics {
pub fn record_rx(&self, bytes: usize) {
self.packets_rx.inc();
self.bytes_rx.inc_by(bytes as u64);
}
pub fn record_tx(&self, bytes: usize) {
self.packets_tx.inc();
self.bytes_tx.inc_by(bytes as u64);
}
pub fn record_drop(&self, reason: DropReason) {
self.drops.get_or_create(&DropLabels { reason }).inc();
}
pub fn record_reject(&self) {
self.rejects_sent.inc();
}
fn drop_count(&self, reason: DropReason) -> u64 {
self.drops
.get(&DropLabels { reason })
.map(|c| c.get())
.unwrap_or(0)
}
fn total_drops(&self) -> u64 {
DropReason::ALL.iter().map(|r| self.drop_count(*r)).sum()
}
pub fn snapshot(&self, start: Instant) -> MetricsSnapshot {
let drops = DropReason::ALL
.iter()
.map(|r| (format!("{r:?}"), self.drop_count(*r)))
.collect();
MetricsSnapshot {
packets_rx: self.packets_rx.get(),
packets_tx: self.packets_tx.get(),
bytes_rx: self.bytes_rx.get(),
bytes_tx: self.bytes_tx.get(),
drops,
uptime_secs: start.elapsed().as_secs(),
}
}
pub fn spawn_logger(self: &Arc<Self>, token: CancellationToken) {
let stats = self.clone();
tokio::spawn(async move {
let start = Instant::now();
let mut prev_rx = 0u64;
let mut prev_tx = 0u64;
let mut prev_bytes_rx = 0u64;
let mut prev_bytes_tx = 0u64;
let mut prev_drops = 0u64;
loop {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
let rx = stats.packets_rx.get();
let tx = stats.packets_tx.get();
let brx = stats.bytes_rx.get();
let btx = stats.bytes_tx.get();
let drops = stats.total_drops();
tracing::info!(
rx = rx - prev_rx,
tx = tx - prev_tx,
bytes_rx = brx - prev_bytes_rx,
bytes_tx = btx - prev_bytes_tx,
drops = drops - prev_drops,
"(30s)"
);
prev_rx = rx;
prev_tx = tx;
prev_bytes_rx = brx;
prev_bytes_tx = btx;
prev_drops = drops;
}
_ = token.cancelled() => {
let duration = start.elapsed();
let mins = duration.as_secs() / 60;
let secs = duration.as_secs() % 60;
let total_bytes = stats.bytes_rx.get() + stats.bytes_tx.get();
tracing::info!(
duration = format!("{}m{}s", mins, secs),
total_rx = stats.packets_rx.get(),
total_tx = stats.packets_tx.get(),
total_bytes,
"session complete"
);
return;
}
}
}
});
}
}
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, EncodeLabelSet)]
pub struct PeerLabels {
pub peer: String,
}
#[derive(Debug, MetricsGroup)]
#[metrics(name = "rayfish_peer", default)]
pub struct PeerMetrics {
pub rtt_us: Family<PeerLabels, Gauge>,
pub bytes_tx: Family<PeerLabels, Gauge>,
pub bytes_rx: Family<PeerLabels, Gauge>,
pub lost_packets: Family<PeerLabels, Gauge>,
}
impl PeerMetrics {
pub fn spawn_collector(self: &Arc<Self>, peers: PeerTable, token: CancellationToken) {
let metrics = self.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
for (ip, conn) in peers.all_connections() {
let label = PeerLabels {
peer: ip.to_string(),
};
let paths = conn.paths();
if let Some(path) = paths.iter().find(|p| p.is_selected()) {
let rtt_us = path.rtt().as_micros() as i64;
metrics.rtt_us.get_or_create(&label).set(rtt_us);
}
let stats = conn.stats();
metrics.bytes_tx.get_or_create(&label).set(stats.udp_tx.bytes as i64);
metrics.bytes_rx.get_or_create(&label).set(stats.udp_rx.bytes as i64);
metrics.lost_packets.get_or_create(&label).set(stats.lost_packets as i64);
}
}
_ = token.cancelled() => return,
}
}
});
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_record_rx() {
let stats = ForwardMetrics::default();
stats.record_rx(100);
stats.record_rx(200);
assert_eq!(stats.packets_rx.get(), 2);
assert_eq!(stats.bytes_rx.get(), 300);
}
#[test]
fn test_record_tx() {
let stats = ForwardMetrics::default();
stats.record_tx(500);
assert_eq!(stats.packets_tx.get(), 1);
assert_eq!(stats.bytes_tx.get(), 500);
}
#[test]
fn test_record_drop() {
let stats = ForwardMetrics::default();
stats.record_drop(DropReason::Firewall);
stats.record_drop(DropReason::NoPeer);
stats.record_drop(DropReason::Firewall);
assert_eq!(
stats
.drops
.get(&DropLabels {
reason: DropReason::Firewall
})
.unwrap()
.get(),
2
);
assert_eq!(
stats
.drops
.get(&DropLabels {
reason: DropReason::NoPeer
})
.unwrap()
.get(),
1
);
assert_eq!(stats.total_drops(), 3);
}
#[test]
fn test_snapshot() {
let stats = ForwardMetrics::default();
stats.record_rx(100);
stats.record_tx(50);
stats.record_drop(DropReason::NoPeer);
let snap = stats.snapshot(Instant::now());
assert_eq!(snap.packets_rx, 1);
assert_eq!(snap.bytes_rx, 100);
assert_eq!(snap.packets_tx, 1);
assert_eq!(snap.bytes_tx, 50);
assert_eq!(snap.drops.len(), DropReason::ALL.len());
let no_peer = snap
.drops
.iter()
.find(|(r, _)| r == "NoPeer")
.map(|(_, c)| *c);
assert_eq!(no_peer, Some(1));
}
}