Skip to main content

atomr_telemetry/
cluster.rs

1//! Cluster state probe — wraps `atomr-cluster`'s `MembershipState` /
2//! `Gossip` data structures with an owning snapshot that the dashboard
3//! can poll and an event publisher that emits diff events on updates.
4
5use parking_lot::RwLock;
6
7use crate::bus::{TelemetryBus, TelemetryEvent};
8#[cfg(feature = "cluster")]
9use crate::dto::ReachabilityRecord;
10use crate::dto::{ClusterMemberInfo, ClusterMembershipDiff, ClusterStateInfo};
11
12pub struct ClusterProbe {
13    bus: TelemetryBus,
14    state: RwLock<ClusterStateInfo>,
15}
16
17impl ClusterProbe {
18    pub fn new(bus: TelemetryBus) -> Self {
19        Self { bus, state: RwLock::new(ClusterStateInfo::default()) }
20    }
21
22    pub fn set_self_address(&self, addr: impl Into<String>) {
23        self.state.write().self_address = Some(addr.into());
24    }
25
26    pub fn set_leader(&self, leader: Option<String>) {
27        self.state.write().leader = leader;
28    }
29
30    /// Replace the current snapshot and emit a diff event describing the
31    /// change. Consumers that already track a baseline can use the diff
32    /// directly; dashboards that just want the latest value can poll
33    /// [`Self::snapshot`].
34    pub fn update(&self, next: ClusterStateInfo) {
35        let prev = std::mem::replace(&mut *self.state.write(), next.clone());
36        let diff = compute_diff(&prev, &next);
37        if !diff.is_empty() {
38            self.bus.publish(TelemetryEvent::ClusterChanged(diff));
39        }
40    }
41
42    pub fn snapshot(&self) -> ClusterStateInfo {
43        self.state.read().clone()
44    }
45
46    pub fn member_count(&self) -> usize {
47        self.state.read().members.len()
48    }
49
50    pub fn unreachable_count(&self) -> usize {
51        self.state.read().unreachable.len()
52    }
53}
54
55fn compute_diff(prev: &ClusterStateInfo, next: &ClusterStateInfo) -> ClusterMembershipDiff {
56    let prev_by_addr: std::collections::HashMap<&str, &ClusterMemberInfo> =
57        prev.members.iter().map(|m| (m.address.as_str(), m)).collect();
58    let next_by_addr: std::collections::HashMap<&str, &ClusterMemberInfo> =
59        next.members.iter().map(|m| (m.address.as_str(), m)).collect();
60
61    let mut added = Vec::new();
62    let mut updated = Vec::new();
63    for m in &next.members {
64        match prev_by_addr.get(m.address.as_str()) {
65            None => added.push(m.clone()),
66            Some(old) if old.status != m.status || old.reachable != m.reachable => updated.push(m.clone()),
67            _ => {}
68        }
69    }
70    let removed: Vec<String> = prev
71        .members
72        .iter()
73        .filter(|m| !next_by_addr.contains_key(m.address.as_str()))
74        .map(|m| m.address.clone())
75        .collect();
76
77    let prev_unreach: std::collections::HashSet<&str> = prev.unreachable.iter().map(|s| s.as_str()).collect();
78    let next_unreach: std::collections::HashSet<&str> = next.unreachable.iter().map(|s| s.as_str()).collect();
79    let became_unreachable: Vec<String> =
80        next_unreach.difference(&prev_unreach).map(|s| s.to_string()).collect();
81    let became_reachable: Vec<String> =
82        prev_unreach.difference(&next_unreach).map(|s| s.to_string()).collect();
83
84    ClusterMembershipDiff { added, updated, removed, became_unreachable, became_reachable }
85}
86
87impl ClusterMembershipDiff {
88    fn is_empty(&self) -> bool {
89        self.added.is_empty()
90            && self.updated.is_empty()
91            && self.removed.is_empty()
92            && self.became_unreachable.is_empty()
93            && self.became_reachable.is_empty()
94    }
95}
96
/// Build a serializable `ClusterStateInfo` from an `atomr-cluster`
/// `MembershipState`. Feature-gated because the cluster crate is optional.
///
/// Each member's `status` is its `Debug` rendering and its `reachable` flag
/// comes from `state.reachability.is_reachable`. `unreachable` lists the
/// addresses of members flagged unreachable, in member order. `self_address`
/// and `leader` are left as `None` and `gossip_version` is left empty.
#[cfg(feature = "cluster")]
pub fn from_cluster_state(state: &atomr_cluster::MembershipState) -> ClusterStateInfo {
    use atomr_cluster::ReachabilityStatus;

    // Single pass over the members: build each entry and, when it is flagged
    // unreachable, note its address as well.
    let mut members: Vec<ClusterMemberInfo> = Vec::new();
    let mut unreachable: Vec<String> = Vec::new();
    for node in state.members.iter() {
        let info = ClusterMemberInfo {
            address: node.address.to_string(),
            status: format!("{:?}", node.status),
            roles: node.roles.clone(),
            reachable: state.reachability.is_reachable(&node.address),
            up_number: node.up_number,
        };
        if !info.reachable {
            unreachable.push(info.address.clone());
        }
        members.push(info);
    }

    // Flatten the (observer, subject) -> status table into string records.
    let mut reachability_records: Vec<ReachabilityRecord> = Vec::new();
    for ((observer, subject), status) in state.reachability.records.iter() {
        let label = match status {
            ReachabilityStatus::Reachable => "reachable",
            ReachabilityStatus::Unreachable => "unreachable",
            ReachabilityStatus::Terminated => "terminated",
            // Any other variant is reported with a generic label.
            _ => "unknown",
        };
        reachability_records.push(ReachabilityRecord {
            observer: observer.to_string(),
            subject: subject.to_string(),
            status: label.into(),
        });
    }

    ClusterStateInfo {
        self_address: None,
        leader: None,
        members,
        unreachable,
        reachability_records,
        gossip_version: Vec::new(),
    }
}
143
/// Build a serializable `ClusterStateInfo` from a full `Gossip`.
///
/// The result is `from_cluster_state(&gossip.state)` with `gossip_version`
/// filled from the entries of `gossip.version.versions`.
#[cfg(feature = "cluster")]
pub fn from_gossip(gossip: &atomr_cluster::Gossip) -> ClusterStateInfo {
    let gossip_version =
        gossip.version.versions.iter().map(|(node, counter)| (node.clone(), *counter)).collect();
    ClusterStateInfo { gossip_version, ..from_cluster_state(&gossip.state) }
}
152
153impl ClusterProbe {
154    /// Convenience: update from a live `atomr-cluster::Gossip`.
155    #[cfg(feature = "cluster")]
156    pub fn update_from_gossip(&self, gossip: &atomr_cluster::Gossip) {
157        self.update(from_gossip(gossip));
158    }
159
160    /// Convenience: update from a `atomr-cluster::MembershipState`.
161    #[cfg(feature = "cluster")]
162    pub fn update_from_state(&self, state: &atomr_cluster::MembershipState) {
163        self.update(from_cluster_state(state));
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a role-less member with `up_number` fixed at 1.
    fn member(addr: &str, status: &str, reachable: bool) -> ClusterMemberInfo {
        ClusterMemberInfo {
            address: addr.to_owned(),
            status: status.to_owned(),
            roles: Vec::new(),
            reachable,
            up_number: 1,
        }
    }

    /// "a" changes status, "b" disappears and "c" appears: one entry each in
    /// `updated`, `removed` and `added`.
    #[test]
    fn diffs_added_updated_removed() {
        let before = ClusterStateInfo {
            members: vec![member("a", "Up", true), member("b", "Up", true)],
            unreachable: Vec::new(),
            ..Default::default()
        };
        let after = ClusterStateInfo {
            members: vec![member("a", "Leaving", true), member("c", "Joining", true)],
            unreachable: Vec::new(),
            ..Default::default()
        };

        let diff = compute_diff(&before, &after);

        assert_eq!((diff.added.len(), diff.updated.len(), diff.removed.len()), (1, 1, 1));
    }

    /// Updating an empty probe with one member publishes an event on the
    /// "cluster" topic.
    #[tokio::test]
    async fn emits_change_event() {
        let bus = TelemetryBus::new(8);
        let mut events = bus.subscribe();
        let probe = ClusterProbe::new(bus);

        let first_snapshot =
            ClusterStateInfo { members: vec![member("a", "Up", true)], ..Default::default() };
        probe.update(first_snapshot);

        let event = events.recv().await.unwrap();
        assert_eq!(event.topic(), "cluster");
    }
}