Skip to main content

atomr_telemetry/
remote.rs

1//! Remote probe — snapshots the remote `EndpointRegistry` plus inbound /
2//! outbound byte counters. Cooperates with the (optional)
3//! `atomr-remote` crate.
4
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use dashmap::DashMap;
8
9use crate::bus::{TelemetryBus, TelemetryEvent};
10use crate::dto::{RemoteAssociationInfo, RemoteSnapshot};
11
12struct AssociationCounters {
13    state: parking_lot::RwLock<String>,
14    inbound_bytes: AtomicU64,
15    outbound_bytes: AtomicU64,
16}
17
18pub struct RemoteProbe {
19    bus: TelemetryBus,
20    associations: DashMap<String, AssociationCounters>,
21}
22
23impl RemoteProbe {
24    pub fn new(bus: TelemetryBus) -> Self {
25        Self { bus, associations: DashMap::new() }
26    }
27
28    pub fn record_association(&self, remote_address: &str, state: &str) {
29        self.associations.insert(
30            remote_address.to_string(),
31            AssociationCounters {
32                state: parking_lot::RwLock::new(state.to_string()),
33                inbound_bytes: AtomicU64::new(0),
34                outbound_bytes: AtomicU64::new(0),
35            },
36        );
37        self.bus.publish(TelemetryEvent::RemoteAssociation(RemoteAssociationInfo {
38            remote_address: remote_address.to_string(),
39            state: state.to_string(),
40            inbound_bytes: 0,
41            outbound_bytes: 0,
42        }));
43    }
44
45    pub fn set_state(&self, remote_address: &str, state: &str) {
46        if let Some(entry) = self.associations.get(remote_address) {
47            *entry.state.write() = state.to_string();
48        }
49    }
50
51    pub fn record_inbound_bytes(&self, remote_address: &str, bytes: u64) {
52        if let Some(entry) = self.associations.get(remote_address) {
53            entry.inbound_bytes.fetch_add(bytes, Ordering::Relaxed);
54        }
55    }
56
57    pub fn record_outbound_bytes(&self, remote_address: &str, bytes: u64) {
58        if let Some(entry) = self.associations.get(remote_address) {
59            entry.outbound_bytes.fetch_add(bytes, Ordering::Relaxed);
60        }
61    }
62
63    pub fn remove(&self, remote_address: &str) {
64        self.associations.remove(remote_address);
65    }
66
67    pub fn association_count(&self) -> usize {
68        self.associations.len()
69    }
70
71    pub fn snapshot(&self) -> RemoteSnapshot {
72        let associations: Vec<RemoteAssociationInfo> = self
73            .associations
74            .iter()
75            .map(|e| RemoteAssociationInfo {
76                remote_address: e.key().clone(),
77                state: e.value().state.read().clone(),
78                inbound_bytes: e.value().inbound_bytes.load(Ordering::Relaxed),
79                outbound_bytes: e.value().outbound_bytes.load(Ordering::Relaxed),
80            })
81            .collect();
82        RemoteSnapshot { associations }
83    }
84}
85
86/// Populate the probe from a live [`atomr_remote::EndpointManager`].
87/// Creates entries for every known remote with the manager's reported
88/// association state, and pulls byte counters from
89/// [`atomr_remote::RemoteMetrics`].
90#[cfg(feature = "remote")]
91pub fn refresh_from_endpoint_manager(probe: &RemoteProbe, manager: &atomr_remote::EndpointManager) {
92    use std::collections::HashSet;
93    let states = manager.peer_states();
94    let live: HashSet<String> = states.iter().map(|(k, _, _)| k.clone()).collect();
95    for (addr, state, _attempt) in &states {
96        probe.associations.entry(addr.clone()).or_insert_with(|| AssociationCounters {
97            state: parking_lot::RwLock::new((*state).to_string()),
98            inbound_bytes: AtomicU64::new(0),
99            outbound_bytes: AtomicU64::new(0),
100        });
101        probe.set_state(addr, state);
102    }
103    let snap = manager.metrics().snapshot();
104    for row in snap.per_address {
105        if let Some(entry) = probe.associations.get(&row.address) {
106            entry.inbound_bytes.store(row.received_bytes, Ordering::Relaxed);
107            entry.outbound_bytes.store(row.sent_bytes, Ordering::Relaxed);
108        }
109    }
110    let stale: Vec<String> =
111        probe.associations.iter().map(|e| e.key().clone()).filter(|k| !live.contains(k)).collect();
112    for k in stale {
113        probe.remove(&k);
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120
121    #[test]
122    fn tracks_associations_and_bytes() {
123        let bus = TelemetryBus::new(8);
124        let p = RemoteProbe::new(bus);
125        p.record_association("akka://A@host:1", "active");
126        p.record_inbound_bytes("akka://A@host:1", 100);
127        p.record_outbound_bytes("akka://A@host:1", 200);
128        let snap = p.snapshot();
129        assert_eq!(snap.associations.len(), 1);
130        assert_eq!(snap.associations[0].inbound_bytes, 100);
131        assert_eq!(snap.associations[0].outbound_bytes, 200);
132    }
133}