use crate::utils::misc::{Typename, get_unix_secs_now};
use scc::HashIndex as SccHashIndex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use tracing::warn;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
pub incoming_protos: HashMap<String, u64>,
pub outgoing_protos: HashMap<String, u64>,
pub errors: HashMap<String, u64>,
pub udp: UdpStats,
pub udpps: UdpStats, pub uptime: u32,
pub tasks: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpStats {
pub incoming_packets: u64,
pub incoming_bytes: u64,
pub outgoing_packets: u64,
pub outgoing_bytes: u64,
}
impl MetricsSnapshot {
pub fn to_json_string(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|e| {
warn!("Failed to serialize metrics snapshot: {}", e);
"{}".into()
})
}
pub fn to_prometheus_string(&self) -> String {
let udp = format!(
r#"
# HELP amadeus_udp_packets_total Total number of UDP packets
# TYPE amadeus_udp_packets_total counter
amadeus_udp_packets_total{{type="incoming"}} {}
amadeus_udp_packets_total{{type="outgoing"}} {}
# HELP amadeus_udp_bytes_total Total number of UDP bytes
# TYPE amadeus_udp_bytes_total counter
amadeus_udp_bytes_total{{type="incoming"}} {}
amadeus_udp_bytes_total{{type="outgoing"}} {}
# HELP amadeus_uptime_seconds Process uptime in seconds
# TYPE amadeus_uptime_seconds gauge
amadeus_uptime_seconds {}
# HELP amadeus_tasks_active Current number of active tasks
# TYPE amadeus_tasks_active gauge
amadeus_tasks_active {}"#,
self.udp.incoming_packets,
self.udp.outgoing_packets,
self.udp.incoming_bytes,
self.udp.outgoing_bytes,
self.uptime,
self.tasks
);
let udpps = format!(
r#"
# HELP amadeus_udp_packets_per_second Total number of UDP packets
# TYPE amadeus_udp_packets_per_second gauge
amadeus_udp_packets_per_second{{type="incoming"}} {}
amadeus_udp_packets_per_second{{type="outgoing"}} {}
# HELP amadeus_udp_bytes_per_second Total number of UDP bytes
# TYPE amadeus_udp_bytes_per_second gauge
amadeus_udp_bytes_per_second{{type="incoming"}} {}
amadeus_udp_bytes_per_second{{type="outgoing"}} {}"#,
self.udpps.incoming_packets,
self.udpps.outgoing_packets,
self.udpps.incoming_bytes,
self.udpps.outgoing_bytes
);
let mut protos = Vec::new();
protos.push("\n\n# HELP amadeus_incoming_protos_total Total number of proto messages handled by type".into());
protos.push("# TYPE amadeus_incoming_protos_total counter".into());
for (proto_name, count) in &self.incoming_protos {
protos.push(format!("amadeus_incoming_protos_total{{type=\"{}\"}} {}", proto_name, count));
}
let mut sent_packets = Vec::new();
sent_packets
.push("\n\n# HELP amadeus_outgoing_protos_total Total number of messages sent by protocol type".into());
sent_packets.push("# TYPE amadeus_outgoing_protos_total counter".into());
for (proto_name, count) in &self.outgoing_protos {
sent_packets.push(format!("amadeus_outgoing_protos_total{{type=\"{}\"}} {}", proto_name, count));
}
let mut errors = Vec::new();
errors.push("\n\n# HELP amadeus_packet_errors_total Total number of packet processing errors by type".into());
errors.push("# TYPE amadeus_packet_errors_total counter".into());
for (error_type, count) in &self.errors {
errors.push(format!("amadeus_packet_errors_total{{type=\"{}\"}} {}", error_type, count));
}
format!("{}{}{}{}{}", udp, udpps, protos.join("\n"), sent_packets.join("\n"), errors.join("\n"))
}
}
pub struct Metrics {
incoming_bytes: AtomicU64, incoming_packets: AtomicU64, outgoing_bytes: AtomicU64, outgoing_packets: AtomicU64,
incoming_protos: SccHashIndex<String, Arc<AtomicU64>>,
errors: SccHashIndex<String, Arc<AtomicU64>>,
outgoing_protos: SccHashIndex<String, Arc<AtomicU64>>,
tasks: AtomicU64,
start_time: u32,
}
impl Metrics {
pub fn new() -> Self {
let handled_protos = SccHashIndex::new();
let errors = SccHashIndex::new();
let sent_packets = SccHashIndex::new();
Self {
incoming_bytes: AtomicU64::new(0),
incoming_packets: AtomicU64::new(0),
outgoing_bytes: AtomicU64::new(0),
outgoing_packets: AtomicU64::new(0),
incoming_protos: handled_protos,
errors,
outgoing_protos: sent_packets,
tasks: AtomicU64::new(0),
start_time: get_unix_secs_now(),
}
}
#[inline]
pub fn add_incoming_proto(&self, name: &str) {
let name = name.to_owned();
if let Some(counter) = self.incoming_protos.peek_with(&name, |_, v| v.clone()) {
counter.fetch_add(1, Ordering::Relaxed);
} else {
let _ = self.incoming_protos.insert_sync(name, Arc::new(AtomicU64::new(1)));
}
}
#[inline]
pub fn add_outgoing_proto(&self, name: &str) {
let name = name.to_owned();
if let Some(counter) = self.outgoing_protos.peek_with(&name, |_, v| v.clone()) {
counter.fetch_add(1, Ordering::Relaxed);
} else {
let _ = self.outgoing_protos.insert_sync(name, Arc::new(AtomicU64::new(1)));
}
}
pub fn add_incoming_udp_packet(&self, len: usize) {
self.incoming_bytes.fetch_add(len as u64, Ordering::Relaxed);
self.incoming_packets.fetch_add(1, Ordering::Relaxed);
}
pub fn add_outgoing_udp_packet(&self, len: usize) {
self.outgoing_bytes.fetch_add(len as u64, Ordering::Relaxed);
self.outgoing_packets.fetch_add(1, Ordering::Relaxed);
}
pub fn add_error<E: Debug + Typename>(&self, error: &E) {
warn!(target = "metrics", "error: {error:?}");
self.add_error_by_name(error.typename());
}
fn add_error_by_name(&self, error_type: &str) {
let et_owned = error_type.to_string();
if let Some(counter) = self.errors.peek_with(&et_owned, |_, v| v.clone()) {
counter.fetch_add(1, Ordering::Relaxed);
} else {
let _ = self.errors.insert_sync(et_owned, Arc::new(AtomicU64::new(1)));
}
}
pub fn inc_tasks(&self) {
self.tasks.fetch_add(1, Ordering::Relaxed);
}
pub fn dec_tasks(&self) {
self.tasks.fetch_sub(1, Ordering::Relaxed);
}
pub fn get_snapshot(&self) -> MetricsSnapshot {
let uptime = self.get_uptime();
let mut incoming_protos = HashMap::new();
let mut outgoing_protos = HashMap::new();
let mut errors = HashMap::new();
self.incoming_protos.iter_sync(|proto_name, counter| {
incoming_protos.insert(proto_name.clone(), counter.load(Ordering::Relaxed));
true
});
self.outgoing_protos.iter_sync(|proto_name, counter| {
outgoing_protos.insert(proto_name.clone(), counter.load(Ordering::Relaxed));
true
});
self.errors.iter_sync(|error_type, counter| {
errors.insert(error_type.clone(), counter.load(Ordering::Relaxed));
true
});
let (udp, udpps) = self.get_udp_stats(uptime);
let tasks = self.tasks.load(Ordering::Relaxed);
MetricsSnapshot { incoming_protos, outgoing_protos, uptime, errors, udp, udpps, tasks }
}
pub fn get_uptime(&self) -> u32 {
let now_s = get_unix_secs_now();
now_s.saturating_sub(self.start_time)
}
fn get_udp_stats(&self, uptime_seconds: u32) -> (UdpStats, UdpStats) {
static LAST_INCOMING_BYTES: AtomicU64 = AtomicU64::new(0);
static LAST_INCOMING_PACKETS: AtomicU64 = AtomicU64::new(0);
static LAST_OUTGOING_BYTES: AtomicU64 = AtomicU64::new(0);
static LAST_OUTGOING_PACKETS: AtomicU64 = AtomicU64::new(0);
static LAST_UPTIME_SECONDS: AtomicU32 = AtomicU32::new(0);
let incoming_packets = self.incoming_packets.load(Ordering::Relaxed);
let incoming_bytes = self.incoming_bytes.load(Ordering::Relaxed);
let outgoing_packets = self.outgoing_packets.load(Ordering::Relaxed);
let outgoing_bytes = self.outgoing_bytes.load(Ordering::Relaxed);
let lus = LAST_UPTIME_SECONDS.swap(uptime_seconds, Ordering::Relaxed);
let lip = LAST_INCOMING_PACKETS.swap(incoming_packets, Ordering::Relaxed);
let lib = LAST_INCOMING_BYTES.swap(incoming_bytes, Ordering::Relaxed);
let lop = LAST_OUTGOING_PACKETS.swap(outgoing_packets, Ordering::Relaxed);
let lob = LAST_OUTGOING_BYTES.swap(outgoing_bytes, Ordering::Relaxed);
let mut seconds = (uptime_seconds.saturating_sub(lus)) as u64;
if seconds == 0 {
seconds = 1;
}
let dp_in = incoming_packets.saturating_sub(lip);
let db_in = incoming_bytes.saturating_sub(lib);
let dp_out = outgoing_packets.saturating_sub(lop);
let db_out = outgoing_bytes.saturating_sub(lob);
let udp = UdpStats { incoming_packets, incoming_bytes, outgoing_packets, outgoing_bytes };
let udpps = UdpStats {
incoming_packets: dp_in / seconds,
incoming_bytes: db_in / seconds,
outgoing_packets: dp_out / seconds,
outgoing_bytes: db_out / seconds,
};
(udp, udpps)
}
pub fn get_json(&self) -> serde_json::Value {
serde_json::to_value(self.get_snapshot()).unwrap_or_else(|e| {
warn!("Failed to serialize metrics: {}", e);
serde_json::json!({})
})
}
pub fn get_prometheus(&self) -> String {
self.get_snapshot().to_prometheus_string()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn udp_packet_totals_are_tracked() {
let m = Metrics::new();
m.add_incoming_udp_packet(100);
m.add_incoming_udp_packet(250);
let snapshot = m.get_snapshot();
assert_eq!(snapshot.udp.incoming_packets, 2);
assert_eq!(snapshot.udp.incoming_bytes, 350);
}
#[test]
fn protocol_counters_and_prometheus_include_counts() {
let m = Metrics::new();
m.add_incoming_proto("ping");
m.add_incoming_proto("ping");
m.add_incoming_proto("peers");
let snapshot = m.get_snapshot();
assert_eq!(snapshot.incoming_protos.get("ping"), Some(&2));
assert_eq!(snapshot.incoming_protos.get("peers"), Some(&1));
let prom = snapshot.to_prometheus_string();
assert!(prom.contains("amadeus_incoming_protos_total{type=\"ping\"} 2"));
assert!(prom.contains("amadeus_incoming_protos_total{type=\"peers\"} 1"));
}
#[derive(Debug)]
struct DummyErr;
impl crate::utils::misc::Typename for DummyErr {
fn typename(&self) -> &'static str {
"dummy"
}
}
#[test]
fn error_counters_by_typename_and_prometheus() {
let m = Metrics::new();
let e = DummyErr;
m.add_error(&e);
m.add_error(&e);
let snapshot = m.get_snapshot();
assert_eq!(snapshot.errors.get("dummy"), Some(&2));
let prom = snapshot.to_prometheus_string();
assert!(prom.contains("amadeus_packet_errors_total{type=\"dummy\"} 2"));
}
#[test]
fn uptime_is_nonnegative_and_present() {
let m = Metrics::new();
let snapshot = m.get_snapshot();
let _uptime = snapshot.uptime;
}
#[test]
fn prometheus_packet_totals_reflect_counters() {
let m = Metrics::new();
m.add_incoming_udp_packet(10);
m.add_incoming_udp_packet(20);
m.add_outgoing_udp_packet(15);
let prom = m.get_snapshot().to_prometheus_string();
assert!(prom.contains("amadeus_udp_packets_total{type=\"incoming\"} 2"));
assert!(prom.contains("amadeus_udp_bytes_total{type=\"incoming\"} 30"));
assert!(prom.contains("amadeus_udp_packets_total{type=\"outgoing\"} 1"));
assert!(prom.contains("amadeus_udp_bytes_total{type=\"outgoing\"} 15"));
}
#[test]
fn sent_packet_counters_and_prometheus_include_counts() {
let m = Metrics::new();
m.add_outgoing_proto("ping");
m.add_outgoing_proto("ping");
m.add_outgoing_proto("pong");
let snapshot = m.get_snapshot();
assert_eq!(snapshot.outgoing_protos.get("ping"), Some(&2));
assert_eq!(snapshot.outgoing_protos.get("pong"), Some(&1));
let prom = snapshot.to_prometheus_string();
assert!(prom.contains("amadeus_outgoing_protos_total{type=\"ping\"} 2"));
assert!(prom.contains("amadeus_outgoing_protos_total{type=\"pong\"} 1"));
}
#[test]
fn metrics_snapshot_serialization() {
let m = Metrics::new();
m.add_incoming_proto("test");
m.add_outgoing_proto("test");
m.add_incoming_udp_packet(100);
let snapshot = m.get_snapshot();
let json = serde_json::to_string(&snapshot).expect("Should serialize");
let deserialized: MetricsSnapshot = serde_json::from_str(&json).expect("Should deserialize");
assert_eq!(deserialized.incoming_protos.get("test"), Some(&1));
assert_eq!(deserialized.outgoing_protos.get("test"), Some(&1));
assert_eq!(deserialized.udp.incoming_packets, 1);
assert_eq!(deserialized.udp.incoming_bytes, 100);
assert_eq!(deserialized.tasks, 0);
}
#[test]
fn prometheus_generation_from_snapshot() {
let m = Metrics::new();
m.add_incoming_proto("test_proto");
m.add_incoming_udp_packet(50);
let snapshot = m.get_snapshot();
let prometheus = snapshot.to_prometheus_string();
assert!(prometheus.contains("amadeus_incoming_protos_total{type=\"test_proto\"} 1"));
assert!(prometheus.contains("amadeus_udp_packets_total{type=\"incoming\"} 1"));
assert!(prometheus.contains("amadeus_udp_bytes_total{type=\"incoming\"} 50"));
}
#[test]
fn tasks_are_tracked_correctly() {
let m = Metrics::new();
let snapshot = m.get_snapshot();
assert_eq!(snapshot.tasks, 0);
m.inc_tasks();
m.inc_tasks();
m.inc_tasks();
let snapshot = m.get_snapshot();
assert_eq!(snapshot.tasks, 3);
m.dec_tasks();
let snapshot = m.get_snapshot();
assert_eq!(snapshot.tasks, 2);
}
#[test]
fn prometheus_includes_tasks_gauge() {
let m = Metrics::new();
m.inc_tasks();
m.inc_tasks();
let snapshot = m.get_snapshot();
let prometheus = snapshot.to_prometheus_string();
assert!(prometheus.contains("amadeus_tasks_active 2"));
}
}