1use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use dashmap::DashMap;
12
13use atomr_core::actor::Address;
14
/// Running totals tracked for one remote address.
///
/// Every counter is an [`AtomicU64`], so values can be bumped through a shared
/// reference without any additional locking around the individual field.
#[derive(Debug, Default)]
struct PerAddress {
    /// Number of messages recorded as sent.
    sent_messages: AtomicU64,
    /// Sum of the byte sizes of all messages recorded as sent.
    sent_bytes: AtomicU64,
    /// Number of messages recorded as received.
    received_messages: AtomicU64,
    /// Sum of the byte sizes of all messages recorded as received.
    received_bytes: AtomicU64,
    /// Number of errors recorded.
    errors: AtomicU64,
}
23
24#[derive(Default, Clone)]
25pub struct RemoteMetrics {
26 inner: Arc<DashMap<String, PerAddress>>,
27}
28
29#[derive(Debug, Clone, Default)]
30pub struct RemoteMetricsSnapshot {
31 pub per_address: Vec<RemoteMetricsRow>,
32}
33
/// Counter values captured for a single address inside a metrics snapshot.
///
/// This is plain owned data, so it also derives `Default` (empty address, all counters
/// zero) and `PartialEq`/`Eq`, which lets callers compare rows directly, for example in
/// test assertions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RemoteMetricsRow {
    /// String form of the address the counters belong to.
    pub address: String,
    /// Number of messages recorded as sent.
    pub sent_messages: u64,
    /// Total bytes recorded as sent.
    pub sent_bytes: u64,
    /// Number of messages recorded as received.
    pub received_messages: u64,
    /// Total bytes recorded as received.
    pub received_bytes: u64,
    /// Number of errors recorded.
    pub errors: u64,
}
43
44impl RemoteMetrics {
45 pub fn new() -> Self {
46 Self::default()
47 }
48
49 pub fn record_send(&self, address: &Address, bytes: usize) {
50 let e = self.inner.entry(address.to_string()).or_default();
51 e.sent_messages.fetch_add(1, Ordering::Relaxed);
52 e.sent_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
53 }
54
55 pub fn record_receive(&self, address: &Address, bytes: usize) {
56 let e = self.inner.entry(address.to_string()).or_default();
57 e.received_messages.fetch_add(1, Ordering::Relaxed);
58 e.received_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
59 }
60
61 pub fn record_error(&self, address: &Address) {
62 let e = self.inner.entry(address.to_string()).or_default();
63 e.errors.fetch_add(1, Ordering::Relaxed);
64 }
65
66 pub fn snapshot(&self) -> RemoteMetricsSnapshot {
67 let per_address = self
68 .inner
69 .iter()
70 .map(|kv| RemoteMetricsRow {
71 address: kv.key().clone(),
72 sent_messages: kv.value().sent_messages.load(Ordering::Relaxed),
73 sent_bytes: kv.value().sent_bytes.load(Ordering::Relaxed),
74 received_messages: kv.value().received_messages.load(Ordering::Relaxed),
75 received_bytes: kv.value().received_bytes.load(Ordering::Relaxed),
76 errors: kv.value().errors.load(Ordering::Relaxed),
77 })
78 .collect();
79 RemoteMetricsSnapshot { per_address }
80 }
81}