atomr_cluster/
membership.rs1use std::collections::BTreeSet;
4
5use atomr_core::actor::Address;
6use serde::{Deserialize, Serialize};
7
8use crate::events::ClusterEvent;
9use crate::leader::{is_converged, next_status};
10use crate::member::{Member, MemberStatus};
11use crate::reachability::Reachability;
12
13#[derive(Debug, Clone, Default, Serialize, Deserialize)]
14pub struct MembershipState {
15 pub members: Vec<Member>,
16 pub reachability: Reachability,
17}
18
19impl MembershipState {
20 pub fn new() -> Self {
21 Self::default()
22 }
23
24 pub fn add_or_update(&mut self, m: Member) {
25 if let Some(existing) = self.members.iter_mut().find(|x| x.address == m.address) {
26 *existing = m;
27 } else {
28 self.members.push(m);
29 }
30 }
31
32 pub fn remove(&mut self, addr: &Address) {
33 self.members.retain(|m| &m.address != addr);
34 }
35
36 pub fn up_members(&self) -> Vec<&Member> {
37 self.members.iter().filter(|m| matches!(m.status, MemberStatus::Up)).collect()
38 }
39
40 pub fn unreachable_addresses(&self) -> BTreeSet<String> {
41 self.members
42 .iter()
43 .filter(|m| !self.reachability.is_reachable(&m.address))
44 .map(|m| m.address.to_string())
45 .collect()
46 }
47
48 pub fn member_count(&self) -> usize {
49 self.members.len()
50 }
51
52 pub fn apply_leader_actions(&mut self) -> Vec<ClusterEvent> {
60 let converged = is_converged(self);
61 let mut events = Vec::new();
62 let mut transitions: Vec<(Address, MemberStatus)> = Vec::new();
64 for m in &self.members {
65 if let Some(next) = next_status(m.status, converged) {
66 transitions.push((m.address.clone(), next));
67 }
68 }
69 for (addr, next) in transitions {
71 if let Some(m) = self.members.iter_mut().find(|x| x.address == addr) {
72 let prev = m.status;
73 m.status = next;
74 let updated = m.clone();
75 let evt = match next {
76 MemberStatus::Up => ClusterEvent::MemberUp(updated.clone()),
77 MemberStatus::Exiting => ClusterEvent::MemberExited(updated.clone()),
78 MemberStatus::Removed => ClusterEvent::MemberRemoved(updated.clone(), prev),
79 _ => continue,
80 };
81 events.push(evt);
82 }
83 }
84 self.members.retain(|m| m.status != MemberStatus::Removed);
86 if converged {
87 events.push(ClusterEvent::Convergence(true));
88 }
89 events
90 }
91
92 pub fn join(&mut self, m: Member) -> ClusterEvent {
95 self.add_or_update(m.clone());
96 ClusterEvent::MemberJoined(m)
97 }
98
99 pub fn leave(&mut self, addr: &Address) -> Option<ClusterEvent> {
102 let m = self.members.iter_mut().find(|x| &x.address == addr)?;
103 if matches!(m.status, MemberStatus::Up | MemberStatus::WeaklyUp) {
104 m.status = MemberStatus::Leaving;
105 return Some(ClusterEvent::MemberLeft(m.clone()));
106 }
107 None
108 }
109
110 pub fn down(&mut self, addr: &Address) -> Option<ClusterEvent> {
119 let m = self.members.iter_mut().find(|x| &x.address == addr)?;
120 if matches!(m.status, MemberStatus::Up | MemberStatus::WeaklyUp | MemberStatus::Leaving) {
121 m.status = MemberStatus::Down;
122 return Some(ClusterEvent::MemberDowned(m.clone()));
123 }
124 None
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use super::*;
131
132 #[test]
133 fn add_and_remove() {
134 let mut s = MembershipState::new();
135 let m = Member::new(Address::local("a"), vec![]);
136 s.add_or_update(m.clone());
137 assert_eq!(s.member_count(), 1);
138 s.remove(&m.address);
139 assert_eq!(s.member_count(), 0);
140 }
141
142 #[test]
143 fn join_emits_member_joined() {
144 let mut s = MembershipState::new();
145 let evt = s.join(Member::new(Address::local("a"), vec![]));
146 assert!(matches!(evt, ClusterEvent::MemberJoined(_)));
147 assert_eq!(s.member_count(), 1);
148 }
149
150 #[test]
151 fn leader_actions_promote_joining_to_up_when_converged() {
152 let mut s = MembershipState::new();
153 s.join(Member::new(Address::local("a"), vec![]));
154 let events = s.apply_leader_actions();
156 let names: Vec<_> = events
157 .iter()
158 .filter_map(|e| match e {
159 ClusterEvent::MemberUp(m) => Some(m.address.to_string()),
160 _ => None,
161 })
162 .collect();
163 assert_eq!(names, vec!["akka://a".to_string()]);
164 }
165
166 #[test]
167 fn leader_actions_remove_down_members() {
168 let mut s = MembershipState::new();
169 let mut m = Member::new(Address::local("a"), vec![]);
170 m.status = MemberStatus::Down;
171 s.add_or_update(m);
172 let _ = s.apply_leader_actions();
173 assert_eq!(s.member_count(), 0);
174 }
175
176 #[test]
177 fn leave_marks_up_member_as_leaving() {
178 let mut s = MembershipState::new();
179 let mut m = Member::new(Address::local("a"), vec![]);
180 m.status = MemberStatus::Up;
181 s.add_or_update(m);
182 let evt = s.leave(&Address::local("a"));
183 assert!(matches!(evt, Some(ClusterEvent::MemberLeft(_))));
184 }
185
186 #[test]
187 fn leave_is_noop_for_unknown_member() {
188 let mut s = MembershipState::new();
189 let evt = s.leave(&Address::local("nope"));
190 assert!(evt.is_none());
191 }
192
193 #[test]
194 fn down_marks_up_member_as_down() {
195 let mut s = MembershipState::new();
196 let mut m = Member::new(Address::local("a"), vec![]);
197 m.status = MemberStatus::Up;
198 s.add_or_update(m);
199 let evt = s.down(&Address::local("a"));
200 assert!(matches!(evt, Some(ClusterEvent::MemberDowned(_))));
201 let m = s.members.first().unwrap();
202 assert_eq!(m.status, MemberStatus::Down);
203 }
204
205 #[test]
206 fn down_accepts_leaving_member() {
207 let mut s = MembershipState::new();
208 let mut m = Member::new(Address::local("a"), vec![]);
209 m.status = MemberStatus::Leaving;
210 s.add_or_update(m);
211 let evt = s.down(&Address::local("a"));
212 assert!(matches!(evt, Some(ClusterEvent::MemberDowned(_))));
213 }
214
215 #[test]
216 fn down_accepts_weakly_up_member() {
217 let mut s = MembershipState::new();
218 let mut m = Member::new(Address::local("a"), vec![]);
219 m.status = MemberStatus::WeaklyUp;
220 s.add_or_update(m);
221 let evt = s.down(&Address::local("a"));
222 assert!(matches!(evt, Some(ClusterEvent::MemberDowned(_))));
223 }
224
225 #[test]
226 fn down_is_noop_for_unknown_member() {
227 let mut s = MembershipState::new();
228 let evt = s.down(&Address::local("nope"));
229 assert!(evt.is_none());
230 }
231
232 #[test]
233 fn down_is_noop_for_already_down_member() {
234 let mut s = MembershipState::new();
235 let mut m = Member::new(Address::local("a"), vec![]);
236 m.status = MemberStatus::Down;
237 s.add_or_update(m);
238 let evt = s.down(&Address::local("a"));
239 assert!(evt.is_none());
240 }
241}