atomr_telemetry/
cluster.rs1use parking_lot::RwLock;
6
7use crate::bus::{TelemetryBus, TelemetryEvent};
8#[cfg(feature = "cluster")]
9use crate::dto::ReachabilityRecord;
10use crate::dto::{ClusterMemberInfo, ClusterMembershipDiff, ClusterStateInfo};
11
12pub struct ClusterProbe {
13 bus: TelemetryBus,
14 state: RwLock<ClusterStateInfo>,
15}
16
17impl ClusterProbe {
18 pub fn new(bus: TelemetryBus) -> Self {
19 Self { bus, state: RwLock::new(ClusterStateInfo::default()) }
20 }
21
22 pub fn set_self_address(&self, addr: impl Into<String>) {
23 self.state.write().self_address = Some(addr.into());
24 }
25
26 pub fn set_leader(&self, leader: Option<String>) {
27 self.state.write().leader = leader;
28 }
29
30 pub fn update(&self, next: ClusterStateInfo) {
35 let prev = std::mem::replace(&mut *self.state.write(), next.clone());
36 let diff = compute_diff(&prev, &next);
37 if !diff.is_empty() {
38 self.bus.publish(TelemetryEvent::ClusterChanged(diff));
39 }
40 }
41
42 pub fn snapshot(&self) -> ClusterStateInfo {
43 self.state.read().clone()
44 }
45
46 pub fn member_count(&self) -> usize {
47 self.state.read().members.len()
48 }
49
50 pub fn unreachable_count(&self) -> usize {
51 self.state.read().unreachable.len()
52 }
53}
54
55fn compute_diff(prev: &ClusterStateInfo, next: &ClusterStateInfo) -> ClusterMembershipDiff {
56 let prev_by_addr: std::collections::HashMap<&str, &ClusterMemberInfo> =
57 prev.members.iter().map(|m| (m.address.as_str(), m)).collect();
58 let next_by_addr: std::collections::HashMap<&str, &ClusterMemberInfo> =
59 next.members.iter().map(|m| (m.address.as_str(), m)).collect();
60
61 let mut added = Vec::new();
62 let mut updated = Vec::new();
63 for m in &next.members {
64 match prev_by_addr.get(m.address.as_str()) {
65 None => added.push(m.clone()),
66 Some(old) if old.status != m.status || old.reachable != m.reachable => updated.push(m.clone()),
67 _ => {}
68 }
69 }
70 let removed: Vec<String> = prev
71 .members
72 .iter()
73 .filter(|m| !next_by_addr.contains_key(m.address.as_str()))
74 .map(|m| m.address.clone())
75 .collect();
76
77 let prev_unreach: std::collections::HashSet<&str> = prev.unreachable.iter().map(|s| s.as_str()).collect();
78 let next_unreach: std::collections::HashSet<&str> = next.unreachable.iter().map(|s| s.as_str()).collect();
79 let became_unreachable: Vec<String> =
80 next_unreach.difference(&prev_unreach).map(|s| s.to_string()).collect();
81 let became_reachable: Vec<String> =
82 prev_unreach.difference(&next_unreach).map(|s| s.to_string()).collect();
83
84 ClusterMembershipDiff { added, updated, removed, became_unreachable, became_reachable }
85}
86
87impl ClusterMembershipDiff {
88 fn is_empty(&self) -> bool {
89 self.added.is_empty()
90 && self.updated.is_empty()
91 && self.removed.is_empty()
92 && self.became_unreachable.is_empty()
93 && self.became_reachable.is_empty()
94 }
95}
96
/// Converts an `atomr_cluster::MembershipState` into a [`ClusterStateInfo`].
///
/// Each member's `status` is rendered with its `Debug` formatting and its
/// `reachable` flag comes from `state.reachability.is_reachable`. The
/// `unreachable` list holds the addresses of the members whose flag is
/// `false`. Reachability records are copied one-to-one, with the status
/// spelled `"reachable"`, `"unreachable"`, `"terminated"`, or `"unknown"` for
/// any other value.
///
/// `self_address` and `leader` are left as `None` and `gossip_version` is left
/// empty.
#[cfg(feature = "cluster")]
pub fn from_cluster_state(state: &atomr_cluster::MembershipState) -> ClusterStateInfo {
    use atomr_cluster::ReachabilityStatus;

    // Build the member list and the unreachable-address list in one pass.
    let mut members: Vec<ClusterMemberInfo> = Vec::new();
    let mut unreachable: Vec<String> = Vec::new();
    for m in state.members.iter() {
        let info = ClusterMemberInfo {
            address: m.address.to_string(),
            status: format!("{:?}", m.status),
            roles: m.roles.clone(),
            reachable: state.reachability.is_reachable(&m.address),
            up_number: m.up_number,
        };
        if !info.reachable {
            unreachable.push(info.address.clone());
        }
        members.push(info);
    }

    let reachability_records: Vec<ReachabilityRecord> = state
        .reachability
        .records
        .iter()
        .map(|((observer, subject), status)| {
            let label = match status {
                ReachabilityStatus::Reachable => "reachable",
                ReachabilityStatus::Unreachable => "unreachable",
                ReachabilityStatus::Terminated => "terminated",
                _ => "unknown",
            };
            ReachabilityRecord {
                observer: observer.to_string(),
                subject: subject.to_string(),
                status: label.into(),
            }
        })
        .collect();

    ClusterStateInfo {
        self_address: None,
        leader: None,
        members,
        unreachable,
        reachability_records,
        gossip_version: Vec::new(),
    }
}
143
/// Converts an `atomr_cluster::Gossip` into a [`ClusterStateInfo`].
///
/// The result is `from_cluster_state(&gossip.state)` with `gossip_version`
/// filled from the entries of `gossip.version.versions`.
#[cfg(feature = "cluster")]
pub fn from_gossip(gossip: &atomr_cluster::Gossip) -> ClusterStateInfo {
    let gossip_version =
        gossip.version.versions.iter().map(|(node, counter)| (node.clone(), *counter)).collect();

    ClusterStateInfo { gossip_version, ..from_cluster_state(&gossip.state) }
}
152
153impl ClusterProbe {
154 #[cfg(feature = "cluster")]
156 pub fn update_from_gossip(&self, gossip: &atomr_cluster::Gossip) {
157 self.update(from_gossip(gossip));
158 }
159
160 #[cfg(feature = "cluster")]
162 pub fn update_from_state(&self, state: &atomr_cluster::MembershipState) {
163 self.update(from_cluster_state(state));
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a member without roles and with `up_number` 1.
    fn member(addr: &str, status: &str, reachable: bool) -> ClusterMemberInfo {
        ClusterMemberInfo {
            address: String::from(addr),
            status: String::from(status),
            roles: Vec::new(),
            reachable,
            up_number: 1,
        }
    }

    /// Snapshot with the given members, an empty `unreachable` list and every
    /// other field defaulted.
    fn snapshot_of(members: Vec<ClusterMemberInfo>) -> ClusterStateInfo {
        ClusterStateInfo { members, unreachable: Vec::new(), ..Default::default() }
    }

    /// "a" changes status, "b" disappears and "c" is new: one entry per list.
    #[test]
    fn diffs_added_updated_removed() {
        let before = snapshot_of(vec![member("a", "Up", true), member("b", "Up", true)]);
        let after = snapshot_of(vec![member("a", "Leaving", true), member("c", "Joining", true)]);

        let delta = compute_diff(&before, &after);

        assert_eq!(delta.added.len(), 1);
        assert_eq!(delta.updated.len(), 1);
        assert_eq!(delta.removed.len(), 1);
    }

    /// Adding a member to a fresh probe publishes an event on the "cluster"
    /// topic.
    #[tokio::test]
    async fn emits_change_event() {
        let bus = TelemetryBus::new(8);
        let mut events = bus.subscribe();
        let probe = ClusterProbe::new(bus);

        let next = ClusterStateInfo { members: vec![member("a", "Up", true)], ..Default::default() };
        probe.update(next);

        let event = events.recv().await.unwrap();
        assert_eq!(event.topic(), "cluster");
    }
}