Skip to main content

atomr_remote/
metrics.rs

1//! Remote metrics extension.
2//!
3//! Lightweight per-`Address` counters for sent/received messages and
4//! bytes. The dashboard / `atomr-telemetry` consume this via
5//! [`RemoteMetrics::snapshot`].
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use dashmap::DashMap;
11
12use atomr_core::actor::Address;
13
14#[derive(Default, Debug)]
15struct PerAddress {
16    sent_messages: AtomicU64,
17    sent_bytes: AtomicU64,
18    received_messages: AtomicU64,
19    received_bytes: AtomicU64,
20    errors: AtomicU64,
21}
22
23#[derive(Default, Clone)]
24pub struct RemoteMetrics {
25    inner: Arc<DashMap<String, PerAddress>>,
26}
27
28#[derive(Debug, Clone, Default)]
29pub struct RemoteMetricsSnapshot {
30    pub per_address: Vec<RemoteMetricsRow>,
31}
32
33#[derive(Debug, Clone)]
34pub struct RemoteMetricsRow {
35    pub address: String,
36    pub sent_messages: u64,
37    pub sent_bytes: u64,
38    pub received_messages: u64,
39    pub received_bytes: u64,
40    pub errors: u64,
41}
42
43impl RemoteMetrics {
44    pub fn new() -> Self {
45        Self::default()
46    }
47
48    pub fn record_send(&self, address: &Address, bytes: usize) {
49        let e = self.inner.entry(address.to_string()).or_default();
50        e.sent_messages.fetch_add(1, Ordering::Relaxed);
51        e.sent_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
52    }
53
54    pub fn record_receive(&self, address: &Address, bytes: usize) {
55        let e = self.inner.entry(address.to_string()).or_default();
56        e.received_messages.fetch_add(1, Ordering::Relaxed);
57        e.received_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
58    }
59
60    pub fn record_error(&self, address: &Address) {
61        let e = self.inner.entry(address.to_string()).or_default();
62        e.errors.fetch_add(1, Ordering::Relaxed);
63    }
64
65    pub fn snapshot(&self) -> RemoteMetricsSnapshot {
66        let per_address = self
67            .inner
68            .iter()
69            .map(|kv| RemoteMetricsRow {
70                address: kv.key().clone(),
71                sent_messages: kv.value().sent_messages.load(Ordering::Relaxed),
72                sent_bytes: kv.value().sent_bytes.load(Ordering::Relaxed),
73                received_messages: kv.value().received_messages.load(Ordering::Relaxed),
74                received_bytes: kv.value().received_bytes.load(Ordering::Relaxed),
75                errors: kv.value().errors.load(Ordering::Relaxed),
76            })
77            .collect();
78        RemoteMetricsSnapshot { per_address }
79    }
80}