1use atomr_core::actor::Address;
13
14use crate::member::{Member, MemberStatus};
15use crate::membership::MembershipState;
16
17pub fn elect_leader(state: &MembershipState) -> Option<Address> {
24 let mut eligible: Vec<&Member> = state
25 .members
26 .iter()
27 .filter(|m| matches!(m.status, MemberStatus::Up | MemberStatus::Leaving))
28 .filter(|m| state.reachability.is_reachable(&m.address))
29 .collect();
30 eligible.sort_by_key(|a| a.address.to_string());
31 eligible.first().map(|m| m.address.clone())
32}
33
34pub fn next_status(current: MemberStatus, converged: bool) -> Option<MemberStatus> {
43 match (current, converged) {
44 (MemberStatus::Joining, true) => Some(MemberStatus::Up),
45 (MemberStatus::Leaving, true) => Some(MemberStatus::Exiting),
46 (MemberStatus::Exiting, true) => Some(MemberStatus::Removed),
47 (MemberStatus::Down, _) => Some(MemberStatus::Removed),
48 _ => None,
49 }
50}
51
52pub fn is_converged(state: &MembershipState) -> bool {
64 state.members.iter().all(|m| {
65 if matches!(m.status, MemberStatus::Down | MemberStatus::Removed) {
66 return true;
67 }
68 state.reachability.is_reachable(&m.address)
69 })
70}
71
72#[cfg(test)]
73mod tests {
74 use super::*;
75
76 fn member(addr: &str, status: MemberStatus) -> Member {
77 let mut m = Member::new(Address::local(addr), vec![]);
78 m.status = status;
79 m
80 }
81
82 #[test]
83 fn leader_is_lowest_address_up_member() {
84 let mut s = MembershipState::new();
85 s.add_or_update(member("c", MemberStatus::Up));
86 s.add_or_update(member("a", MemberStatus::Up));
87 s.add_or_update(member("b", MemberStatus::Up));
88 assert_eq!(elect_leader(&s), Some(Address::local("a")));
89 }
90
91 #[test]
92 fn leader_skips_non_up_members() {
93 let mut s = MembershipState::new();
94 s.add_or_update(member("a", MemberStatus::Joining));
95 s.add_or_update(member("b", MemberStatus::Up));
96 assert_eq!(elect_leader(&s), Some(Address::local("b")));
97 }
98
99 #[test]
100 fn leader_skips_unreachable_members() {
101 let mut s = MembershipState::new();
102 s.add_or_update(member("a", MemberStatus::Up));
103 s.add_or_update(member("b", MemberStatus::Up));
104 s.reachability.unreachable(Address::local("b"), Address::local("a"));
106 assert_eq!(elect_leader(&s), Some(Address::local("b")));
107 }
108
109 #[test]
110 fn no_leader_when_no_eligible_members() {
111 let s = MembershipState::new();
112 assert_eq!(elect_leader(&s), None);
113 }
114
115 #[test]
116 fn next_status_transitions() {
117 assert_eq!(next_status(MemberStatus::Joining, true), Some(MemberStatus::Up));
118 assert_eq!(next_status(MemberStatus::Joining, false), None);
119 assert_eq!(next_status(MemberStatus::Leaving, true), Some(MemberStatus::Exiting));
120 assert_eq!(next_status(MemberStatus::Exiting, true), Some(MemberStatus::Removed));
121 assert_eq!(next_status(MemberStatus::Down, false), Some(MemberStatus::Removed));
122 assert_eq!(next_status(MemberStatus::Up, true), None);
123 }
124
125 #[test]
126 fn convergence_holds_when_everyone_reachable() {
127 let mut s = MembershipState::new();
128 s.add_or_update(member("a", MemberStatus::Up));
129 s.add_or_update(member("b", MemberStatus::Joining));
130 assert!(is_converged(&s));
131 }
132
133 #[test]
134 fn convergence_fails_when_a_member_is_unreachable() {
135 let mut s = MembershipState::new();
136 s.add_or_update(member("a", MemberStatus::Up));
137 s.add_or_update(member("b", MemberStatus::Up));
138 s.reachability.unreachable(Address::local("a"), Address::local("b"));
139 assert!(!is_converged(&s));
140 }
141
142 #[test]
143 fn down_members_do_not_block_convergence() {
144 let mut s = MembershipState::new();
145 s.add_or_update(member("a", MemberStatus::Up));
146 s.add_or_update(member("b", MemberStatus::Down));
147 s.reachability.unreachable(Address::local("a"), Address::local("b"));
149 assert!(is_converged(&s));
150 }
151}
152
153#[derive(Debug, Clone, PartialEq, Eq)]
160pub struct LeaderHandoverEvent {
161 pub from: Option<Address>,
162 pub to: Option<Address>,
163}
164
165#[derive(Debug, Default, Clone)]
176pub struct LeaderHandover {
177 previous: Option<Address>,
178}
179
180impl LeaderHandover {
181 pub fn new() -> Self {
182 Self::default()
183 }
184
185 pub fn observe(&mut self, state: &MembershipState) -> Option<LeaderHandoverEvent> {
189 let next = elect_leader(state);
190 if self.previous != next {
191 let event = LeaderHandoverEvent { from: self.previous.clone(), to: next.clone() };
192 self.previous = next;
193 return Some(event);
194 }
195 None
196 }
197
198 pub fn current(&self) -> Option<&Address> {
200 self.previous.as_ref()
201 }
202}
203
204#[cfg(test)]
205mod handover_tests {
206 use super::*;
207
208 fn member(addr: &str, status: MemberStatus) -> Member {
209 let mut m = Member::new(Address::local(addr), vec![]);
210 m.status = status;
211 m
212 }
213
214 #[test]
215 fn first_observation_emits_initial_election() {
216 let mut s = MembershipState::new();
217 s.add_or_update(member("a", MemberStatus::Up));
218 let mut h = LeaderHandover::new();
219 let ev = h.observe(&s).unwrap();
220 assert_eq!(ev.from, None);
221 assert_eq!(ev.to, Some(Address::local("a")));
222 }
223
224 #[test]
225 fn no_event_when_leader_unchanged() {
226 let mut s = MembershipState::new();
227 s.add_or_update(member("a", MemberStatus::Up));
228 let mut h = LeaderHandover::new();
229 h.observe(&s);
230 assert!(h.observe(&s).is_none());
231 }
232
233 #[test]
234 fn leader_leaving_triggers_handover_to_next_member() {
235 let mut s = MembershipState::new();
236 s.add_or_update(member("a", MemberStatus::Up));
237 s.add_or_update(member("b", MemberStatus::Up));
238 let mut h = LeaderHandover::new();
239 h.observe(&s);
240 assert_eq!(h.current(), Some(&Address::local("a")));
241
242 let mut leaving = MembershipState::new();
244 leaving.add_or_update(member("a", MemberStatus::Leaving));
245 leaving.add_or_update(member("b", MemberStatus::Up));
246 assert!(h.observe(&leaving).is_none());
247
248 let mut exiting = MembershipState::new();
250 exiting.add_or_update(member("a", MemberStatus::Exiting));
251 exiting.add_or_update(member("b", MemberStatus::Up));
252 let ev = h.observe(&exiting).unwrap();
253 assert_eq!(ev.from, Some(Address::local("a")));
254 assert_eq!(ev.to, Some(Address::local("b")));
255 }
256
257 #[test]
258 fn leader_becoming_unreachable_triggers_handover() {
259 let mut s = MembershipState::new();
260 s.add_or_update(member("a", MemberStatus::Up));
261 s.add_or_update(member("b", MemberStatus::Up));
262 let mut h = LeaderHandover::new();
263 h.observe(&s);
264 s.reachability.unreachable(Address::local("b"), Address::local("a"));
266 let ev = h.observe(&s).unwrap();
267 assert_eq!(ev.from, Some(Address::local("a")));
268 assert_eq!(ev.to, Some(Address::local("b")));
269 }
270
271 #[test]
272 fn no_eligible_members_emits_to_none() {
273 let mut s = MembershipState::new();
274 s.add_or_update(member("a", MemberStatus::Up));
275 let mut h = LeaderHandover::new();
276 h.observe(&s);
277
278 let mut empty = MembershipState::new();
280 empty.add_or_update(member("a", MemberStatus::Removed));
281 let ev = h.observe(&empty).unwrap();
282 assert_eq!(ev.from, Some(Address::local("a")));
283 assert_eq!(ev.to, None);
284 }
285
286 #[test]
287 fn handover_through_full_cluster_lifecycle() {
288 let mut h = LeaderHandover::new();
289
290 let mut s1 = MembershipState::new();
292 for n in ["a", "b", "c"] {
293 s1.add_or_update(member(n, MemberStatus::Up));
294 }
295 assert_eq!(h.observe(&s1).unwrap().to, Some(Address::local("a")));
296
297 let mut s2 = MembershipState::new();
299 s2.add_or_update(member("a", MemberStatus::Removed));
300 for n in ["b", "c"] {
301 s2.add_or_update(member(n, MemberStatus::Up));
302 }
303 assert_eq!(h.observe(&s2).unwrap().to, Some(Address::local("b")));
304
305 let mut s3 = MembershipState::new();
307 for n in ["a", "b"] {
308 s3.add_or_update(member(n, MemberStatus::Removed));
309 }
310 s3.add_or_update(member("c", MemberStatus::Up));
311 assert_eq!(h.observe(&s3).unwrap().to, Some(Address::local("c")));
312 }
313}