atomr_telemetry/
remote.rs1use std::sync::atomic::{AtomicU64, Ordering};
6
7use dashmap::DashMap;
8
9use crate::bus::{TelemetryBus, TelemetryEvent};
10use crate::dto::{RemoteAssociationInfo, RemoteSnapshot};
11
12struct AssociationCounters {
13 state: parking_lot::RwLock<String>,
14 inbound_bytes: AtomicU64,
15 outbound_bytes: AtomicU64,
16}
17
18pub struct RemoteProbe {
19 bus: TelemetryBus,
20 associations: DashMap<String, AssociationCounters>,
21}
22
23impl RemoteProbe {
24 pub fn new(bus: TelemetryBus) -> Self {
25 Self { bus, associations: DashMap::new() }
26 }
27
28 pub fn record_association(&self, remote_address: &str, state: &str) {
29 self.associations.insert(
30 remote_address.to_string(),
31 AssociationCounters {
32 state: parking_lot::RwLock::new(state.to_string()),
33 inbound_bytes: AtomicU64::new(0),
34 outbound_bytes: AtomicU64::new(0),
35 },
36 );
37 self.bus.publish(TelemetryEvent::RemoteAssociation(RemoteAssociationInfo {
38 remote_address: remote_address.to_string(),
39 state: state.to_string(),
40 inbound_bytes: 0,
41 outbound_bytes: 0,
42 }));
43 }
44
45 pub fn set_state(&self, remote_address: &str, state: &str) {
46 if let Some(entry) = self.associations.get(remote_address) {
47 *entry.state.write() = state.to_string();
48 }
49 }
50
51 pub fn record_inbound_bytes(&self, remote_address: &str, bytes: u64) {
52 if let Some(entry) = self.associations.get(remote_address) {
53 entry.inbound_bytes.fetch_add(bytes, Ordering::Relaxed);
54 }
55 }
56
57 pub fn record_outbound_bytes(&self, remote_address: &str, bytes: u64) {
58 if let Some(entry) = self.associations.get(remote_address) {
59 entry.outbound_bytes.fetch_add(bytes, Ordering::Relaxed);
60 }
61 }
62
63 pub fn remove(&self, remote_address: &str) {
64 self.associations.remove(remote_address);
65 }
66
67 pub fn association_count(&self) -> usize {
68 self.associations.len()
69 }
70
71 pub fn snapshot(&self) -> RemoteSnapshot {
72 let associations: Vec<RemoteAssociationInfo> = self
73 .associations
74 .iter()
75 .map(|e| RemoteAssociationInfo {
76 remote_address: e.key().clone(),
77 state: e.value().state.read().clone(),
78 inbound_bytes: e.value().inbound_bytes.load(Ordering::Relaxed),
79 outbound_bytes: e.value().outbound_bytes.load(Ordering::Relaxed),
80 })
81 .collect();
82 RemoteSnapshot { associations }
83 }
84}
85
/// Synchronizes `probe` with the peers currently reported by `manager`.
///
/// Performed in order:
/// 1. Every peer returned by `manager.peer_states()` is inserted with zeroed byte
///    counters if it is not tracked yet, and its stored state is overwritten.
/// 2. Byte counters of tracked peers are overwritten with the `received_bytes` /
///    `sent_bytes` values from `manager.metrics().snapshot()`.
/// 3. Tracked addresses that `peer_states()` did not report are dropped.
///
/// This function does not publish any telemetry events.
#[cfg(feature = "remote")]
pub fn refresh_from_endpoint_manager(probe: &RemoteProbe, manager: &atomr_remote::EndpointManager) {
    use std::collections::HashSet;

    let states = manager.peer_states();
    let live: HashSet<String> = states.iter().map(|(k, _, _)| k.clone()).collect();

    for (addr, state, _attempt) in &states {
        // One lookup per peer: the guard returned by the entry API is reused to write
        // the state instead of searching the map a second time.
        let slot = probe.associations.entry(addr.clone()).or_insert_with(|| AssociationCounters {
            state: parking_lot::RwLock::new((*state).to_string()),
            inbound_bytes: AtomicU64::new(0),
            outbound_bytes: AtomicU64::new(0),
        });
        *slot.state.write() = (*state).to_string();
    }

    let snap = manager.metrics().snapshot();
    for row in snap.per_address {
        // Metrics rows carry absolute totals, so they replace the counters rather than
        // being added to them.
        if let Some(entry) = probe.associations.get(&row.address) {
            entry.inbound_bytes.store(row.received_bytes, Ordering::Relaxed);
            entry.outbound_bytes.store(row.sent_bytes, Ordering::Relaxed);
        }
    }

    // Prune in place; avoids cloning every key into a temporary list and removing
    // the stale ones one at a time.
    probe.associations.retain(|addr, _| live.contains(addr));
}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// A recorded association shows up in the snapshot with the byte totals that
    /// were added to it.
    #[test]
    fn tracks_associations_and_bytes() {
        let address = "akka://A@host:1";
        let probe = RemoteProbe::new(TelemetryBus::new(8));

        probe.record_association(address, "active");
        probe.record_inbound_bytes(address, 100);
        probe.record_outbound_bytes(address, 200);

        let snapshot = probe.snapshot();
        let rows = &snapshot.associations;
        assert_eq!(rows.len(), 1);

        let only = &rows[0];
        assert_eq!(only.inbound_bytes, 100);
        assert_eq!(only.outbound_bytes, 200);
    }
}
133}