Skip to main content

atomr_remote/
metrics.rs

1//! Remote metrics extension.
2//! akka.net: `Remote/RemoteMetricsExtension.cs`.
3//!
//! Lightweight per-`Address` counters for sent/received messages and
//! bytes, plus errors. The dashboard and `atomr-telemetry` consume these
//! via [`RemoteMetrics::snapshot`].
7
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use dashmap::DashMap;
12
13use atomr_core::actor::Address;
14
/// Counters accumulated for a single remote address.
///
/// Each field is an [`AtomicU64`], so the recording methods can bump it
/// through a shared reference. `Default` starts every counter at zero.
#[derive(Debug, Default)]
struct PerAddress {
    /// Count of messages recorded as sent to this address.
    sent_messages: AtomicU64,
    /// Running total of bytes recorded as sent to this address.
    sent_bytes: AtomicU64,
    /// Count of messages recorded as received from this address.
    received_messages: AtomicU64,
    /// Running total of bytes recorded as received from this address.
    received_bytes: AtomicU64,
    /// Count of errors recorded against this address.
    errors: AtomicU64,
}
23
24#[derive(Default, Clone)]
25pub struct RemoteMetrics {
26    inner: Arc<DashMap<String, PerAddress>>,
27}
28
/// Point-in-time copy of every per-address counter set.
///
/// Holds plain values only, so it can be compared with `==` (useful in
/// tests and for change detection between two snapshots).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RemoteMetricsSnapshot {
    /// One row per remote address that has had at least one event recorded.
    pub per_address: Vec<RemoteMetricsRow>,
}

/// Counter values captured for a single remote address.
///
/// `Default` yields an empty address with all counters at zero, which makes
/// struct-update syntax (`..Default::default()`) available to callers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RemoteMetricsRow {
    /// String form of the remote address these counters belong to.
    pub address: String,
    /// Messages recorded as sent to the address.
    pub sent_messages: u64,
    /// Bytes recorded as sent to the address.
    pub sent_bytes: u64,
    /// Messages recorded as received from the address.
    pub received_messages: u64,
    /// Bytes recorded as received from the address.
    pub received_bytes: u64,
    /// Errors recorded against the address.
    pub errors: u64,
}
43
44impl RemoteMetrics {
45    pub fn new() -> Self {
46        Self::default()
47    }
48
49    pub fn record_send(&self, address: &Address, bytes: usize) {
50        let e = self.inner.entry(address.to_string()).or_default();
51        e.sent_messages.fetch_add(1, Ordering::Relaxed);
52        e.sent_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
53    }
54
55    pub fn record_receive(&self, address: &Address, bytes: usize) {
56        let e = self.inner.entry(address.to_string()).or_default();
57        e.received_messages.fetch_add(1, Ordering::Relaxed);
58        e.received_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
59    }
60
61    pub fn record_error(&self, address: &Address) {
62        let e = self.inner.entry(address.to_string()).or_default();
63        e.errors.fetch_add(1, Ordering::Relaxed);
64    }
65
66    pub fn snapshot(&self) -> RemoteMetricsSnapshot {
67        let per_address = self
68            .inner
69            .iter()
70            .map(|kv| RemoteMetricsRow {
71                address: kv.key().clone(),
72                sent_messages: kv.value().sent_messages.load(Ordering::Relaxed),
73                sent_bytes: kv.value().sent_bytes.load(Ordering::Relaxed),
74                received_messages: kv.value().received_messages.load(Ordering::Relaxed),
75                received_bytes: kv.value().received_bytes.load(Ordering::Relaxed),
76                errors: kv.value().errors.load(Ordering::Relaxed),
77            })
78            .collect();
79        RemoteMetricsSnapshot { per_address }
80    }
81}