1use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use dashmap::DashMap;
11
12use atomr_core::actor::Address;
13
14#[derive(Default, Debug)]
15struct PerAddress {
16 sent_messages: AtomicU64,
17 sent_bytes: AtomicU64,
18 received_messages: AtomicU64,
19 received_bytes: AtomicU64,
20 errors: AtomicU64,
21}
22
23#[derive(Default, Clone)]
24pub struct RemoteMetrics {
25 inner: Arc<DashMap<String, PerAddress>>,
26}
27
28#[derive(Debug, Clone, Default)]
29pub struct RemoteMetricsSnapshot {
30 pub per_address: Vec<RemoteMetricsRow>,
31}
32
33#[derive(Debug, Clone)]
34pub struct RemoteMetricsRow {
35 pub address: String,
36 pub sent_messages: u64,
37 pub sent_bytes: u64,
38 pub received_messages: u64,
39 pub received_bytes: u64,
40 pub errors: u64,
41}
42
43impl RemoteMetrics {
44 pub fn new() -> Self {
45 Self::default()
46 }
47
48 pub fn record_send(&self, address: &Address, bytes: usize) {
49 let e = self.inner.entry(address.to_string()).or_default();
50 e.sent_messages.fetch_add(1, Ordering::Relaxed);
51 e.sent_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
52 }
53
54 pub fn record_receive(&self, address: &Address, bytes: usize) {
55 let e = self.inner.entry(address.to_string()).or_default();
56 e.received_messages.fetch_add(1, Ordering::Relaxed);
57 e.received_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
58 }
59
60 pub fn record_error(&self, address: &Address) {
61 let e = self.inner.entry(address.to_string()).or_default();
62 e.errors.fetch_add(1, Ordering::Relaxed);
63 }
64
65 pub fn snapshot(&self) -> RemoteMetricsSnapshot {
66 let per_address = self
67 .inner
68 .iter()
69 .map(|kv| RemoteMetricsRow {
70 address: kv.key().clone(),
71 sent_messages: kv.value().sent_messages.load(Ordering::Relaxed),
72 sent_bytes: kv.value().sent_bytes.load(Ordering::Relaxed),
73 received_messages: kv.value().received_messages.load(Ordering::Relaxed),
74 received_bytes: kv.value().received_bytes.load(Ordering::Relaxed),
75 errors: kv.value().errors.load(Ordering::Relaxed),
76 })
77 .collect();
78 RemoteMetricsSnapshot { per_address }
79 }
80}