use core::sync::atomic::{AtomicI64, AtomicU64, Ordering};
pub mod names {
pub const CONNECTIONS_ACTIVE: &str = "connections.active";
pub const CONNECTIONS_TOTAL: &str = "connections.total";
pub const TRANSFERS_RECEIVED: &str = "transfers.received";
pub const TRANSFERS_SENT: &str = "transfers.sent";
pub const TRANSFERS_UNSETTLED: &str = "transfers.unsettled";
pub const TRANSFERS_RATE: &str = "transfers.rate";
pub const ERRORS_DECODE: &str = "errors.decode";
pub const ERRORS_UNAUTHORIZED: &str = "errors.unauthorized";
pub const TOPICS_EXPOSED: &str = "topics.exposed";
pub const TRANSFERS_DROPPED_LOOP: &str = "transfers.dropped.loop";
pub const TRANSFERS_DROPPED_HOP_CAP: &str = "transfers.dropped.hop-cap";
pub const TRANSFERS_DROPPED_MALFORMED_REPLY: &str = "transfers.dropped.malformed-reply";
pub const RPC_CALLS_TIMED_OUT: &str = "rpc.calls.timed-out";
pub const TRANSFERS_DROPPED_RECONNECT_OVERFLOW: &str = "transfers.dropped.reconnect-overflow";
}
pub mod units {
pub const COUNT: &str = "count";
pub const BYTES: &str = "bytes";
pub const MILLISECONDS: &str = "milliseconds";
pub const PER_SECOND: &str = "per-second";
}
pub const MANDATORY_METRIC_NAMES: [&str; 14] = [
names::CONNECTIONS_ACTIVE,
names::CONNECTIONS_TOTAL,
names::TRANSFERS_RECEIVED,
names::TRANSFERS_SENT,
names::TRANSFERS_UNSETTLED,
names::TRANSFERS_RATE,
names::ERRORS_DECODE,
names::ERRORS_UNAUTHORIZED,
names::TOPICS_EXPOSED,
names::TRANSFERS_DROPPED_LOOP,
names::TRANSFERS_DROPPED_HOP_CAP,
names::TRANSFERS_DROPPED_MALFORMED_REPLY,
names::RPC_CALLS_TIMED_OUT,
names::TRANSFERS_DROPPED_RECONNECT_OVERFLOW,
];
#[derive(Debug, Default)]
pub struct MetricsHub {
pub connections_active: AtomicI64,
pub connections_total: AtomicU64,
pub transfers_received: AtomicU64,
pub transfers_sent: AtomicU64,
pub transfers_unsettled: AtomicI64,
pub transfers_rate: AtomicI64,
pub errors_decode: AtomicU64,
pub errors_unauthorized: AtomicU64,
pub topics_exposed: AtomicI64,
pub transfers_dropped_loop: AtomicU64,
pub transfers_dropped_hop_cap: AtomicU64,
pub transfers_dropped_malformed_reply: AtomicU64,
pub rpc_calls_timed_out: AtomicU64,
pub transfers_dropped_reconnect_overflow: AtomicU64,
}
impl MetricsHub {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn on_connection_open(&self) {
self.connections_active.fetch_add(1, Ordering::Relaxed);
self.connections_total.fetch_add(1, Ordering::Relaxed);
}
pub fn on_connection_close(&self) {
self.connections_active.fetch_sub(1, Ordering::Relaxed);
}
pub fn on_transfer_received(&self) {
self.transfers_received.fetch_add(1, Ordering::Relaxed);
}
pub fn on_transfer_sent(&self, settle_started: bool) {
self.transfers_sent.fetch_add(1, Ordering::Relaxed);
if settle_started {
self.transfers_unsettled.fetch_add(1, Ordering::Relaxed);
}
}
pub fn on_settled(&self) {
self.transfers_unsettled.fetch_sub(1, Ordering::Relaxed);
}
pub fn on_decode_error(&self) {
self.errors_decode.fetch_add(1, Ordering::Relaxed);
}
pub fn on_unauthorized(&self) {
self.errors_unauthorized.fetch_add(1, Ordering::Relaxed);
}
pub fn on_topic_added(&self) {
self.topics_exposed.fetch_add(1, Ordering::Relaxed);
}
pub fn on_topic_removed(&self) {
self.topics_exposed.fetch_sub(1, Ordering::Relaxed);
}
pub fn on_dropped_loop(&self) {
self.transfers_dropped_loop.fetch_add(1, Ordering::Relaxed);
}
pub fn on_dropped_hop_cap(&self) {
self.transfers_dropped_hop_cap
.fetch_add(1, Ordering::Relaxed);
}
pub fn on_dropped_malformed_reply(&self) {
self.transfers_dropped_malformed_reply
.fetch_add(1, Ordering::Relaxed);
}
pub fn on_rpc_timeout(&self) {
self.rpc_calls_timed_out.fetch_add(1, Ordering::Relaxed);
}
pub fn on_reconnect_overflow(&self) {
self.transfers_dropped_reconnect_overflow
.fetch_add(1, Ordering::Relaxed);
}
#[must_use]
pub fn snapshot(&self, name: &str) -> Option<i64> {
let v = match name {
names::CONNECTIONS_ACTIVE => self.connections_active.load(Ordering::Relaxed),
names::CONNECTIONS_TOTAL => {
i64_from_u64(self.connections_total.load(Ordering::Relaxed))
}
names::TRANSFERS_RECEIVED => {
i64_from_u64(self.transfers_received.load(Ordering::Relaxed))
}
names::TRANSFERS_SENT => i64_from_u64(self.transfers_sent.load(Ordering::Relaxed)),
names::TRANSFERS_UNSETTLED => self.transfers_unsettled.load(Ordering::Relaxed),
names::TRANSFERS_RATE => self.transfers_rate.load(Ordering::Relaxed),
names::ERRORS_DECODE => i64_from_u64(self.errors_decode.load(Ordering::Relaxed)),
names::ERRORS_UNAUTHORIZED => {
i64_from_u64(self.errors_unauthorized.load(Ordering::Relaxed))
}
names::TOPICS_EXPOSED => self.topics_exposed.load(Ordering::Relaxed),
names::TRANSFERS_DROPPED_LOOP => {
i64_from_u64(self.transfers_dropped_loop.load(Ordering::Relaxed))
}
names::TRANSFERS_DROPPED_HOP_CAP => {
i64_from_u64(self.transfers_dropped_hop_cap.load(Ordering::Relaxed))
}
names::TRANSFERS_DROPPED_MALFORMED_REPLY => i64_from_u64(
self.transfers_dropped_malformed_reply
.load(Ordering::Relaxed),
),
names::RPC_CALLS_TIMED_OUT => {
i64_from_u64(self.rpc_calls_timed_out.load(Ordering::Relaxed))
}
names::TRANSFERS_DROPPED_RECONNECT_OVERFLOW => i64_from_u64(
self.transfers_dropped_reconnect_overflow
.load(Ordering::Relaxed),
),
_ => return None,
};
Some(v)
}
#[must_use]
pub const fn unit_of(name: &str) -> Option<&'static str> {
match name.as_bytes() {
b"transfers.rate" => Some(units::PER_SECOND),
b"connections.active"
| b"connections.total"
| b"transfers.received"
| b"transfers.sent"
| b"transfers.unsettled"
| b"errors.decode"
| b"errors.unauthorized"
| b"topics.exposed"
| b"transfers.dropped.loop"
| b"transfers.dropped.hop-cap"
| b"transfers.dropped.malformed-reply"
| b"rpc.calls.timed-out"
| b"transfers.dropped.reconnect-overflow" => Some(units::COUNT),
_ => None,
}
}
}
const fn i64_from_u64(v: u64) -> i64 {
if v > i64::MAX as u64 {
i64::MAX
} else {
v as i64
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn mandatory_metric_count_is_14() {
assert_eq!(MANDATORY_METRIC_NAMES.len(), 14);
}
#[test]
fn fresh_hub_all_zero() {
let h = MetricsHub::new();
for name in MANDATORY_METRIC_NAMES {
assert_eq!(h.snapshot(name), Some(0), "metric {name} initial != 0");
}
}
#[test]
fn unknown_metric_yields_none() {
let h = MetricsHub::new();
assert!(h.snapshot("not.a.metric").is_none());
}
#[test]
fn connection_open_close_balances() {
let h = MetricsHub::new();
h.on_connection_open();
h.on_connection_open();
assert_eq!(h.snapshot(names::CONNECTIONS_ACTIVE), Some(2));
assert_eq!(h.snapshot(names::CONNECTIONS_TOTAL), Some(2));
h.on_connection_close();
assert_eq!(h.snapshot(names::CONNECTIONS_ACTIVE), Some(1));
assert_eq!(h.snapshot(names::CONNECTIONS_TOTAL), Some(2));
}
#[test]
fn transfer_unsettled_tracking() {
let h = MetricsHub::new();
h.on_transfer_sent(true); h.on_transfer_sent(true);
assert_eq!(h.snapshot(names::TRANSFERS_UNSETTLED), Some(2));
h.on_settled();
assert_eq!(h.snapshot(names::TRANSFERS_UNSETTLED), Some(1));
}
#[test]
fn pre_settled_transfer_does_not_count_unsettled() {
let h = MetricsHub::new();
h.on_transfer_sent(false); assert_eq!(h.snapshot(names::TRANSFERS_SENT), Some(1));
assert_eq!(h.snapshot(names::TRANSFERS_UNSETTLED), Some(0));
}
#[test]
fn loop_and_hop_cap_drops_count_separately() {
let h = MetricsHub::new();
h.on_dropped_loop();
h.on_dropped_loop();
h.on_dropped_hop_cap();
assert_eq!(h.snapshot(names::TRANSFERS_DROPPED_LOOP), Some(2));
assert_eq!(h.snapshot(names::TRANSFERS_DROPPED_HOP_CAP), Some(1));
}
#[test]
fn rpc_failure_counters_independent() {
let h = MetricsHub::new();
h.on_rpc_timeout();
h.on_dropped_malformed_reply();
h.on_dropped_malformed_reply();
assert_eq!(h.snapshot(names::RPC_CALLS_TIMED_OUT), Some(1));
assert_eq!(
h.snapshot(names::TRANSFERS_DROPPED_MALFORMED_REPLY),
Some(2)
);
}
#[test]
fn unit_of_known_metrics() {
assert_eq!(
MetricsHub::unit_of(names::CONNECTIONS_ACTIVE),
Some(units::COUNT)
);
assert_eq!(
MetricsHub::unit_of(names::TRANSFERS_RATE),
Some(units::PER_SECOND)
);
assert_eq!(MetricsHub::unit_of("not.a.metric"), None);
}
#[test]
fn topics_added_removed_balances() {
let h = MetricsHub::new();
h.on_topic_added();
h.on_topic_added();
h.on_topic_added();
assert_eq!(h.snapshot(names::TOPICS_EXPOSED), Some(3));
h.on_topic_removed();
assert_eq!(h.snapshot(names::TOPICS_EXPOSED), Some(2));
}
#[test]
fn reconnect_overflow_counter() {
let h = MetricsHub::new();
h.on_reconnect_overflow();
h.on_reconnect_overflow();
assert_eq!(
h.snapshot(names::TRANSFERS_DROPPED_RECONNECT_OVERFLOW),
Some(2)
);
}
}